]> Cypherpunks.ru repositories - pyderasn.git/blob - pyderasn.py
0a6613767d33a846cd18defc5746c72ffa4b890c
[pyderasn.git] / pyderasn.py
1 #!/usr/bin/env python
2 # coding: utf-8
3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2021 Sergey Matveev <stargrave@stargrave.org>
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU Lesser General Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/CER/BER codec with abstract structures
21
22 This library allows you to marshal various structures in ASN.1 DER/CER
23 format, unmarshal BER/CER/DER ones.
24
25     >>> i = Integer(123)
26     >>> raw = i.encode()
27     >>> Integer().decod(raw) == i
28     True
29
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
62
63 Common for most types
64 ---------------------
65
66 Tags
67 ____
68
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
75 tags simultaneously.
76
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
80 number.
81
82 .. note::
83
84    EXPLICIT tags always have **constructed** tag. PyDERASN does not
85    explicitly check correctness of schema input here.
86
87 .. note::
88
89    Implicit tags have **primitive** (``tag_ctxp``) encoding for
90    primitive values.
91
92 ::
93
94     >>> Integer(impl=tag_ctxp(1))
95     [1] INTEGER
96     >>> Integer(expl=tag_ctxc(2))
97     [2] EXPLICIT INTEGER
98
99 Implicit tag is not explicitly shown.
100
101 Two objects of the same type, but with different implicit/explicit tags
102 are **not** equal.
103
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
106 function::
107
108     >>> tag_decode(tag_ctxc(123))
109     (128, 32, 123)
110     >>> klass, form, num = tag_decode(tag_ctxc(123))
111     >>> klass == TagClassContext
112     True
113     >>> form == TagFormConstructed
114     True
115
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
118
119 Default/optional
120 ________________
121
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
125
126     >>> Integer(optional=True, default=123)
127     INTEGER 123 OPTIONAL DEFAULT
128
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
132 ``version`` field::
133
134     class Version(Integer):
135         schema = (
136             ("v1", 0),
137             ("v2", 1),
138             ("v3", 2),
139         )
140     class TBSCertificate(Sequence):
141         schema = (
142             ("version", Version(expl=tag_ctxc(0), default="v1")),
143         [...]
144
145 When default argument is used and value is not specified, then it equals
146 to default one.
147
148 .. _bounds:
149
150 Size constraints
151 ________________
152
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
156
157     class X(...):
158         bounds = (MIN, MAX)
159
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
161
162 For simplicity you can also set bounds the following way::
163
164     bounded_x = X(bounds=(MIN, MAX))
165
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
167 raised.
168
169 Common methods
170 ______________
171
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
175
176 All objects are friendly to ``copy.copy()`` and copied objects can be
177 safely mutated.
178
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
181
182 .. _decoding:
183
184 Decoding
185 --------
186
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
193
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
196
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
199
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
205
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
208
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
211 * ``expl_tlen``,
212 * ``expl_llen``
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
215   ``offset`` otherwise
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
217   ``tlvlen`` otherwise
218
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
220
221 .. _ctx:
222
223 Context
224 _______
225
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
229
230 Currently available context options:
231
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
238
239 .. _pprinting:
240
241 Pretty printing
242 ---------------
243
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
248
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
252
253     >>> from pyderasn import pprint
254     >>> encoded = Integer(-12345).encode()
255     >>> obj, tail = Integer().decode(encoded)
256     >>> print(pprint(obj))
257         0   [1,1,   2] INTEGER -12345
258
259 .. _pprint_example:
260
261 Example certificate::
262
263     >>> print(pprint(crt))
264         0   [1,3,1604] Certificate SEQUENCE
265         4   [1,3,1453]  . tbsCertificate: TBSCertificate SEQUENCE
266        10-2 [1,1,   1]  . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267        13   [1,1,   3]  . . serialNumber: CertificateSerialNumber INTEGER 61595
268        18   [1,1,  13]  . . signature: AlgorithmIdentifier SEQUENCE
269        20   [1,1,   9]  . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270        31   [0,0,   2]  . . . parameters: [UNIV 5] ANY OPTIONAL
271                         . . . . 05:00
272        33   [0,0, 278]  . . issuer: Name CHOICE rdnSequence
273        33   [1,3, 274]  . . . rdnSequence: RDNSequence SEQUENCE OF
274        37   [1,1,  11]  . . . . 0: RelativeDistinguishedName SET OF
275        39   [1,1,   9]  . . . . . 0: AttributeTypeAndValue SEQUENCE
276        41   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277        46   [0,0,   4]  . . . . . . value: [UNIV 19] AttributeValue ANY
278                         . . . . . . . 13:02:45:53
279     [...]
280      1461   [1,1,  13]  . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281      1463   [1,1,   9]  . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282      1474   [0,0,   2]  . . parameters: [UNIV 5] ANY OPTIONAL
283                         . . . 05:00
284      1476   [1,2, 129]  . signatureValue: BIT STRING 1024 bits
285                         . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286                         . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
287      [...]
288
289     Trailing data: 0a
290
291 Let's parse that output, human::
292
293        10-2 [1,1,   1]    . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294        ^  ^  ^ ^    ^     ^   ^        ^            ^       ^       ^  ^
295        0  1  2 3    4     5   6        7            8       9       10 11
296
297 ::
298
299        20   [1,1,   9]    . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
300        ^     ^ ^    ^     ^     ^          ^                 ^
301        0     2 3    4     5     6          9                 10
302
303 ::
304
305        33   [0,0, 278]    . . issuer: Name CHOICE rdnSequence
306        ^     ^ ^    ^     ^   ^       ^    ^      ^
307        0     2 3    4     5   6       8    9      10
308
309 ::
310
311        52-2∞ B [1,1,1054]∞  . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
312              ^           ^                                 ^   ^            ^
313             12          13                                14   9            10
314
315 :0:
316  Offset of the object, where its DER/BER encoding begins.
317  Pay attention that it does **not** include explicit tag.
318 :1:
319  If explicit tag exists, then this is its length (tag + encoded length).
320 :2:
321  Length of object's tag. For example CHOICE does not have its own tag,
322  so it is zero.
323 :3:
324  Length of encoded length.
325 :4:
326  Length of encoded value.
327 :5:
328  Visual indentation to show the depth of object in the hierarchy.
329 :6:
330  Object's name inside SEQUENCE/CHOICE.
331 :7:
332  If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333  here. "IMPLICIT" is omitted.
334 :8:
335  Object's class name, if set. Omitted if it is just an ordinary simple
336  value (like with ``algorithm`` in example above).
337 :9:
338  Object's ASN.1 type.
339 :10:
340  Object's value, if set. Can consist of multiple words (like OCTET/BIT
341  STRINGs above). We see ``v3`` value in Version, because it is named.
342  ``rdnSequence`` is the choice of CHOICE type.
343 :11:
344  Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345  default one, specified in the schema.
346 :12:
347  Shows does object contains any kind of BER encoded data (possibly
348  Sequence holding BER-encoded underlying value).
349 :13:
350  Only applicable to BER encoded data. Indefinite length encoding mark.
351 :14:
352  Only applicable to BER encoded data. If object has BER-specific
353  encoding, then ``BER`` will be shown. It does not depend on indefinite
354  length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355  (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
356  could be BERed.
357
358 Also it could be helpful to add quick ASN.1 pprinting command in your
359 pdb's configuration file::
360
361     alias pp1 import pyderasn ;; print(pyderasn.pprint(%1, oid_maps=(locals().get("OID_STR_TO_NAME", {}),)))
362
363 .. _definedby:
364
365 DEFINED BY
366 ----------
367
368 ASN.1 structures often have ANY and OCTET STRING fields, that are
369 DEFINED BY some previously met ObjectIdentifier. This library provides
370 ability to specify mapping between some OID and field that must be
371 decoded with specific specification.
372
373 .. _defines:
374
375 defines kwarg
376 _____________
377
378 :py:class:`pyderasn.ObjectIdentifier` field inside
379 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
380 necessary for decoding structures. For example, CMS (:rfc:`5652`)
381 container::
382
383     class ContentInfo(Sequence):
384         schema = (
385             ("contentType", ContentType(defines=((("content",), {
386                 id_digestedData: DigestedData(),
387                 id_signedData: SignedData(),
388             }),))),
389             ("content", Any(expl=tag_ctxc(0))),
390         )
391
392 ``contentType`` field tells that it defines that ``content`` must be
393 decoded with ``SignedData`` specification, if ``contentType`` equals to
394 ``id-signedData``. The same applies to ``DigestedData``. If
395 ``contentType`` contains unknown OID, then no automatic decoding is
396 done.
397
398 You can specify multiple fields, that will be autodecoded -- that is why
399 ``defines`` kwarg is a sequence. You can specify defined field
400 relatively or absolutely to current decode path. For example ``defines``
401 for AlgorithmIdentifier of X.509's
402 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
403
404         (
405             (("parameters",), {
406                 id_ecPublicKey: ECParameters(),
407                 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
408             }),
409             (("..", "subjectPublicKey"), {
410                 id_rsaEncryption: RSAPublicKey(),
411                 id_GostR3410_2001: OctetString(),
412             }),
413         ),
414
415 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
416 autodecode its parameters inside SPKI's algorithm and its public key
417 itself.
418
419 Following types can be automatically decoded (DEFINED BY):
420
421 * :py:class:`pyderasn.Any`
422 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
423 * :py:class:`pyderasn.OctetString`
424 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
425   ``Any``/``BitString``/``OctetString``-s
426
427 When any of those fields is automatically decoded, then ``.defined``
428 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
429 was defined, ``value`` contains corresponding decoded value. For example
430 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
431
432 .. _defines_by_path_ctx:
433
434 defines_by_path context option
435 ______________________________
436
437 Sometimes you either can not or do not want to explicitly set *defines*
438 in the schema. You can dynamically apply those definitions when calling
439 :py:meth:`pyderasn.Obj.decode` method.
440
441 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
442 value must be sequence of following tuples::
443
444     (decode_path, defines)
445
446 where ``decode_path`` is a tuple holding so-called decode path to the
447 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
448 ``defines``, holding exactly the same value as accepted in its
449 :ref:`keyword argument <defines>`.
450
451 For example, again for CMS, you want to automatically decode
452 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
453 structures it may hold. Also, automatically decode ``controlSequence``
454 of ``PKIResponse``::
455
456     content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
457         (
458             ("contentType",),
459             ((("content",), {id_signedData: SignedData()}),),
460         ),
461         (
462             (
463                 "content",
464                 DecodePathDefBy(id_signedData),
465                 "encapContentInfo",
466                 "eContentType",
467             ),
468             ((("eContent",), {
469                 id_cct_PKIData: PKIData(),
470                 id_cct_PKIResponse: PKIResponse(),
471             })),
472         ),
473         (
474             (
475                 "content",
476                 DecodePathDefBy(id_signedData),
477                 "encapContentInfo",
478                 "eContent",
479                 DecodePathDefBy(id_cct_PKIResponse),
480                 "controlSequence",
481                 any,
482                 "attrType",
483             ),
484             ((("attrValues",), {
485                 id_cmc_recipientNonce: RecipientNonce(),
486                 id_cmc_senderNonce: SenderNonce(),
487                 id_cmc_statusInfoV2: CMCStatusInfoV2(),
488                 id_cmc_transactionId: TransactionId(),
489             })),
490         ),
491     )})
492
493 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
494 First function is useful for path construction when some automatic
495 decoding is already done. ``any`` means literally any value it meet --
496 useful for SEQUENCE/SET OF-s.
497
498 .. _bered_ctx:
499
500 BER encoding
501 ------------
502
503 By default PyDERASN accepts only DER encoded data. By default it encodes
504 to DER. But you can optionally enable BER decoding with setting
505 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
506 constructed primitive types should be parsed successfully.
507
508 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
509   attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
510   STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
511   ``UTCTime``, ``GeneralizedTime`` can contain it.
512 * If object has an indefinite length encoding, then its ``lenindef``
513   attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
514   ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
515   contain it.
516 * If object has an indefinite length encoded explicit tag, then
517   ``expl_lenindef`` is set to True.
518 * If object has either any of BER-related encoding (explicit tag
519   indefinite length, object's indefinite length, BER-encoding) or any
520   underlying component has that kind of encoding, then ``bered``
521   attribute is set to True. For example SignedData CMS can have
522   ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
523   ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
524
525 EOC (end-of-contents) token's length is taken in advance in object's
526 value length.
527
528 .. _allow_expl_oob_ctx:
529
530 Allow explicit tag out-of-bound
531 -------------------------------
532
533 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
534 one value, more than one object. If you set ``allow_expl_oob`` context
535 option to True, then no error will be raised and that invalid encoding
536 will be silently further processed. But pay attention that offsets and
537 lengths will be invalid in that case.
538
539 .. warning::
540
541    This option should be used only for skipping some decode errors, just
542    to see the decoded structure somehow.
543
544 .. _streaming:
545
546 Streaming and dealing with huge structures
547 ------------------------------------------
548
549 .. _evgen_mode:
550
551 evgen mode
552 __________
553
554 ASN.1 structures can be huge, they can hold millions of objects inside
555 (for example Certificate Revocation Lists (CRL), holding revocation
556 state for every previously issued X.509 certificate). CACert.org's 8 MiB
557 CRL file takes more than half a gigabyte of memory to hold the decoded
558 structure.
559
560 If you just simply want to check the signature over the ``tbsCertList``,
561 you can create specialized schema with that field represented as
562 OctetString for example::
563
564     class TBSCertListFast(Sequence):
565         schema = (
566             [...]
567             ("revokedCertificates", OctetString(
568                 impl=SequenceOf.tag_default,
569                 optional=True,
570             )),
571             [...]
572         )
573
574 This allows you to quickly decode a few fields and check the signature
575 over the ``tbsCertList`` bytes.
576
577 But how can you get all certificate's serial number from it, after you
578 trust that CRL after signature validation? You can use so called
579 ``evgen`` (event generation) mode, to catch the events/facts of some
580 successful object decoding. Let's use command line capabilities::
581
582     $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
583          10     [1,1,   1]   . . version: Version INTEGER v2 (01) OPTIONAL
584          15     [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
585          26     [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL
586          13     [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE
587          34     [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
588          39     [0,0,   9]   . . . . . . value: [UNIV 19] AttributeValue ANY
589          32     [1,1,  14]   . . . . . 0: AttributeTypeAndValue SEQUENCE
590          30     [1,1,  16]   . . . . 0: RelativeDistinguishedName SET OF
591     [...]
592         188     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
593         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
594         191     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
595         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
596         186     [1,1,  18]   . . . 0: RevokedCertificate SEQUENCE
597         208     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
598         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
599         211     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
600         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
601         206     [1,1,  18]   . . . 1: RevokedCertificate SEQUENCE
602     [...]
603     9144992     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
604     9144992     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
605     9144985     [1,1,  20]   . . . 415755: RevokedCertificate SEQUENCE
606       181     [1,4,9144821]   . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
607         5     [1,4,9144997]   . tbsCertList: TBSCertList SEQUENCE
608     9145009     [1,1,   9]   . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
609     9145020     [0,0,   2]   . . parameters: [UNIV 5] ANY OPTIONAL
610     9145007     [1,1,  13]   . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
611     9145022     [1,3, 513]   . signatureValue: BIT STRING 4096 bits
612         0     [1,4,9145534]  CertificateList SEQUENCE
613
614 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
615 decodes underlying values. It can not tell if SEQUENCE is decoded, so
616 the event of the upper level SEQUENCE is the last one we see.
617 ``version`` field is just a single INTEGER -- it is decoded and event is
618 fired immediately. Then we see that ``algorithm`` and ``parameters``
619 fields are decoded and only after them the ``signature`` SEQUENCE is
620 fired as a successfully decoded. There are 4 events for each revoked
621 certificate entry in that CRL: ``userCertificate`` serial number,
622 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
623 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
624
625 We can do that in our ordinary Python code and understand where we are
626 by looking at deterministically generated decode paths (do not forget
627 about useful ``--print-decode-path`` CLI option). We must use
628 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
629 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
630 obj, tail)`` tuples::
631
632     for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
633         if (
634             len(decode_path) == 4 and
635             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
636             decode_path[3] == "userCertificate"
637         ):
638             print("serial number:", int(obj))
639
640 Virtually it does not take any memory except at least needed for single
641 object storage. You can easily use that mode to determine required
642 object ``.offset`` and ``.*len`` to be able to decode it separately, or
643 maybe verify signature upon it just by taking bytes by ``.offset`` and
644 ``.tlvlen``.
645
646 .. _evgen_mode_upto_ctx:
647
648 evgen_mode_upto
649 _______________
650
651 There is full ability to get any kind of data from the CRL in the
652 example above. However it is not too convenient to get the whole
653 ``RevokedCertificate`` structure, that is pretty lightweight and one may
654 do not want to disassemble it. You can use ``evgen_mode_upto``
655 :ref:`ctx <ctx>` option that semantically equals to
656 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
657 mapped to any non-None value. If specified decode path is met, then any
658 subsequent objects won't be decoded in evgen mode. That allows us to
659 parse the CRL above with fully assembled ``RevokedCertificate``::
660
661     for decode_path, obj, _ in CertificateList().decode_evgen(
662         crl_raw,
663         ctx={"evgen_mode_upto": (
664             (("tbsCertList", "revokedCertificates", any), True),
665         )},
666     ):
667         if (
668             len(decode_path) == 3 and
669             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
670         ):
671             print("serial number:", int(obj["userCertificate"]))
672
673 .. note::
674
675    SEQUENCE/SET values with DEFAULT specified are automatically decoded
676    without evgen mode.
677
678 .. _mmap:
679
680 mmap-ed file
681 ____________
682
683 POSIX compliant systems have ``mmap`` syscall, giving ability to work
684 the memory mapped file. You can deal with the file like it was an
685 ordinary binary string, allowing you not to load it to the memory first.
686 Also you can use them as an input for OCTET STRING, taking no Python
687 memory for their storage.
688
689 There is convenient :py:func:`pyderasn.file_mmaped` function that
690 creates read-only memoryview on the file contents::
691
692     with open("huge", "rb") as fd:
693         raw = file_mmaped(fd)
694         obj = Something.decode(raw)
695
696 .. warning::
697
698    mmap maps the **whole** file. So it plays no role if you seek-ed it
699    before. Take the slice of the resulting memoryview with required
700    offset instead.
701
702 .. note::
703
704    If you use ZFS as underlying storage, then pay attention that
705    currently most platforms does not deal good with ZFS ARC and ordinary
706    page cache used for mmaps. It can take twice the necessary size in
707    the memory: both in page cache and ZFS ARC.
708
709 CER encoding
710 ____________
711
712 We can parse any kind of data now, but how can we produce files
713 streamingly, without storing their encoded representation in memory?
714 SEQUENCE by default encodes in memory all its values, joins them in huge
715 binary string, just to know the exact size of SEQUENCE's value for
716 encoding it in TLV. DER requires you to know all exact sizes of the
717 objects.
718
719 You can use CER encoding mode, that slightly differs from the DER, but
720 does not require exact sizes knowledge, allowing streaming encoding
721 directly to some writer/buffer. Just use
722 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
723 encoded data will flow::
724
725     with open("result", "wb") as fd:
726         obj.encode_cer(fd.write)
727
728 ::
729
730     buf = io.BytesIO()
731     obj.encode_cer(buf.write)
732
733 If you do not want to create in-memory buffer every time, then you can
734 use :py:func:`pyderasn.encode_cer` function::
735
736     data = encode_cer(obj)
737
738 Remember that CER is **not valid** DER in most cases, so you **have to**
739 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
740 decoding. Also currently there is **no** validation that provided CER is
741 valid one -- you are sure that it has only valid BER encoding.
742
743 .. warning::
744
745    SET OF values can not be streamingly encoded, because they are
746    required to be sorted byte-by-byte. Big SET OF values still will take
747    much memory. Use neither SET nor SET OF values, as modern ASN.1
748    also recommends too.
749
750 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
751 OCTET STRINGs! They will be streamingly copied from underlying file to
752 the buffer using 1 KB chunks.
753
754 Some structures require that some of the elements have to be forcefully
755 DER encoded. For example ``SignedData`` CMS requires you to encode
756 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
757 encode everything else in BER. You can tell any of the structures to be
758 forcefully encoded in DER during CER encoding, by specifying
759 ``der_forced=True`` attribute::
760
761     class Certificate(Sequence):
762         schema = (...)
763         der_forced = True
764
765     class SignedAttributes(SetOf):
766         schema = Attribute()
767         bounds = (1, float("+inf"))
768         der_forced = True
769
770 .. _agg_octet_string:
771
772 agg_octet_string
773 ________________
774
775 In most cases, huge quantity of binary data is stored as OCTET STRING.
776 CER encoding splits it on 1 KB chunks. BER allows splitting on various
777 levels of chunks inclusion::
778
779     SOME STRING[CONSTRUCTED]
780         OCTET STRING[CONSTRUCTED]
781             OCTET STRING[PRIMITIVE]
782                 DATA CHUNK
783             OCTET STRING[PRIMITIVE]
784                 DATA CHUNK
785             OCTET STRING[PRIMITIVE]
786                 DATA CHUNK
787         OCTET STRING[PRIMITIVE]
788             DATA CHUNK
789         OCTET STRING[CONSTRUCTED]
790             OCTET STRING[PRIMITIVE]
791                 DATA CHUNK
792             OCTET STRING[PRIMITIVE]
793                 DATA CHUNK
794         OCTET STRING[CONSTRUCTED]
795             OCTET STRING[CONSTRUCTED]
796                 OCTET STRING[PRIMITIVE]
797                     DATA CHUNK
798
799 You can not just take the offset and some ``.vlen`` of the STRING and
800 treat it as the payload. If you decode it without
801 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
802 and ``bytes()`` will give the whole payload contents.
803
804 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
805 small memory footprint. There is convenient
806 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
807 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
808 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
809 SHA512 digest of its ``eContent``::
810
811     fd = open("data.p7m", "rb")
812     raw = file_mmaped(fd)
813     ctx = {"bered": True}
814     for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
815         if decode_path == ("content",):
816             content = obj
817             break
818     else:
819         raise ValueError("no content found")
820     hasher_state = sha512()
821     def hasher(data):
822         hasher_state.update(data)
823         return len(data)
824     evgens = SignedData().decode_evgen(
825         raw[content.offset:],
826         offset=content.offset,
827         ctx=ctx,
828     )
829     agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
830     fd.close()
831     digest = hasher_state.digest()
832
833 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
834 copy the payload (without BER/CER encoding interleaved overhead) in it.
835 Virtually it won't take memory more than for keeping small structures
836 and 1 KB binary chunks.
837
838 .. _seqof-iterators:
839
840 SEQUENCE OF iterators
841 _____________________
842
843 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
844 classes. The only difference with providing the full list of objects, is
845 that type and bounds checking is done during encoding process. Also
846 sequence's value will be emptied after encoding, forcing you to set its
847 value again.
848
849 This is very useful when you have to create some huge objects, like
850 CRLs, with thousands and millions of entities inside. You can write the
851 generator taking necessary data from the database and giving the
852 ``RevokedCertificate`` objects. Only binary representation of that
853 objects will take memory during DER encoding.
854
855 2-pass DER encoding
856 -------------------
857
858 There is ability to do 2-pass encoding to DER, writing results directly
859 to specified writer (buffer, file, whatever). It could be 1.5+ times
860 slower than ordinary encoding, but it takes little memory for 1st pass
861 state storing. For example, 1st pass state for CACert.org's CRL with
862 ~416K of certificate entries takes nearly 3.5 MB of memory.
863 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
864 nearly 0.5 KB of memory.
865
866 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
867 iterators <seqof-iterators>` and write directly to opened file, then
868 there is very small memory footprint.
869
870 1st pass traverses through all the objects of the structure and returns
871 the size of DER encoded structure, together with 1st pass state object.
872 That state contains precalculated lengths for various objects inside the
873 structure.
874
875 ::
876
877     fulllen, state = obj.encode1st()
878
879 2nd pass takes the writer and 1st pass state. It traverses through all
880 the objects again, but writes their encoded representation to the writer.
881
882 ::
883
884     with open("result", "wb") as fd:
885         obj.encode2nd(fd.write, iter(state))
886
887 .. warning::
888
889    You **MUST NOT** use 1st pass state if anything is changed in the
890    objects. It is intended to be used immediately after 1st pass is
891    done!
892
893 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
894 have to reinitialize the values after the 1st pass. And you **have to**
895 be sure that the iterator gives exactly the same values as previously.
896 Yes, you have to run your iterator twice -- because this is two pass
897 encoding mode.
898
899 If you want to encode to the memory, then you can use convenient
900 :py:func:`pyderasn.encode2pass` helper.
901
902 .. _browser:
903
904 ASN.1 browser
905 -------------
906 .. autofunction:: pyderasn.browse
907
908 Base Obj
909 --------
910 .. autoclass:: pyderasn.Obj
911    :members:
912
913 Primitive types
914 ---------------
915
916 Boolean
917 _______
918 .. autoclass:: pyderasn.Boolean
919    :members: __init__
920
921 Integer
922 _______
923 .. autoclass:: pyderasn.Integer
924    :members: __init__, named, tohex
925
926 BitString
927 _________
928 .. autoclass:: pyderasn.BitString
929    :members: __init__, bit_len, named
930
931 OctetString
932 ___________
933 .. autoclass:: pyderasn.OctetString
934    :members: __init__
935
936 Null
937 ____
938 .. autoclass:: pyderasn.Null
939    :members: __init__
940
941 ObjectIdentifier
942 ________________
943 .. autoclass:: pyderasn.ObjectIdentifier
944    :members: __init__
945
946 Enumerated
947 __________
948 .. autoclass:: pyderasn.Enumerated
949
950 CommonString
951 ____________
952 .. autoclass:: pyderasn.CommonString
953
954 NumericString
955 _____________
956 .. autoclass:: pyderasn.NumericString
957
958 PrintableString
959 _______________
960 .. autoclass:: pyderasn.PrintableString
961    :members: __init__, allow_asterisk, allow_ampersand
962
963 IA5String
964 _________
965 .. autoclass:: pyderasn.IA5String
966
967 VisibleString
968 _____________
969 .. autoclass:: pyderasn.VisibleString
970
971 UTCTime
972 _______
973 .. autoclass:: pyderasn.UTCTime
974    :members: __init__, todatetime
975
976 GeneralizedTime
977 _______________
978 .. autoclass:: pyderasn.GeneralizedTime
979    :members: __init__, todatetime
980
981 Special types
982 -------------
983
984 Choice
985 ______
986 .. autoclass:: pyderasn.Choice
987    :members: __init__, choice, value
988
989 PrimitiveTypes
990 ______________
991 .. autoclass:: PrimitiveTypes
992
993 Any
994 ___
995 .. autoclass:: pyderasn.Any
996    :members: __init__
997
998 Constructed types
999 -----------------
1000
1001 Sequence
1002 ________
1003 .. autoclass:: pyderasn.Sequence
1004    :members: __init__
1005
1006 Set
1007 ___
1008 .. autoclass:: pyderasn.Set
1009    :members: __init__
1010
1011 SequenceOf
1012 __________
1013 .. autoclass:: pyderasn.SequenceOf
1014    :members: __init__
1015
1016 SetOf
1017 _____
1018 .. autoclass:: pyderasn.SetOf
1019    :members: __init__
1020
1021 Various
1022 -------
1023
1024 .. autofunction:: pyderasn.abs_decode_path
1025 .. autofunction:: pyderasn.agg_octet_string
1026 .. autofunction:: pyderasn.ascii_visualize
1027 .. autofunction:: pyderasn.colonize_hex
1028 .. autofunction:: pyderasn.encode2pass
1029 .. autofunction:: pyderasn.encode_cer
1030 .. autofunction:: pyderasn.file_mmaped
1031 .. autofunction:: pyderasn.hexenc
1032 .. autofunction:: pyderasn.hexdec
1033 .. autofunction:: pyderasn.hexdump
1034 .. autofunction:: pyderasn.tag_encode
1035 .. autofunction:: pyderasn.tag_decode
1036 .. autofunction:: pyderasn.tag_ctxp
1037 .. autofunction:: pyderasn.tag_ctxc
1038 .. autoclass:: pyderasn.DecodeError
1039    :members: __init__
1040 .. autoclass:: pyderasn.NotEnoughData
1041 .. autoclass:: pyderasn.ExceedingData
1042 .. autoclass:: pyderasn.LenIndefForm
1043 .. autoclass:: pyderasn.TagMismatch
1044 .. autoclass:: pyderasn.InvalidLength
1045 .. autoclass:: pyderasn.InvalidOID
1046 .. autoclass:: pyderasn.ObjUnknown
1047 .. autoclass:: pyderasn.ObjNotReady
1048 .. autoclass:: pyderasn.InvalidValueType
1049 .. autoclass:: pyderasn.BoundsError
1050
1051 .. _cmdline:
1052
1053 Command-line usage
1054 ------------------
1055
1056 You can decode DER/BER files using command line abilities::
1057
1058     $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1059
1060 If there is no schema for your file, then you can try parsing it without,
1061 but of course IMPLICIT tags will often make it impossible. But result is
1062 good enough for the certificate above::
1063
1064     $ python -m pyderasn path/to/file
1065         0   [1,3,1604]  . >: SEQUENCE OF
1066         4   [1,3,1453]  . . >: SEQUENCE OF
1067         8   [0,0,   5]  . . . . >: [0] ANY
1068                         . . . . . A0:03:02:01:02
1069        13   [1,1,   3]  . . . . >: INTEGER 61595
1070        18   [1,1,  13]  . . . . >: SEQUENCE OF
1071        20   [1,1,   9]  . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1072        31   [1,1,   0]  . . . . . . >: NULL
1073        33   [1,3, 274]  . . . . >: SEQUENCE OF
1074        37   [1,1,  11]  . . . . . . >: SET OF
1075        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1076        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1077        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1078     [...]
1079      1409   [1,1,  50]  . . . . . . >: SEQUENCE OF
1080      1411   [1,1,   8]  . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1081      1421   [1,1,  38]  . . . . . . . . >: OCTET STRING 38 bytes
1082                         . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1083                         . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1084                         . . . . . . . . . 61:2E:63:6F:6D:2F
1085      1461   [1,1,  13]  . . >: SEQUENCE OF
1086      1463   [1,1,   9]  . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1087      1474   [1,1,   0]  . . . . >: NULL
1088      1476   [1,2, 129]  . . >: BIT STRING 1024 bits
1089                         . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1090                         . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1091     [...]
1092
1093 Human readable OIDs
1094 ___________________
1095
1096 If you have got dictionaries with ObjectIdentifiers, like example one
1097 from ``tests/test_crts.py``::
1098
1099     stroid2name = {
1100         "1.2.840.113549.1.1.1": "id-rsaEncryption",
1101         "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1102         [...]
1103         "2.5.4.10": "id-at-organizationName",
1104         "2.5.4.11": "id-at-organizationalUnitName",
1105     }
1106
1107 then you can pass it to pretty printer to see human readable OIDs::
1108
1109     $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1110     [...]
1111        37   [1,1,  11]  . . . . . . >: SET OF
1112        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1113        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1114        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1115        50   [1,1,  18]  . . . . . . >: SET OF
1116        52   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1117        54   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1118        59   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1119        70   [1,1,  18]  . . . . . . >: SET OF
1120        72   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1121        74   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1122        79   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1123     [...]
1124
1125 Decode paths
1126 ____________
1127
1128 Each decoded element has so-called decode path: sequence of structure
1129 names it is passing during the decode process. Each element has its own
1130 unique path inside the whole ASN.1 tree. You can print it out with
1131 ``--print-decode-path`` option::
1132
1133     $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1134        0    [1,3,1604]  Certificate SEQUENCE []
1135        4    [1,3,1453]   . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1136       10-2  [1,1,   1]   . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1137       13    [1,1,   3]   . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1138       18    [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1139       20    [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1140       31    [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1141                          . . . . 05:00
1142       33    [0,0, 278]   . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1143       33    [1,3, 274]   . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1144       37    [1,1,  11]   . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1145       39    [1,1,   9]   . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1146       41    [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1147       46    [0,0,   4]   . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1148                          . . . . . . . 13:02:45:53
1149       46    [1,1,   2]   . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1150     [...]
1151
1152 Now you can print only the specified tree, for example signature algorithm::
1153
1154     $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1155       18    [1,1,  13]  AlgorithmIdentifier SEQUENCE
1156       20    [1,1,   9]   . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1157       31    [0,0,   2]   . parameters: [UNIV 5] ANY OPTIONAL
1158                          . . 05:00
1159 """
1160
1161 from array import array
1162 from collections import namedtuple
1163 from collections import OrderedDict
1164 from copy import copy
1165 from datetime import datetime
1166 from datetime import timedelta
1167 from io import BytesIO
1168 from math import ceil
1169 from operator import attrgetter
1170 from string import ascii_letters
1171 from string import digits
1172 from struct import Struct as struct_Struct
1173 from sys import maxsize as sys_maxsize
1174 from sys import version_info
1175 from unicodedata import category as unicat
1176
1177 try:
1178     from termcolor import colored
1179 except ImportError:  # pragma: no cover
1180     def colored(what, *args, **kwargs):
1181         return what
1182
1183 __version__ = "9.0"
1184
1185 __all__ = (
1186     "agg_octet_string",
1187     "Any",
1188     "BitString",
1189     "BMPString",
1190     "Boolean",
1191     "BoundsError",
1192     "Choice",
1193     "colonize_hex",
1194     "DecodeError",
1195     "DecodePathDefBy",
1196     "encode2pass",
1197     "encode_cer",
1198     "Enumerated",
1199     "ExceedingData",
1200     "file_mmaped",
1201     "GeneralizedTime",
1202     "GeneralString",
1203     "GraphicString",
1204     "hexdec",
1205     "hexenc",
1206     "IA5String",
1207     "Integer",
1208     "InvalidLength",
1209     "InvalidOID",
1210     "InvalidValueType",
1211     "ISO646String",
1212     "LenIndefForm",
1213     "NotEnoughData",
1214     "Null",
1215     "NumericString",
1216     "obj_by_path",
1217     "ObjectIdentifier",
1218     "ObjNotReady",
1219     "ObjUnknown",
1220     "OctetString",
1221     "PrimitiveTypes",
1222     "PrintableString",
1223     "Sequence",
1224     "SequenceOf",
1225     "Set",
1226     "SetOf",
1227     "T61String",
1228     "tag_ctxc",
1229     "tag_ctxp",
1230     "tag_decode",
1231     "TagClassApplication",
1232     "TagClassContext",
1233     "TagClassPrivate",
1234     "TagClassUniversal",
1235     "TagFormConstructed",
1236     "TagFormPrimitive",
1237     "TagMismatch",
1238     "TeletexString",
1239     "UniversalString",
1240     "UTCTime",
1241     "UTF8String",
1242     "VideotexString",
1243     "VisibleString",
1244 )
1245
1246 TagClassUniversal = 0
1247 TagClassApplication = 1 << 6
1248 TagClassContext = 1 << 7
1249 TagClassPrivate = 1 << 6 | 1 << 7
1250 TagFormPrimitive = 0
1251 TagFormConstructed = 1 << 5
1252 TagClassReprs = {
1253     TagClassContext: "",
1254     TagClassApplication: "APPLICATION ",
1255     TagClassPrivate: "PRIVATE ",
1256     TagClassUniversal: "UNIV ",
1257 }
1258 EOC = b"\x00\x00"
1259 EOC_LEN = len(EOC)
1260 LENINDEF = b"\x80"  # length indefinite mark
1261 LENINDEF_PP_CHAR = "∞"
1262 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1263 SET01 = frozenset("01")
1264 DECIMALS = frozenset(digits)
1265 DECIMAL_SIGNS = ".,"
1266 NEXT_ATTR_NAME = "__next__"
1267
1268
1269 def file_mmaped(fd):
1270     """Make mmap-ed memoryview for reading from file
1271
1272     :param fd: file object
1273     :returns: memoryview over read-only mmap-ing of the whole file
1274
1275     .. warning::
1276
1277        It does not work under Windows.
1278     """
1279     import mmap
1280     return memoryview(mmap.mmap(fd.fileno(), length=0, prot=mmap.PROT_READ))
1281
1282
1283 def pureint(value):
1284     if not set(value) <= DECIMALS:
1285         raise ValueError("non-pure integer")
1286     return int(value)
1287
1288
1289 def fractions2float(fractions_raw):
1290     pureint(fractions_raw)
1291     return float("0." + fractions_raw)
1292
1293
1294 def get_def_by_path(defines_by_path, sub_decode_path):
1295     """Get define by decode path
1296     """
1297     for path, define in defines_by_path:
1298         if len(path) != len(sub_decode_path):
1299             continue
1300         for p1, p2 in zip(path, sub_decode_path):
1301             if (p1 is not any) and (p1 != p2):
1302                 break
1303         else:
1304             return define
1305
1306
1307 ########################################################################
1308 # Errors
1309 ########################################################################
1310
1311 class ASN1Error(ValueError):
1312     pass
1313
1314
1315 class DecodeError(ASN1Error):
1316     def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1317         """
1318         :param str msg: reason of decode failing
1319         :param klass: optional exact DecodeError inherited class (like
1320                       :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1321                       :py:exc:`InvalidLength`)
1322         :param decode_path: tuple of strings. It contains human
1323                             readable names of the fields through which
1324                             decoding process has passed
1325         :param int offset: binary offset where failure happened
1326         """
1327         super().__init__()
1328         self.msg = msg
1329         self.klass = klass
1330         self.decode_path = decode_path
1331         self.offset = offset
1332
1333     def __str__(self):
1334         return " ".join(
1335             c for c in (
1336                 "" if self.klass is None else self.klass.__name__,
1337                 (
1338                     ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1339                     if len(self.decode_path) > 0 else ""
1340                 ),
1341                 ("(at %d)" % self.offset) if self.offset > 0 else "",
1342                 self.msg,
1343             ) if c != ""
1344         )
1345
1346     def __repr__(self):
1347         return "%s(%s)" % (self.__class__.__name__, self)
1348
1349
1350 class NotEnoughData(DecodeError):
1351     pass
1352
1353
1354 class ExceedingData(ASN1Error):
1355     def __init__(self, nbytes):
1356         super().__init__()
1357         self.nbytes = nbytes
1358
1359     def __str__(self):
1360         return "%d trailing bytes" % self.nbytes
1361
1362     def __repr__(self):
1363         return "%s(%s)" % (self.__class__.__name__, self)
1364
1365
1366 class LenIndefForm(DecodeError):
1367     pass
1368
1369
1370 class TagMismatch(DecodeError):
1371     pass
1372
1373
1374 class InvalidLength(DecodeError):
1375     pass
1376
1377
1378 class InvalidOID(DecodeError):
1379     pass
1380
1381
1382 class ObjUnknown(ASN1Error):
1383     def __init__(self, name):
1384         super().__init__()
1385         self.name = name
1386
1387     def __str__(self):
1388         return "object is unknown: %s" % self.name
1389
1390     def __repr__(self):
1391         return "%s(%s)" % (self.__class__.__name__, self)
1392
1393
1394 class ObjNotReady(ASN1Error):
1395     def __init__(self, name):
1396         super().__init__()
1397         self.name = name
1398
1399     def __str__(self):
1400         return "object is not ready: %s" % self.name
1401
1402     def __repr__(self):
1403         return "%s(%s)" % (self.__class__.__name__, self)
1404
1405
1406 class InvalidValueType(ASN1Error):
1407     def __init__(self, expected_types):
1408         super().__init__()
1409         self.expected_types = expected_types
1410
1411     def __str__(self):
1412         return "invalid value type, expected: %s" % ", ".join(
1413             [repr(t) for t in self.expected_types]
1414         )
1415
1416     def __repr__(self):
1417         return "%s(%s)" % (self.__class__.__name__, self)
1418
1419
1420 class BoundsError(ASN1Error):
1421     def __init__(self, bound_min, value, bound_max):
1422         super().__init__()
1423         self.bound_min = bound_min
1424         self.value = value
1425         self.bound_max = bound_max
1426
1427     def __str__(self):
1428         return "unsatisfied bounds: %s <= %s <= %s" % (
1429             self.bound_min,
1430             self.value,
1431             self.bound_max,
1432         )
1433
1434     def __repr__(self):
1435         return "%s(%s)" % (self.__class__.__name__, self)
1436
1437
1438 ########################################################################
1439 # Basic coders
1440 ########################################################################
1441
1442 def hexdec(data):
1443     """Binary data to hexadecimal string convert
1444     """
1445     return bytes.fromhex(data)
1446
1447
1448 def hexenc(data):
1449     """Hexadecimal string to binary data convert
1450     """
1451     return data.hex()
1452
1453
1454 def int_bytes_len(num, byte_len=8):
1455     if num == 0:
1456         return 1
1457     return int(ceil(float(num.bit_length()) / byte_len))
1458
1459
1460 def zero_ended_encode(num):
1461     octets = bytearray(int_bytes_len(num, 7))
1462     i = len(octets) - 1
1463     octets[i] = num & 0x7F
1464     num >>= 7
1465     i -= 1
1466     while num > 0:
1467         octets[i] = 0x80 | (num & 0x7F)
1468         num >>= 7
1469         i -= 1
1470     return bytes(octets)
1471
1472
1473 int2byte = struct_Struct(">B").pack
1474
1475
1476 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1477     """Encode tag to binary form
1478
1479     :param int num: tag's number
1480     :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1481                       :py:data:`pyderasn.TagClassContext`,
1482                       :py:data:`pyderasn.TagClassApplication`,
1483                       :py:data:`pyderasn.TagClassPrivate`)
1484     :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1485                      :py:data:`pyderasn.TagFormConstructed`)
1486     """
1487     if num < 31:
1488         # [XX|X|.....]
1489         return int2byte(klass | form | num)
1490     # [XX|X|11111][1.......][1.......] ... [0.......]
1491     return int2byte(klass | form | 31) + zero_ended_encode(num)
1492
1493
1494 def tag_decode(tag):
1495     """Decode tag from binary form
1496
1497     .. warning::
1498
1499        No validation is performed, assuming that it has already passed.
1500
1501     It returns tuple with three integers, as
1502     :py:func:`pyderasn.tag_encode` accepts.
1503     """
1504     first_octet = tag[0]
1505     klass = first_octet & 0xC0
1506     form = first_octet & 0x20
1507     if first_octet & 0x1F < 0x1F:
1508         return (klass, form, first_octet & 0x1F)
1509     num = 0
1510     for octet in tag[1:]:
1511         num <<= 7
1512         num |= octet & 0x7F
1513     return (klass, form, num)
1514
1515
1516 def tag_ctxp(num):
1517     """Create CONTEXT PRIMITIVE tag
1518     """
1519     return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1520
1521
1522 def tag_ctxc(num):
1523     """Create CONTEXT CONSTRUCTED tag
1524     """
1525     return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1526
1527
1528 def tag_strip(data):
1529     """Take off tag from the data
1530
1531     :returns: (encoded tag, tag length, remaining data)
1532     """
1533     if len(data) == 0:
1534         raise NotEnoughData("no data at all")
1535     if data[0] & 0x1F < 31:
1536         return data[:1], 1, data[1:]
1537     i = 0
1538     while True:
1539         i += 1
1540         if i == len(data):
1541             raise DecodeError("unfinished tag")
1542         if data[i] & 0x80 == 0:
1543             break
1544     if i == 1 and data[1] < 0x1F:
1545         raise DecodeError("unexpected long form")
1546     if i > 1 and data[1] & 0x7F == 0:
1547         raise DecodeError("leading zero byte in tag value")
1548     i += 1
1549     return data[:i], i, data[i:]
1550
1551
1552 def len_encode(l):
1553     if l < 0x80:
1554         return int2byte(l)
1555     octets = bytearray(int_bytes_len(l) + 1)
1556     octets[0] = 0x80 | (len(octets) - 1)
1557     for i in range(len(octets) - 1, 0, -1):
1558         octets[i] = l & 0xFF
1559         l >>= 8
1560     return bytes(octets)
1561
1562
1563 def len_decode(data):
1564     """Decode length
1565
1566     :returns: (decoded length, length's length, remaining data)
1567     :raises LenIndefForm: if indefinite form encoding is met
1568     """
1569     if len(data) == 0:
1570         raise NotEnoughData("no data at all")
1571     first_octet = data[0]
1572     if first_octet & 0x80 == 0:
1573         return first_octet, 1, data[1:]
1574     octets_num = first_octet & 0x7F
1575     if octets_num + 1 > len(data):
1576         raise NotEnoughData("encoded length is longer than data")
1577     if octets_num == 0:
1578         raise LenIndefForm()
1579     if data[1] == 0:
1580         raise DecodeError("leading zeros")
1581     l = 0
1582     for v in data[1:1 + octets_num]:
1583         l = (l << 8) | v
1584     if l <= 127:
1585         raise DecodeError("long form instead of short one")
1586     return l, 1 + octets_num, data[1 + octets_num:]
1587
1588
1589 LEN0 = len_encode(0)
1590 LEN1 = len_encode(1)
1591 LEN1K = len_encode(1000)
1592
1593
1594 def len_size(l):
1595     """How many bytes length field will take
1596     """
1597     if l < 128:
1598         return 1
1599     if l < 256:  # 1 << 8
1600         return 2
1601     if l < 65536:  # 1 << 16
1602         return 3
1603     if l < 16777216:  # 1 << 24
1604         return 4
1605     if l < 4294967296:  # 1 << 32
1606         return 5
1607     if l < 1099511627776:  # 1 << 40
1608         return 6
1609     if l < 281474976710656:  # 1 << 48
1610         return 7
1611     if l < 72057594037927936:  # 1 << 56
1612         return 8
1613     raise OverflowError("too big length")
1614
1615
1616 def write_full(writer, data):
1617     """Fully write provided data
1618
1619     :param writer: must comply with ``io.RawIOBase.write`` behaviour
1620
1621     BytesIO does not guarantee that the whole data will be written at
1622     once. That function write everything provided, raising an error if
1623     ``writer`` returns None.
1624     """
1625     data = memoryview(data)
1626     written = 0
1627     while written != len(data):
1628         n = writer(data[written:])
1629         if n is None:
1630             raise ValueError("can not write to buf")
1631         written += n
1632
1633
1634 # If it is 64-bit system, then use compact 64-bit array of unsigned
1635 # longs. Use an ordinary list with universal integers otherwise, that
1636 # is slower.
1637 if sys_maxsize > 2 ** 32:
1638     def state_2pass_new():
1639         return array("L")
1640 else:
1641     def state_2pass_new():
1642         return []
1643
1644
1645 ########################################################################
1646 # Base class
1647 ########################################################################
1648
1649 class AutoAddSlots(type):
1650     def __new__(cls, name, bases, _dict):
1651         _dict["__slots__"] = _dict.get("__slots__", ())
1652         return super().__new__(cls, name, bases, _dict)
1653
1654
1655 BasicState = namedtuple("BasicState", (
1656     "version",
1657     "tag",
1658     "tag_order",
1659     "expl",
1660     "default",
1661     "optional",
1662     "offset",
1663     "llen",
1664     "vlen",
1665     "expl_lenindef",
1666     "lenindef",
1667     "ber_encoded",
1668 ), **NAMEDTUPLE_KWARGS)
1669
1670
1671 class Obj(metaclass=AutoAddSlots):
1672     """Common ASN.1 object class
1673
1674     All ASN.1 types are inherited from it. It has metaclass that
1675     automatically adds ``__slots__`` to all inherited classes.
1676     """
1677     __slots__ = (
1678         "tag",
1679         "_tag_order",
1680         "_value",
1681         "_expl",
1682         "default",
1683         "optional",
1684         "offset",
1685         "llen",
1686         "vlen",
1687         "expl_lenindef",
1688         "lenindef",
1689         "ber_encoded",
1690     )
1691
1692     def __init__(
1693             self,
1694             impl=None,
1695             expl=None,
1696             default=None,
1697             optional=False,
1698             _decoded=(0, 0, 0),
1699     ):
1700         self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1701         self._expl = getattr(self, "expl", None) if expl is None else expl
1702         if self.tag != self.tag_default and self._expl is not None:
1703             raise ValueError("implicit and explicit tags can not be set simultaneously")
1704         if self.tag is None:
1705             self._tag_order = None
1706         else:
1707             tag_class, _, tag_num = tag_decode(
1708                 self.tag if self._expl is None else self._expl
1709             )
1710             self._tag_order = (tag_class, tag_num)
1711         if default is not None:
1712             optional = True
1713         self.optional = optional
1714         self.offset, self.llen, self.vlen = _decoded
1715         self.default = None
1716         self.expl_lenindef = False
1717         self.lenindef = False
1718         self.ber_encoded = False
1719
1720     @property
1721     def ready(self):  # pragma: no cover
1722         """Is object ready to be encoded?
1723         """
1724         raise NotImplementedError()
1725
1726     def _assert_ready(self):
1727         if not self.ready:
1728             raise ObjNotReady(self.__class__.__name__)
1729
1730     @property
1731     def bered(self):
1732         """Is either object or any elements inside is BER encoded?
1733         """
1734         return self.expl_lenindef or self.lenindef or self.ber_encoded
1735
1736     @property
1737     def decoded(self):
1738         """Is object decoded?
1739         """
1740         return (self.llen + self.vlen) > 0
1741
1742     def __getstate__(self):  # pragma: no cover
1743         """Used for making safe to be mutable pickleable copies
1744         """
1745         raise NotImplementedError()
1746
1747     def __setstate__(self, state):
1748         if state.version != __version__:
1749             raise ValueError("data is pickled by different PyDERASN version")
1750         self.tag = state.tag
1751         self._tag_order = state.tag_order
1752         self._expl = state.expl
1753         self.default = state.default
1754         self.optional = state.optional
1755         self.offset = state.offset
1756         self.llen = state.llen
1757         self.vlen = state.vlen
1758         self.expl_lenindef = state.expl_lenindef
1759         self.lenindef = state.lenindef
1760         self.ber_encoded = state.ber_encoded
1761
1762     @property
1763     def tag_order(self):
1764         """Tag's (class, number) used for DER/CER sorting
1765         """
1766         return self._tag_order
1767
1768     @property
1769     def tag_order_cer(self):
1770         return self.tag_order
1771
1772     @property
1773     def tlen(self):
1774         """.. seealso:: :ref:`decoding`
1775         """
1776         return len(self.tag)
1777
1778     @property
1779     def tlvlen(self):
1780         """.. seealso:: :ref:`decoding`
1781         """
1782         return self.tlen + self.llen + self.vlen
1783
1784     def __str__(self):  # pragma: no cover
1785         return self.__unicode__()
1786
1787     def __ne__(self, their):
1788         return not(self == their)
1789
1790     def __gt__(self, their):  # pragma: no cover
1791         return not(self < their)
1792
1793     def __le__(self, their):  # pragma: no cover
1794         return (self == their) or (self < their)
1795
1796     def __ge__(self, their):  # pragma: no cover
1797         return (self == their) or (self > their)
1798
1799     def _encode(self):  # pragma: no cover
1800         raise NotImplementedError()
1801
1802     def _encode_cer(self, writer):
1803         write_full(writer, self._encode())
1804
1805     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):  # pragma: no cover
1806         yield NotImplemented
1807
1808     def _encode1st(self, state):
1809         raise NotImplementedError()
1810
1811     def _encode2nd(self, writer, state_iter):
1812         raise NotImplementedError()
1813
1814     def encode(self):
1815         """DER encode the structure
1816
1817         :returns: DER representation
1818         """
1819         raw = self._encode()
1820         if self._expl is None:
1821             return raw
1822         return b"".join((self._expl, len_encode(len(raw)), raw))
1823
1824     def encode1st(self, state=None):
1825         """Do the 1st pass of 2-pass encoding
1826
1827         :rtype: (int, array("L"))
1828         :returns: full length of encoded data and precalculated various
1829                   objects lengths
1830         """
1831         if state is None:
1832             state = state_2pass_new()
1833         if self._expl is None:
1834             return self._encode1st(state)
1835         state.append(0)
1836         idx = len(state) - 1
1837         vlen, _ = self._encode1st(state)
1838         state[idx] = vlen
1839         fulllen = len(self._expl) + len_size(vlen) + vlen
1840         return fulllen, state
1841
1842     def encode2nd(self, writer, state_iter):
1843         """Do the 2nd pass of 2-pass encoding
1844
1845         :param writer: must comply with ``io.RawIOBase.write`` behaviour
1846         :param state_iter: iterator over the 1st pass state (``iter(state)``)
1847         """
1848         if self._expl is None:
1849             self._encode2nd(writer, state_iter)
1850         else:
1851             write_full(writer, self._expl + len_encode(next(state_iter)))
1852             self._encode2nd(writer, state_iter)
1853
1854     def encode_cer(self, writer):
1855         """CER encode the structure to specified writer
1856
1857         :param writer: must comply with ``io.RawIOBase.write``
1858                        behaviour. It takes slice to be written and
1859                        returns number of bytes processed. If it returns
1860                        None, then exception will be raised
1861         """
1862         if self._expl is not None:
1863             write_full(writer, self._expl + LENINDEF)
1864         if getattr(self, "der_forced", False):
1865             write_full(writer, self._encode())
1866         else:
1867             self._encode_cer(writer)
1868         if self._expl is not None:
1869             write_full(writer, EOC)
1870
1871     def hexencode(self):
1872         """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1873         """
1874         return hexenc(self.encode())
1875
1876     def decode(
1877             self,
1878             data,
1879             offset=0,
1880             leavemm=False,
1881             decode_path=(),
1882             ctx=None,
1883             tag_only=False,
1884             _ctx_immutable=True,
1885     ):
1886         """Decode the data
1887
1888         :param data: either binary or memoryview
1889         :param int offset: initial data's offset
1890         :param bool leavemm: do we need to leave memoryview of remaining
1891                     data as is, or convert it to bytes otherwise
1892         :param decode_path: current decode path (tuples of strings,
1893                             possibly with DecodePathDefBy) with will be
1894                             the root for all underlying objects
1895         :param ctx: optional :ref:`context <ctx>` governing decoding process
1896         :param bool tag_only: decode only the tag, without length and
1897                               contents (used only in Choice and Set
1898                               structures, trying to determine if tag satisfies
1899                               the schema)
1900         :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1901                                     before using it?
1902         :returns: (Obj, remaining data)
1903
1904         .. seealso:: :ref:`decoding`
1905         """
1906         result = next(self.decode_evgen(
1907             data,
1908             offset,
1909             leavemm,
1910             decode_path,
1911             ctx,
1912             tag_only,
1913             _ctx_immutable,
1914             _evgen_mode=False,
1915         ))
1916         if result is None:
1917             return None
1918         _, obj, tail = result
1919         return obj, tail
1920
1921     def decode_evgen(
1922             self,
1923             data,
1924             offset=0,
1925             leavemm=False,
1926             decode_path=(),
1927             ctx=None,
1928             tag_only=False,
1929             _ctx_immutable=True,
1930             _evgen_mode=True,
1931     ):
1932         """Decode with evgen mode on
1933
1934         That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1935         it returns the generator producing ``(decode_path, obj, tail)``
1936         values.
1937         .. seealso:: :ref:`evgen mode <evgen_mode>`.
1938         """
1939         if ctx is None:
1940             ctx = {}
1941         elif _ctx_immutable:
1942             ctx = copy(ctx)
1943         tlv = memoryview(data)
1944         if (
1945                 _evgen_mode and
1946                 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1947         ):
1948             _evgen_mode = False
1949         if self._expl is None:
1950             for result in self._decode(
1951                     tlv,
1952                     offset=offset,
1953                     decode_path=decode_path,
1954                     ctx=ctx,
1955                     tag_only=tag_only,
1956                     evgen_mode=_evgen_mode,
1957             ):
1958                 if tag_only:
1959                     yield None
1960                     return
1961                 _decode_path, obj, tail = result
1962                 if _decode_path is not decode_path:
1963                     yield result
1964         else:
1965             try:
1966                 t, tlen, lv = tag_strip(tlv)
1967             except DecodeError as err:
1968                 raise err.__class__(
1969                     msg=err.msg,
1970                     klass=self.__class__,
1971                     decode_path=decode_path,
1972                     offset=offset,
1973                 )
1974             if t != self._expl:
1975                 raise TagMismatch(
1976                     klass=self.__class__,
1977                     decode_path=decode_path,
1978                     offset=offset,
1979                 )
1980             try:
1981                 l, llen, v = len_decode(lv)
1982             except LenIndefForm as err:
1983                 if not ctx.get("bered", False):
1984                     raise err.__class__(
1985                         msg=err.msg,
1986                         klass=self.__class__,
1987                         decode_path=decode_path,
1988                         offset=offset,
1989                     )
1990                 llen, v = 1, lv[1:]
1991                 offset += tlen + llen
1992                 for result in self._decode(
1993                         v,
1994                         offset=offset,
1995                         decode_path=decode_path,
1996                         ctx=ctx,
1997                         tag_only=tag_only,
1998                         evgen_mode=_evgen_mode,
1999                 ):
2000                     if tag_only:  # pragma: no cover
2001                         yield None
2002                         return
2003                     _decode_path, obj, tail = result
2004                     if _decode_path is not decode_path:
2005                         yield result
2006                 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2007                 if eoc_expected.tobytes() != EOC:
2008                     raise DecodeError(
2009                         "no EOC",
2010                         klass=self.__class__,
2011                         decode_path=decode_path,
2012                         offset=offset,
2013                     )
2014                 obj.vlen += EOC_LEN
2015                 obj.expl_lenindef = True
2016             except DecodeError as err:
2017                 raise err.__class__(
2018                     msg=err.msg,
2019                     klass=self.__class__,
2020                     decode_path=decode_path,
2021                     offset=offset,
2022                 )
2023             else:
2024                 if l > len(v):
2025                     raise NotEnoughData(
2026                         "encoded length is longer than data",
2027                         klass=self.__class__,
2028                         decode_path=decode_path,
2029                         offset=offset,
2030                     )
2031                 for result in self._decode(
2032                         v,
2033                         offset=offset + tlen + llen,
2034                         decode_path=decode_path,
2035                         ctx=ctx,
2036                         tag_only=tag_only,
2037                         evgen_mode=_evgen_mode,
2038                 ):
2039                     if tag_only:  # pragma: no cover
2040                         yield None
2041                         return
2042                     _decode_path, obj, tail = result
2043                     if _decode_path is not decode_path:
2044                         yield result
2045                 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2046                     raise DecodeError(
2047                         "explicit tag out-of-bound, longer than data",
2048                         klass=self.__class__,
2049                         decode_path=decode_path,
2050                         offset=offset,
2051                     )
2052         yield decode_path, obj, (tail if leavemm else tail.tobytes())
2053
2054     def decod(self, data, offset=0, decode_path=(), ctx=None):
2055         """Decode the data, check that tail is empty
2056
2057         :raises ExceedingData: if tail is not empty
2058
2059         This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2060         (decode without tail) that also checks that there is no
2061         trailing data left.
2062         """
2063         obj, tail = self.decode(
2064             data,
2065             offset=offset,
2066             decode_path=decode_path,
2067             ctx=ctx,
2068             leavemm=True,
2069         )
2070         if len(tail) > 0:
2071             raise ExceedingData(len(tail))
2072         return obj
2073
2074     def hexdecode(self, data, *args, **kwargs):
2075         """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2076         """
2077         return self.decode(hexdec(data), *args, **kwargs)
2078
2079     def hexdecod(self, data, *args, **kwargs):
2080         """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2081         """
2082         return self.decod(hexdec(data), *args, **kwargs)
2083
2084     @property
2085     def expled(self):
2086         """.. seealso:: :ref:`decoding`
2087         """
2088         return self._expl is not None
2089
2090     @property
2091     def expl_tag(self):
2092         """.. seealso:: :ref:`decoding`
2093         """
2094         return self._expl
2095
2096     @property
2097     def expl_tlen(self):
2098         """.. seealso:: :ref:`decoding`
2099         """
2100         return len(self._expl)
2101
2102     @property
2103     def expl_llen(self):
2104         """.. seealso:: :ref:`decoding`
2105         """
2106         if self.expl_lenindef:
2107             return 1
2108         return len(len_encode(self.tlvlen))
2109
2110     @property
2111     def expl_offset(self):
2112         """.. seealso:: :ref:`decoding`
2113         """
2114         return self.offset - self.expl_tlen - self.expl_llen
2115
2116     @property
2117     def expl_vlen(self):
2118         """.. seealso:: :ref:`decoding`
2119         """
2120         return self.tlvlen
2121
2122     @property
2123     def expl_tlvlen(self):
2124         """.. seealso:: :ref:`decoding`
2125         """
2126         return self.expl_tlen + self.expl_llen + self.expl_vlen
2127
2128     @property
2129     def fulloffset(self):
2130         """.. seealso:: :ref:`decoding`
2131         """
2132         return self.expl_offset if self.expled else self.offset
2133
2134     @property
2135     def fulllen(self):
2136         """.. seealso:: :ref:`decoding`
2137         """
2138         return self.expl_tlvlen if self.expled else self.tlvlen
2139
2140     def pps_lenindef(self, decode_path):
2141         if self.lenindef and not (
2142                 getattr(self, "defined", None) is not None and
2143                 self.defined[1].lenindef
2144         ):
2145             yield _pp(
2146                 asn1_type_name="EOC",
2147                 obj_name="",
2148                 decode_path=decode_path,
2149                 offset=(
2150                     self.offset + self.tlvlen -
2151                     (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2152                 ),
2153                 tlen=1,
2154                 llen=1,
2155                 vlen=0,
2156                 ber_encoded=True,
2157                 bered=True,
2158             )
2159         if self.expl_lenindef:
2160             yield _pp(
2161                 asn1_type_name="EOC",
2162                 obj_name="EXPLICIT",
2163                 decode_path=decode_path,
2164                 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2165                 tlen=1,
2166                 llen=1,
2167                 vlen=0,
2168                 ber_encoded=True,
2169                 bered=True,
2170             )
2171
2172
2173 def encode_cer(obj):
2174     """Encode to CER in memory buffer
2175
2176     :returns bytes: memory buffer contents
2177     """
2178     buf = BytesIO()
2179     obj.encode_cer(buf.write)
2180     return buf.getvalue()
2181
2182
2183 def encode2pass(obj):
2184     """Encode (2-pass mode) to DER in memory buffer
2185
2186     :returns bytes: memory buffer contents
2187     """
2188     buf = BytesIO()
2189     _, state = obj.encode1st()
2190     obj.encode2nd(buf.write, iter(state))
2191     return buf.getvalue()
2192
2193
2194 class DecodePathDefBy:
2195     """DEFINED BY representation inside decode path
2196     """
2197     __slots__ = ("defined_by",)
2198
2199     def __init__(self, defined_by):
2200         self.defined_by = defined_by
2201
2202     def __ne__(self, their):
2203         return not(self == their)
2204
2205     def __eq__(self, their):
2206         if not isinstance(their, self.__class__):
2207             return False
2208         return self.defined_by == their.defined_by
2209
2210     def __str__(self):
2211         return "DEFINED BY " + str(self.defined_by)
2212
2213     def __repr__(self):
2214         return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2215
2216
2217 ########################################################################
2218 # Pretty printing
2219 ########################################################################
2220
2221 PP = namedtuple("PP", (
2222     "obj",
2223     "asn1_type_name",
2224     "obj_name",
2225     "decode_path",
2226     "value",
2227     "blob",
2228     "optional",
2229     "default",
2230     "impl",
2231     "expl",
2232     "offset",
2233     "tlen",
2234     "llen",
2235     "vlen",
2236     "expl_offset",
2237     "expl_tlen",
2238     "expl_llen",
2239     "expl_vlen",
2240     "expl_lenindef",
2241     "lenindef",
2242     "ber_encoded",
2243     "bered",
2244 ), **NAMEDTUPLE_KWARGS)
2245
2246
2247 def _pp(
2248         obj=None,
2249         asn1_type_name="unknown",
2250         obj_name="unknown",
2251         decode_path=(),
2252         value=None,
2253         blob=None,
2254         optional=False,
2255         default=False,
2256         impl=None,
2257         expl=None,
2258         offset=0,
2259         tlen=0,
2260         llen=0,
2261         vlen=0,
2262         expl_offset=None,
2263         expl_tlen=None,
2264         expl_llen=None,
2265         expl_vlen=None,
2266         expl_lenindef=False,
2267         lenindef=False,
2268         ber_encoded=False,
2269         bered=False,
2270 ):
2271     return PP(
2272         obj,
2273         asn1_type_name,
2274         obj_name,
2275         decode_path,
2276         value,
2277         blob,
2278         optional,
2279         default,
2280         impl,
2281         expl,
2282         offset,
2283         tlen,
2284         llen,
2285         vlen,
2286         expl_offset,
2287         expl_tlen,
2288         expl_llen,
2289         expl_vlen,
2290         expl_lenindef,
2291         lenindef,
2292         ber_encoded,
2293         bered,
2294     )
2295
2296
2297 def _colourize(what, colour, with_colours, attrs=("bold",)):
2298     return colored(what, colour, attrs=attrs) if with_colours else what
2299
2300
2301 def colonize_hex(hexed):
2302     """Separate hexadecimal string with colons
2303     """
2304     return ":".join(hexed[i:i + 2] for i in range(0, len(hexed), 2))
2305
2306
2307 def find_oid_name(asn1_type_name, oid_maps, value):
2308     if len(oid_maps) > 0 and asn1_type_name == ObjectIdentifier.asn1_type_name:
2309         for oid_map in oid_maps:
2310             oid_name = oid_map.get(value)
2311             if oid_name is not None:
2312                 return oid_name
2313     return None
2314
2315
2316 def pp_console_row(
2317         pp,
2318         oid_maps=(),
2319         with_offsets=False,
2320         with_blob=True,
2321         with_colours=False,
2322         with_decode_path=False,
2323         decode_path_len_decrease=0,
2324 ):
2325     cols = []
2326     if with_offsets:
2327         col = "%5d%s%s" % (
2328             pp.offset,
2329             (
2330                 "  " if pp.expl_offset is None else
2331                 ("-%d" % (pp.offset - pp.expl_offset))
2332             ),
2333             LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2334         )
2335         col = _colourize(col, "red", with_colours, ())
2336         col += _colourize("B", "red", with_colours) if pp.bered else " "
2337         cols.append(col)
2338         col = "[%d,%d,%4d]%s" % (
2339             pp.tlen, pp.llen, pp.vlen,
2340             LENINDEF_PP_CHAR if pp.lenindef else " "
2341         )
2342         col = _colourize(col, "green", with_colours, ())
2343         cols.append(col)
2344     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2345     if decode_path_len > 0:
2346         cols.append(" ." * decode_path_len)
2347         ent = pp.decode_path[-1]
2348         if isinstance(ent, DecodePathDefBy):
2349             cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2350             value = str(ent.defined_by)
2351             oid_name = find_oid_name(ent.defined_by.asn1_type_name, oid_maps, value)
2352             if oid_name is None:
2353                 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2354             else:
2355                 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2356         else:
2357             cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2358     if pp.expl is not None:
2359         klass, _, num = pp.expl
2360         col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2361         cols.append(_colourize(col, "blue", with_colours))
2362     if pp.impl is not None:
2363         klass, _, num = pp.impl
2364         col = "[%s%d]" % (TagClassReprs[klass], num)
2365         cols.append(_colourize(col, "blue", with_colours))
2366     if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2367         cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2368     if pp.ber_encoded:
2369         cols.append(_colourize("BER", "red", with_colours))
2370     cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2371     if pp.value is not None:
2372         value = pp.value
2373         cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2374         oid_name = find_oid_name(pp.asn1_type_name, oid_maps, pp.value)
2375         if oid_name is not None:
2376             cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2377         if pp.asn1_type_name == Integer.asn1_type_name:
2378             cols.append(_colourize(
2379                 "(%s)" % colonize_hex(pp.obj.tohex()), "green", with_colours,
2380             ))
2381     if with_blob:
2382         if pp.blob.__class__ == bytes:
2383             cols.append(hexenc(pp.blob))
2384         elif pp.blob.__class__ == tuple:
2385             cols.append(", ".join(pp.blob))
2386     if pp.optional:
2387         cols.append(_colourize("OPTIONAL", "red", with_colours))
2388     if pp.default:
2389         cols.append(_colourize("DEFAULT", "red", with_colours))
2390     if with_decode_path:
2391         cols.append(_colourize(
2392             "[%s]" % ":".join(str(p) for p in pp.decode_path),
2393             "grey",
2394             with_colours,
2395         ))
2396     return " ".join(cols)
2397
2398
2399 def pp_console_blob(pp, decode_path_len_decrease=0):
2400     cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2401     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2402     if decode_path_len > 0:
2403         cols.append(" ." * (decode_path_len + 1))
2404     if pp.blob.__class__ == bytes:
2405         blob = hexenc(pp.blob).upper()
2406         for i in range(0, len(blob), 32):
2407             chunk = blob[i:i + 32]
2408             yield " ".join(cols + [colonize_hex(chunk)])
2409     elif pp.blob.__class__ == tuple:
2410         yield " ".join(cols + [", ".join(pp.blob)])
2411
2412
2413 def pprint(
2414         obj,
2415         oid_maps=(),
2416         big_blobs=False,
2417         with_colours=False,
2418         with_decode_path=False,
2419         decode_path_only=(),
2420         decode_path=(),
2421 ):
2422     """Pretty print object
2423
2424     :param Obj obj: object you want to pretty print
2425     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
2426                      Its human readable form is printed when OID is met
2427     :param big_blobs: if large binary objects are met (like OctetString
2428                       values), do we need to print them too, on separate
2429                       lines
2430     :param with_colours: colourize output, if ``termcolor`` library
2431                          is available
2432     :param with_decode_path: print decode path
2433     :param decode_path_only: print only that specified decode path
2434     """
2435     def _pprint_pps(pps):
2436         for pp in pps:
2437             if hasattr(pp, "_fields"):
2438                 if (
2439                         decode_path_only != () and
2440                         tuple(
2441                             str(p) for p in pp.decode_path[:len(decode_path_only)]
2442                         ) != decode_path_only
2443                 ):
2444                     continue
2445                 if big_blobs:
2446                     yield pp_console_row(
2447                         pp,
2448                         oid_maps=oid_maps,
2449                         with_offsets=True,
2450                         with_blob=False,
2451                         with_colours=with_colours,
2452                         with_decode_path=with_decode_path,
2453                         decode_path_len_decrease=len(decode_path_only),
2454                     )
2455                     for row in pp_console_blob(
2456                             pp,
2457                             decode_path_len_decrease=len(decode_path_only),
2458                     ):
2459                         yield row
2460                 else:
2461                     yield pp_console_row(
2462                         pp,
2463                         oid_maps=oid_maps,
2464                         with_offsets=True,
2465                         with_blob=True,
2466                         with_colours=with_colours,
2467                         with_decode_path=with_decode_path,
2468                         decode_path_len_decrease=len(decode_path_only),
2469                     )
2470             else:
2471                 for row in _pprint_pps(pp):
2472                     yield row
2473     return "\n".join(_pprint_pps(obj.pps(decode_path)))
2474
2475
2476 ########################################################################
2477 # ASN.1 primitive types
2478 ########################################################################
2479
2480 BooleanState = namedtuple(
2481     "BooleanState",
2482     BasicState._fields + ("value",),
2483     **NAMEDTUPLE_KWARGS
2484 )
2485
2486
2487 class Boolean(Obj):
2488     """``BOOLEAN`` boolean type
2489
2490     >>> b = Boolean(True)
2491     BOOLEAN True
2492     >>> b == Boolean(True)
2493     True
2494     >>> bool(b)
2495     True
2496     """
2497     __slots__ = ()
2498     tag_default = tag_encode(1)
2499     asn1_type_name = "BOOLEAN"
2500
2501     def __init__(
2502             self,
2503             value=None,
2504             impl=None,
2505             expl=None,
2506             default=None,
2507             optional=False,
2508             _decoded=(0, 0, 0),
2509     ):
2510         """
2511         :param value: set the value. Either boolean type, or
2512                       :py:class:`pyderasn.Boolean` object
2513         :param bytes impl: override default tag with ``IMPLICIT`` one
2514         :param bytes expl: override default tag with ``EXPLICIT`` one
2515         :param default: set default value. Type same as in ``value``
2516         :param bool optional: is object ``OPTIONAL`` in sequence
2517         """
2518         super().__init__(impl, expl, default, optional, _decoded)
2519         self._value = None if value is None else self._value_sanitize(value)
2520         if default is not None:
2521             default = self._value_sanitize(default)
2522             self.default = self.__class__(
2523                 value=default,
2524                 impl=self.tag,
2525                 expl=self._expl,
2526             )
2527             if value is None:
2528                 self._value = default
2529
2530     def _value_sanitize(self, value):
2531         if value.__class__ == bool:
2532             return value
2533         if issubclass(value.__class__, Boolean):
2534             return value._value
2535         raise InvalidValueType((self.__class__, bool))
2536
2537     @property
2538     def ready(self):
2539         return self._value is not None
2540
2541     def __getstate__(self):
2542         return BooleanState(
2543             __version__,
2544             self.tag,
2545             self._tag_order,
2546             self._expl,
2547             self.default,
2548             self.optional,
2549             self.offset,
2550             self.llen,
2551             self.vlen,
2552             self.expl_lenindef,
2553             self.lenindef,
2554             self.ber_encoded,
2555             self._value,
2556         )
2557
2558     def __setstate__(self, state):
2559         super().__setstate__(state)
2560         self._value = state.value
2561
2562     def __nonzero__(self):
2563         self._assert_ready()
2564         return self._value
2565
2566     def __bool__(self):
2567         self._assert_ready()
2568         return self._value
2569
2570     def __eq__(self, their):
2571         if their.__class__ == bool:
2572             return self._value == their
2573         if not issubclass(their.__class__, Boolean):
2574             return False
2575         return (
2576             self._value == their._value and
2577             self.tag == their.tag and
2578             self._expl == their._expl
2579         )
2580
2581     def __call__(
2582             self,
2583             value=None,
2584             impl=None,
2585             expl=None,
2586             default=None,
2587             optional=None,
2588     ):
2589         return self.__class__(
2590             value=value,
2591             impl=self.tag if impl is None else impl,
2592             expl=self._expl if expl is None else expl,
2593             default=self.default if default is None else default,
2594             optional=self.optional if optional is None else optional,
2595         )
2596
2597     def _encode(self):
2598         self._assert_ready()
2599         return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2600
2601     def _encode1st(self, state):
2602         return len(self.tag) + 2, state
2603
2604     def _encode2nd(self, writer, state_iter):
2605         self._assert_ready()
2606         write_full(writer, self._encode())
2607
2608     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2609         try:
2610             t, _, lv = tag_strip(tlv)
2611         except DecodeError as err:
2612             raise err.__class__(
2613                 msg=err.msg,
2614                 klass=self.__class__,
2615                 decode_path=decode_path,
2616                 offset=offset,
2617             )
2618         if t != self.tag:
2619             raise TagMismatch(
2620                 klass=self.__class__,
2621                 decode_path=decode_path,
2622                 offset=offset,
2623             )
2624         if tag_only:
2625             yield None
2626             return
2627         try:
2628             l, _, v = len_decode(lv)
2629         except DecodeError as err:
2630             raise err.__class__(
2631                 msg=err.msg,
2632                 klass=self.__class__,
2633                 decode_path=decode_path,
2634                 offset=offset,
2635             )
2636         if l != 1:
2637             raise InvalidLength(
2638                 "Boolean's length must be equal to 1",
2639                 klass=self.__class__,
2640                 decode_path=decode_path,
2641                 offset=offset,
2642             )
2643         if l > len(v):
2644             raise NotEnoughData(
2645                 "encoded length is longer than data",
2646                 klass=self.__class__,
2647                 decode_path=decode_path,
2648                 offset=offset,
2649             )
2650         first_octet = v[0]
2651         ber_encoded = False
2652         if first_octet == 0:
2653             value = False
2654         elif first_octet == 0xFF:
2655             value = True
2656         elif ctx.get("bered", False):
2657             value = True
2658             ber_encoded = True
2659         else:
2660             raise DecodeError(
2661                 "unacceptable Boolean value",
2662                 klass=self.__class__,
2663                 decode_path=decode_path,
2664                 offset=offset,
2665             )
2666         obj = self.__class__(
2667             value=value,
2668             impl=self.tag,
2669             expl=self._expl,
2670             default=self.default,
2671             optional=self.optional,
2672             _decoded=(offset, 1, 1),
2673         )
2674         obj.ber_encoded = ber_encoded
2675         yield decode_path, obj, v[1:]
2676
2677     def __repr__(self):
2678         return pp_console_row(next(self.pps()))
2679
2680     def pps(self, decode_path=()):
2681         yield _pp(
2682             obj=self,
2683             asn1_type_name=self.asn1_type_name,
2684             obj_name=self.__class__.__name__,
2685             decode_path=decode_path,
2686             value=str(self._value) if self.ready else None,
2687             optional=self.optional,
2688             default=self == self.default,
2689             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2690             expl=None if self._expl is None else tag_decode(self._expl),
2691             offset=self.offset,
2692             tlen=self.tlen,
2693             llen=self.llen,
2694             vlen=self.vlen,
2695             expl_offset=self.expl_offset if self.expled else None,
2696             expl_tlen=self.expl_tlen if self.expled else None,
2697             expl_llen=self.expl_llen if self.expled else None,
2698             expl_vlen=self.expl_vlen if self.expled else None,
2699             expl_lenindef=self.expl_lenindef,
2700             ber_encoded=self.ber_encoded,
2701             bered=self.bered,
2702         )
2703         for pp in self.pps_lenindef(decode_path):
2704             yield pp
2705
2706
2707 IntegerState = namedtuple(
2708     "IntegerState",
2709     BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2710     **NAMEDTUPLE_KWARGS
2711 )
2712
2713
2714 class Integer(Obj):
2715     """``INTEGER`` integer type
2716
2717     >>> b = Integer(-123)
2718     INTEGER -123
2719     >>> b == Integer(-123)
2720     True
2721     >>> int(b)
2722     -123
2723
2724     >>> Integer(2, bounds=(1, 3))
2725     INTEGER 2
2726     >>> Integer(5, bounds=(1, 3))
2727     Traceback (most recent call last):
2728     pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2729
2730     ::
2731
2732         class Version(Integer):
2733             schema = (
2734                 ("v1", 0),
2735                 ("v2", 1),
2736                 ("v3", 2),
2737             )
2738
2739     >>> v = Version("v1")
2740     Version INTEGER v1
2741     >>> int(v)
2742     0
2743     >>> v.named
2744     'v1'
2745     >>> v.specs
2746     {'v3': 2, 'v1': 0, 'v2': 1}
2747     """
2748     __slots__ = ("specs", "_bound_min", "_bound_max")
2749     tag_default = tag_encode(2)
2750     asn1_type_name = "INTEGER"
2751
2752     def __init__(
2753             self,
2754             value=None,
2755             bounds=None,
2756             impl=None,
2757             expl=None,
2758             default=None,
2759             optional=False,
2760             _specs=None,
2761             _decoded=(0, 0, 0),
2762     ):
2763         """
2764         :param value: set the value. Either integer type, named value
2765                       (if ``schema`` is specified in the class), or
2766                       :py:class:`pyderasn.Integer` object
2767         :param bounds: set ``(MIN, MAX)`` value constraint.
2768                        (-inf, +inf) by default
2769         :param bytes impl: override default tag with ``IMPLICIT`` one
2770         :param bytes expl: override default tag with ``EXPLICIT`` one
2771         :param default: set default value. Type same as in ``value``
2772         :param bool optional: is object ``OPTIONAL`` in sequence
2773         """
2774         super().__init__(impl, expl, default, optional, _decoded)
2775         self._value = value
2776         specs = getattr(self, "schema", {}) if _specs is None else _specs
2777         self.specs = specs if specs.__class__ == dict else dict(specs)
2778         self._bound_min, self._bound_max = getattr(
2779             self,
2780             "bounds",
2781             (float("-inf"), float("+inf")),
2782         ) if bounds is None else bounds
2783         if value is not None:
2784             self._value = self._value_sanitize(value)
2785         if default is not None:
2786             default = self._value_sanitize(default)
2787             self.default = self.__class__(
2788                 value=default,
2789                 impl=self.tag,
2790                 expl=self._expl,
2791                 _specs=self.specs,
2792             )
2793             if self._value is None:
2794                 self._value = default
2795
2796     def _value_sanitize(self, value):
2797         if isinstance(value, int):
2798             pass
2799         elif issubclass(value.__class__, Integer):
2800             value = value._value
2801         elif value.__class__ == str:
2802             value = self.specs.get(value)
2803             if value is None:
2804                 raise ObjUnknown("integer value: %s" % value)
2805         else:
2806             raise InvalidValueType((self.__class__, int, str))
2807         if not self._bound_min <= value <= self._bound_max:
2808             raise BoundsError(self._bound_min, value, self._bound_max)
2809         return value
2810
2811     @property
2812     def ready(self):
2813         return self._value is not None
2814
2815     def __getstate__(self):
2816         return IntegerState(
2817             __version__,
2818             self.tag,
2819             self._tag_order,
2820             self._expl,
2821             self.default,
2822             self.optional,
2823             self.offset,
2824             self.llen,
2825             self.vlen,
2826             self.expl_lenindef,
2827             self.lenindef,
2828             self.ber_encoded,
2829             self.specs,
2830             self._value,
2831             self._bound_min,
2832             self._bound_max,
2833         )
2834
2835     def __setstate__(self, state):
2836         super().__setstate__(state)
2837         self.specs = state.specs
2838         self._value = state.value
2839         self._bound_min = state.bound_min
2840         self._bound_max = state.bound_max
2841
2842     def __int__(self):
2843         self._assert_ready()
2844         return int(self._value)
2845
2846     def tohex(self):
2847         """Hexadecimal representation
2848
2849         Use :py:func:`pyderasn.colonize_hex` for colonizing it.
2850         """
2851         hex_repr = hex(int(self))[2:].upper()
2852         if len(hex_repr) % 2 != 0:
2853             hex_repr = "0" + hex_repr
2854         return hex_repr
2855
2856     def __hash__(self):
2857         self._assert_ready()
2858         return hash(b"".join((
2859             self.tag,
2860             bytes(self._expl or b""),
2861             str(self._value).encode("ascii"),
2862         )))
2863
2864     def __eq__(self, their):
2865         if isinstance(their, int):
2866             return self._value == their
2867         if not issubclass(their.__class__, Integer):
2868             return False
2869         return (
2870             self._value == their._value and
2871             self.tag == their.tag and
2872             self._expl == their._expl
2873         )
2874
2875     def __lt__(self, their):
2876         return self._value < their._value
2877
2878     @property
2879     def named(self):
2880         """Return named representation (if exists) of the value
2881         """
2882         for name, value in self.specs.items():
2883             if value == self._value:
2884                 return name
2885         return None
2886
2887     def __call__(
2888             self,
2889             value=None,
2890             bounds=None,
2891             impl=None,
2892             expl=None,
2893             default=None,
2894             optional=None,
2895     ):
2896         return self.__class__(
2897             value=value,
2898             bounds=(
2899                 (self._bound_min, self._bound_max)
2900                 if bounds is None else bounds
2901             ),
2902             impl=self.tag if impl is None else impl,
2903             expl=self._expl if expl is None else expl,
2904             default=self.default if default is None else default,
2905             optional=self.optional if optional is None else optional,
2906             _specs=self.specs,
2907         )
2908
2909     def _encode_payload(self):
2910         self._assert_ready()
2911         value = self._value
2912         bytes_len = ceil(value.bit_length() / 8) or 1
2913         while True:
2914             try:
2915                 octets = value.to_bytes(bytes_len, byteorder="big", signed=True)
2916             except OverflowError:
2917                 bytes_len += 1
2918             else:
2919                 break
2920         return octets
2921
2922     def _encode(self):
2923         octets = self._encode_payload()
2924         return b"".join((self.tag, len_encode(len(octets)), octets))
2925
2926     def _encode1st(self, state):
2927         l = len(self._encode_payload())
2928         return len(self.tag) + len_size(l) + l, state
2929
2930     def _encode2nd(self, writer, state_iter):
2931         write_full(writer, self._encode())
2932
2933     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2934         try:
2935             t, _, lv = tag_strip(tlv)
2936         except DecodeError as err:
2937             raise err.__class__(
2938                 msg=err.msg,
2939                 klass=self.__class__,
2940                 decode_path=decode_path,
2941                 offset=offset,
2942             )
2943         if t != self.tag:
2944             raise TagMismatch(
2945                 klass=self.__class__,
2946                 decode_path=decode_path,
2947                 offset=offset,
2948             )
2949         if tag_only:
2950             yield None
2951             return
2952         try:
2953             l, llen, v = len_decode(lv)
2954         except DecodeError as err:
2955             raise err.__class__(
2956                 msg=err.msg,
2957                 klass=self.__class__,
2958                 decode_path=decode_path,
2959                 offset=offset,
2960             )
2961         if l > len(v):
2962             raise NotEnoughData(
2963                 "encoded length is longer than data",
2964                 klass=self.__class__,
2965                 decode_path=decode_path,
2966                 offset=offset,
2967             )
2968         if l == 0:
2969             raise NotEnoughData(
2970                 "zero length",
2971                 klass=self.__class__,
2972                 decode_path=decode_path,
2973                 offset=offset,
2974             )
2975         v, tail = v[:l], v[l:]
2976         first_octet = v[0]
2977         if l > 1:
2978             second_octet = v[1]
2979             if (
2980                     ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
2981                     ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
2982             ):
2983                 raise DecodeError(
2984                     "non normalized integer",
2985                     klass=self.__class__,
2986                     decode_path=decode_path,
2987                     offset=offset,
2988                 )
2989         value = int.from_bytes(v, byteorder="big", signed=True)
2990         try:
2991             obj = self.__class__(
2992                 value=value,
2993                 bounds=(self._bound_min, self._bound_max),
2994                 impl=self.tag,
2995                 expl=self._expl,
2996                 default=self.default,
2997                 optional=self.optional,
2998                 _specs=self.specs,
2999                 _decoded=(offset, llen, l),
3000             )
3001         except BoundsError as err:
3002             raise DecodeError(
3003                 msg=str(err),
3004                 klass=self.__class__,
3005                 decode_path=decode_path,
3006                 offset=offset,
3007             )
3008         yield decode_path, obj, tail
3009
3010     def __repr__(self):
3011         return pp_console_row(next(self.pps()))
3012
3013     def pps(self, decode_path=()):
3014         yield _pp(
3015             obj=self,
3016             asn1_type_name=self.asn1_type_name,
3017             obj_name=self.__class__.__name__,
3018             decode_path=decode_path,
3019             value=(self.named or str(self._value)) if self.ready else None,
3020             optional=self.optional,
3021             default=self == self.default,
3022             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3023             expl=None if self._expl is None else tag_decode(self._expl),
3024             offset=self.offset,
3025             tlen=self.tlen,
3026             llen=self.llen,
3027             vlen=self.vlen,
3028             expl_offset=self.expl_offset if self.expled else None,
3029             expl_tlen=self.expl_tlen if self.expled else None,
3030             expl_llen=self.expl_llen if self.expled else None,
3031             expl_vlen=self.expl_vlen if self.expled else None,
3032             expl_lenindef=self.expl_lenindef,
3033             bered=self.bered,
3034         )
3035         for pp in self.pps_lenindef(decode_path):
3036             yield pp
3037
3038
3039 BitStringState = namedtuple(
3040     "BitStringState",
3041     BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3042     **NAMEDTUPLE_KWARGS
3043 )
3044
3045
3046 class BitString(Obj):
3047     """``BIT STRING`` bit string type
3048
3049     >>> BitString(b"hello world")
3050     BIT STRING 88 bits 68656c6c6f20776f726c64
3051     >>> bytes(b)
3052     b'hello world'
3053     >>> b == b"hello world"
3054     True
3055     >>> b.bit_len
3056     88
3057
3058     >>> BitString("'0A3B5F291CD'H")
3059     BIT STRING 44 bits 0a3b5f291cd0
3060     >>> b = BitString("'010110000000'B")
3061     BIT STRING 12 bits 5800
3062     >>> b.bit_len
3063     12
3064     >>> b[0], b[1], b[2], b[3]
3065     (False, True, False, True)
3066     >>> b[1000]
3067     False
3068     >>> [v for v in b]
3069     [False, True, False, True, True, False, False, False, False, False, False, False]
3070
3071     ::
3072
3073         class KeyUsage(BitString):
3074             schema = (
3075                 ("digitalSignature", 0),
3076                 ("nonRepudiation", 1),
3077                 ("keyEncipherment", 2),
3078             )
3079
3080     >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3081     KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3082     >>> b.named
3083     ['nonRepudiation', 'keyEncipherment']
3084     >>> b.specs
3085     {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3086
3087     .. note::
3088
3089        Pay attention that BIT STRING can be encoded both in primitive
3090        and constructed forms. Decoder always checks constructed form tag
3091        additionally to specified primitive one. If BER decoding is
3092        :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3093        of DER restrictions.
3094     """
3095     __slots__ = ("tag_constructed", "specs", "defined")
3096     tag_default = tag_encode(3)
3097     asn1_type_name = "BIT STRING"
3098
3099     def __init__(
3100             self,
3101             value=None,
3102             impl=None,
3103             expl=None,
3104             default=None,
3105             optional=False,
3106             _specs=None,
3107             _decoded=(0, 0, 0),
3108     ):
3109         """
3110         :param value: set the value. Either binary type, tuple of named
3111                       values (if ``schema`` is specified in the class),
3112                       string in ``'XXX...'B`` form, or
3113                       :py:class:`pyderasn.BitString` object
3114         :param bytes impl: override default tag with ``IMPLICIT`` one
3115         :param bytes expl: override default tag with ``EXPLICIT`` one
3116         :param default: set default value. Type same as in ``value``
3117         :param bool optional: is object ``OPTIONAL`` in sequence
3118         """
3119         super().__init__(impl, expl, default, optional, _decoded)
3120         specs = getattr(self, "schema", {}) if _specs is None else _specs
3121         self.specs = specs if specs.__class__ == dict else dict(specs)
3122         self._value = None if value is None else self._value_sanitize(value)
3123         if default is not None:
3124             default = self._value_sanitize(default)
3125             self.default = self.__class__(
3126                 value=default,
3127                 impl=self.tag,
3128                 expl=self._expl,
3129             )
3130             if value is None:
3131                 self._value = default
3132         self.defined = None
3133         tag_klass, _, tag_num = tag_decode(self.tag)
3134         self.tag_constructed = tag_encode(
3135             klass=tag_klass,
3136             form=TagFormConstructed,
3137             num=tag_num,
3138         )
3139
3140     def _bits2octets(self, bits):
3141         if len(self.specs) > 0:
3142             bits = bits.rstrip("0")
3143         bit_len = len(bits)
3144         bits += "0" * ((8 - (bit_len % 8)) % 8)
3145         octets = bytearray(len(bits) // 8)
3146         for i in range(len(octets)):
3147             octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3148         return bit_len, bytes(octets)
3149
3150     def _value_sanitize(self, value):
3151         if isinstance(value, (str, bytes)):
3152             if (
3153                     isinstance(value, str) and
3154                     value.startswith("'")
3155             ):
3156                 if value.endswith("'B"):
3157                     value = value[1:-2]
3158                     if not frozenset(value) <= SET01:
3159                         raise ValueError("B's coding contains unacceptable chars")
3160                     return self._bits2octets(value)
3161                 if value.endswith("'H"):
3162                     value = value[1:-2]
3163                     return (
3164                         len(value) * 4,
3165                         hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3166                     )
3167             if value.__class__ == bytes:
3168                 return (len(value) * 8, value)
3169             raise InvalidValueType((self.__class__, str, bytes))
3170         if value.__class__ == tuple:
3171             if (
3172                     len(value) == 2 and
3173                     isinstance(value[0], int) and
3174                     value[1].__class__ == bytes
3175             ):
3176                 return value
3177             bits = []
3178             for name in value:
3179                 bit = self.specs.get(name)
3180                 if bit is None:
3181                     raise ObjUnknown("BitString value: %s" % name)
3182                 bits.append(bit)
3183             if len(bits) == 0:
3184                 return self._bits2octets("")
3185             bits = frozenset(bits)
3186             return self._bits2octets("".join(
3187                 ("1" if bit in bits else "0")
3188                 for bit in range(max(bits) + 1)
3189             ))
3190         if issubclass(value.__class__, BitString):
3191             return value._value
3192         raise InvalidValueType((self.__class__, bytes, str))
3193
3194     @property
3195     def ready(self):
3196         return self._value is not None
3197
3198     def __getstate__(self):
3199         return BitStringState(
3200             __version__,
3201             self.tag,
3202             self._tag_order,
3203             self._expl,
3204             self.default,
3205             self.optional,
3206             self.offset,
3207             self.llen,
3208             self.vlen,
3209             self.expl_lenindef,
3210             self.lenindef,
3211             self.ber_encoded,
3212             self.specs,
3213             self._value,
3214             self.tag_constructed,
3215             self.defined,
3216         )
3217
3218     def __setstate__(self, state):
3219         super().__setstate__(state)
3220         self.specs = state.specs
3221         self._value = state.value
3222         self.tag_constructed = state.tag_constructed
3223         self.defined = state.defined
3224
3225     def __iter__(self):
3226         self._assert_ready()
3227         for i in range(self._value[0]):
3228             yield self[i]
3229
3230     @property
3231     def bit_len(self):
3232         """Returns number of bits in the string
3233         """
3234         self._assert_ready()
3235         return self._value[0]
3236
3237     def __bytes__(self):
3238         self._assert_ready()
3239         return self._value[1]
3240
3241     def __eq__(self, their):
3242         if their.__class__ == bytes:
3243             return self._value[1] == their
3244         if not issubclass(their.__class__, BitString):
3245             return False
3246         return (
3247             self._value == their._value and
3248             self.tag == their.tag and
3249             self._expl == their._expl
3250         )
3251
3252     @property
3253     def named(self):
3254         """Named representation (if exists) of the bits
3255
3256         :returns: [str(name), ...]
3257         """
3258         return [name for name, bit in self.specs.items() if self[bit]]
3259
3260     def __call__(
3261             self,
3262             value=None,
3263             impl=None,
3264             expl=None,
3265             default=None,
3266             optional=None,
3267     ):
3268         return self.__class__(
3269             value=value,
3270             impl=self.tag if impl is None else impl,
3271             expl=self._expl if expl is None else expl,
3272             default=self.default if default is None else default,
3273             optional=self.optional if optional is None else optional,
3274             _specs=self.specs,
3275         )
3276
3277     def __getitem__(self, key):
3278         if key.__class__ == int:
3279             bit_len, octets = self._value
3280             if key >= bit_len:
3281                 return False
3282             return memoryview(octets)[key // 8] >> (7 - (key % 8)) & 1 == 1
3283         if isinstance(key, str):
3284             value = self.specs.get(key)
3285             if value is None:
3286                 raise ObjUnknown("BitString value: %s" % key)
3287             return self[value]
3288         raise InvalidValueType((int, str))
3289
3290     def _encode(self):
3291         self._assert_ready()
3292         bit_len, octets = self._value
3293         return b"".join((
3294             self.tag,
3295             len_encode(len(octets) + 1),
3296             int2byte((8 - bit_len % 8) % 8),
3297             octets,
3298         ))
3299
3300     def _encode1st(self, state):
3301         self._assert_ready()
3302         _, octets = self._value
3303         l = len(octets) + 1
3304         return len(self.tag) + len_size(l) + l, state
3305
3306     def _encode2nd(self, writer, state_iter):
3307         bit_len, octets = self._value
3308         write_full(writer, b"".join((
3309             self.tag,
3310             len_encode(len(octets) + 1),
3311             int2byte((8 - bit_len % 8) % 8),
3312         )))
3313         write_full(writer, octets)
3314
3315     def _encode_cer(self, writer):
3316         bit_len, octets = self._value
3317         if len(octets) + 1 <= 1000:
3318             write_full(writer, self._encode())
3319             return
3320         write_full(writer, self.tag_constructed)
3321         write_full(writer, LENINDEF)
3322         for offset in range(0, (len(octets) // 999) * 999, 999):
3323             write_full(writer, b"".join((
3324                 BitString.tag_default,
3325                 LEN1K,
3326                 b"\x00",
3327                 octets[offset:offset + 999],
3328             )))
3329         tail = octets[offset + 999:]
3330         if len(tail) > 0:
3331             tail = int2byte((8 - bit_len % 8) % 8) + tail
3332             write_full(writer, b"".join((
3333                 BitString.tag_default,
3334                 len_encode(len(tail)),
3335                 tail,
3336             )))
3337         write_full(writer, EOC)
3338
3339     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3340         try:
3341             t, tlen, lv = tag_strip(tlv)
3342         except DecodeError as err:
3343             raise err.__class__(
3344                 msg=err.msg,
3345                 klass=self.__class__,
3346                 decode_path=decode_path,
3347                 offset=offset,
3348             )
3349         if t == self.tag:
3350             if tag_only:  # pragma: no cover
3351                 yield None
3352                 return
3353             try:
3354                 l, llen, v = len_decode(lv)
3355             except DecodeError as err:
3356                 raise err.__class__(
3357                     msg=err.msg,
3358                     klass=self.__class__,
3359                     decode_path=decode_path,
3360                     offset=offset,
3361                 )
3362             if l > len(v):
3363                 raise NotEnoughData(
3364                     "encoded length is longer than data",
3365                     klass=self.__class__,
3366                     decode_path=decode_path,
3367                     offset=offset,
3368                 )
3369             if l == 0:
3370                 raise NotEnoughData(
3371                     "zero length",
3372                     klass=self.__class__,
3373                     decode_path=decode_path,
3374                     offset=offset,
3375                 )
3376             pad_size = v[0]
3377             if l == 1 and pad_size != 0:
3378                 raise DecodeError(
3379                     "invalid empty value",
3380                     klass=self.__class__,
3381                     decode_path=decode_path,
3382                     offset=offset,
3383                 )
3384             if pad_size > 7:
3385                 raise DecodeError(
3386                     "too big pad",
3387                     klass=self.__class__,
3388                     decode_path=decode_path,
3389                     offset=offset,
3390                 )
3391             if v[l - 1] & ((1 << pad_size) - 1) != 0:
3392                 raise DecodeError(
3393                     "invalid pad",
3394                     klass=self.__class__,
3395                     decode_path=decode_path,
3396                     offset=offset,
3397                 )
3398             v, tail = v[:l], v[l:]
3399             bit_len = (len(v) - 1) * 8 - pad_size
3400             obj = self.__class__(
3401                 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3402                 impl=self.tag,
3403                 expl=self._expl,
3404                 default=self.default,
3405                 optional=self.optional,
3406                 _specs=self.specs,
3407                 _decoded=(offset, llen, l),
3408             )
3409             if evgen_mode:
3410                 obj._value = (bit_len, None)
3411             yield decode_path, obj, tail
3412             return
3413         if t != self.tag_constructed:
3414             raise TagMismatch(
3415                 klass=self.__class__,
3416                 decode_path=decode_path,
3417                 offset=offset,
3418             )
3419         if not ctx.get("bered", False):
3420             raise DecodeError(
3421                 "unallowed BER constructed encoding",
3422                 klass=self.__class__,
3423                 decode_path=decode_path,
3424                 offset=offset,
3425             )
3426         if tag_only:  # pragma: no cover
3427             yield None
3428             return
3429         lenindef = False
3430         try:
3431             l, llen, v = len_decode(lv)
3432         except LenIndefForm:
3433             llen, l, v = 1, 0, lv[1:]
3434             lenindef = True
3435         except DecodeError as err:
3436             raise err.__class__(
3437                 msg=err.msg,
3438                 klass=self.__class__,
3439                 decode_path=decode_path,
3440                 offset=offset,
3441             )
3442         if l > len(v):
3443             raise NotEnoughData(
3444                 "encoded length is longer than data",
3445                 klass=self.__class__,
3446                 decode_path=decode_path,
3447                 offset=offset,
3448             )
3449         if not lenindef and l == 0:
3450             raise NotEnoughData(
3451                 "zero length",
3452                 klass=self.__class__,
3453                 decode_path=decode_path,
3454                 offset=offset,
3455             )
3456         chunks = []
3457         sub_offset = offset + tlen + llen
3458         vlen = 0
3459         while True:
3460             if lenindef:
3461                 if v[:EOC_LEN].tobytes() == EOC:
3462                     break
3463             else:
3464                 if vlen == l:
3465                     break
3466                 if vlen > l:
3467                     raise DecodeError(
3468                         "chunk out of bounds",
3469                         klass=self.__class__,
3470                         decode_path=decode_path + (str(len(chunks) - 1),),
3471                         offset=chunks[-1].offset,
3472                     )
3473             sub_decode_path = decode_path + (str(len(chunks)),)
3474             try:
3475                 if evgen_mode:
3476                     for _decode_path, chunk, v_tail in BitString().decode_evgen(
3477                             v,
3478                             offset=sub_offset,
3479                             decode_path=sub_decode_path,
3480                             leavemm=True,
3481                             ctx=ctx,
3482                             _ctx_immutable=False,
3483                     ):
3484                         yield _decode_path, chunk, v_tail
3485                 else:
3486                     _, chunk, v_tail = next(BitString().decode_evgen(
3487                         v,
3488                         offset=sub_offset,
3489                         decode_path=sub_decode_path,
3490                         leavemm=True,
3491                         ctx=ctx,
3492                         _ctx_immutable=False,
3493                         _evgen_mode=False,
3494                     ))
3495             except TagMismatch:
3496                 raise DecodeError(
3497                     "expected BitString encoded chunk",
3498                     klass=self.__class__,
3499                     decode_path=sub_decode_path,
3500                     offset=sub_offset,
3501                 )
3502             chunks.append(chunk)
3503             sub_offset += chunk.tlvlen
3504             vlen += chunk.tlvlen
3505             v = v_tail
3506         if len(chunks) == 0:
3507             raise DecodeError(
3508                 "no chunks",
3509                 klass=self.__class__,
3510                 decode_path=decode_path,
3511                 offset=offset,
3512             )
3513         values = []
3514         bit_len = 0
3515         for chunk_i, chunk in enumerate(chunks[:-1]):
3516             if chunk.bit_len % 8 != 0:
3517                 raise DecodeError(
3518                     "BitString chunk is not multiple of 8 bits",
3519                     klass=self.__class__,
3520                     decode_path=decode_path + (str(chunk_i),),
3521                     offset=chunk.offset,
3522                 )
3523             if not evgen_mode:
3524                 values.append(bytes(chunk))
3525             bit_len += chunk.bit_len
3526         chunk_last = chunks[-1]
3527         if not evgen_mode:
3528             values.append(bytes(chunk_last))
3529         bit_len += chunk_last.bit_len
3530         obj = self.__class__(
3531             value=None if evgen_mode else (bit_len, b"".join(values)),
3532             impl=self.tag,
3533             expl=self._expl,
3534             default=self.default,
3535             optional=self.optional,
3536             _specs=self.specs,
3537             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3538         )
3539         if evgen_mode:
3540             obj._value = (bit_len, None)
3541         obj.lenindef = lenindef
3542         obj.ber_encoded = True
3543         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3544
3545     def __repr__(self):
3546         return pp_console_row(next(self.pps()))
3547
3548     def pps(self, decode_path=()):
3549         value = None
3550         blob = None
3551         if self.ready:
3552             bit_len, blob = self._value
3553             value = "%d bits" % bit_len
3554             if len(self.specs) > 0 and blob is not None:
3555                 blob = tuple(self.named)
3556         yield _pp(
3557             obj=self,
3558             asn1_type_name=self.asn1_type_name,
3559             obj_name=self.__class__.__name__,
3560             decode_path=decode_path,
3561             value=value,
3562             blob=blob,
3563             optional=self.optional,
3564             default=self == self.default,
3565             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3566             expl=None if self._expl is None else tag_decode(self._expl),
3567             offset=self.offset,
3568             tlen=self.tlen,
3569             llen=self.llen,
3570             vlen=self.vlen,
3571             expl_offset=self.expl_offset if self.expled else None,
3572             expl_tlen=self.expl_tlen if self.expled else None,
3573             expl_llen=self.expl_llen if self.expled else None,
3574             expl_vlen=self.expl_vlen if self.expled else None,
3575             expl_lenindef=self.expl_lenindef,
3576             lenindef=self.lenindef,
3577             ber_encoded=self.ber_encoded,
3578             bered=self.bered,
3579         )
3580         defined_by, defined = self.defined or (None, None)
3581         if defined_by is not None:
3582             yield defined.pps(
3583                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3584             )
3585         for pp in self.pps_lenindef(decode_path):
3586             yield pp
3587
3588
3589 OctetStringState = namedtuple(
3590     "OctetStringState",
3591     BasicState._fields + (
3592         "value",
3593         "bound_min",
3594         "bound_max",
3595         "tag_constructed",
3596         "defined",
3597     ),
3598     **NAMEDTUPLE_KWARGS
3599 )
3600
3601
3602 class OctetString(Obj):
3603     """``OCTET STRING`` binary string type
3604
3605     >>> s = OctetString(b"hello world")
3606     OCTET STRING 11 bytes 68656c6c6f20776f726c64
3607     >>> s == OctetString(b"hello world")
3608     True
3609     >>> bytes(s)
3610     b'hello world'
3611
3612     >>> OctetString(b"hello", bounds=(4, 4))
3613     Traceback (most recent call last):
3614     pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3615     >>> OctetString(b"hell", bounds=(4, 4))
3616     OCTET STRING 4 bytes 68656c6c
3617
3618     Memoryviews can be used as a values. If memoryview is made on
3619     mmap-ed file, then it does not take storage inside OctetString
3620     itself. In CER encoding mode it will be streamed to the specified
3621     writer, copying 1 KB chunks.
3622     """
3623     __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3624     tag_default = tag_encode(4)
3625     asn1_type_name = "OCTET STRING"
3626     evgen_mode_skip_value = True
3627
3628     def __init__(
3629             self,
3630             value=None,
3631             bounds=None,
3632             impl=None,
3633             expl=None,
3634             default=None,
3635             optional=False,
3636             _decoded=(0, 0, 0),
3637             ctx=None,
3638     ):
3639         """
3640         :param value: set the value. Either binary type, or
3641                       :py:class:`pyderasn.OctetString` object
3642         :param bounds: set ``(MIN, MAX)`` value size constraint.
3643                        (-inf, +inf) by default
3644         :param bytes impl: override default tag with ``IMPLICIT`` one
3645         :param bytes expl: override default tag with ``EXPLICIT`` one
3646         :param default: set default value. Type same as in ``value``
3647         :param bool optional: is object ``OPTIONAL`` in sequence
3648         """
3649         super().__init__(impl, expl, default, optional, _decoded)
3650         self._value = value
3651         self._bound_min, self._bound_max = getattr(
3652             self,
3653             "bounds",
3654             (0, float("+inf")),
3655         ) if bounds is None else bounds
3656         if value is not None:
3657             self._value = self._value_sanitize(value)
3658         if default is not None:
3659             default = self._value_sanitize(default)
3660             self.default = self.__class__(
3661                 value=default,
3662                 impl=self.tag,
3663                 expl=self._expl,
3664             )
3665             if self._value is None:
3666                 self._value = default
3667         self.defined = None
3668         tag_klass, _, tag_num = tag_decode(self.tag)
3669         self.tag_constructed = tag_encode(
3670             klass=tag_klass,
3671             form=TagFormConstructed,
3672             num=tag_num,
3673         )
3674
3675     def _value_sanitize(self, value):
3676         if value.__class__ == bytes or value.__class__ == memoryview:
3677             pass
3678         elif issubclass(value.__class__, OctetString):
3679             value = value._value
3680         else:
3681             raise InvalidValueType((self.__class__, bytes, memoryview))
3682         if not self._bound_min <= len(value) <= self._bound_max:
3683             raise BoundsError(self._bound_min, len(value), self._bound_max)
3684         return value
3685
3686     @property
3687     def ready(self):
3688         return self._value is not None
3689
3690     def __getstate__(self):
3691         return OctetStringState(
3692             __version__,
3693             self.tag,
3694             self._tag_order,
3695             self._expl,
3696             self.default,
3697             self.optional,
3698             self.offset,
3699             self.llen,
3700             self.vlen,
3701             self.expl_lenindef,
3702             self.lenindef,
3703             self.ber_encoded,
3704             self._value,
3705             self._bound_min,
3706             self._bound_max,
3707             self.tag_constructed,
3708             self.defined,
3709         )
3710
3711     def __setstate__(self, state):
3712         super().__setstate__(state)
3713         self._value = state.value
3714         self._bound_min = state.bound_min
3715         self._bound_max = state.bound_max
3716         self.tag_constructed = state.tag_constructed
3717         self.defined = state.defined
3718
3719     def __bytes__(self):
3720         self._assert_ready()
3721         return bytes(self._value)
3722
3723     def __eq__(self, their):
3724         if their.__class__ == bytes:
3725             return self._value == their
3726         if not issubclass(their.__class__, OctetString):
3727             return False
3728         return (
3729             self._value == their._value and
3730             self.tag == their.tag and
3731             self._expl == their._expl
3732         )
3733
3734     def __lt__(self, their):
3735         return self._value < their._value
3736
3737     def __call__(
3738             self,
3739             value=None,
3740             bounds=None,
3741             impl=None,
3742             expl=None,
3743             default=None,
3744             optional=None,
3745     ):
3746         return self.__class__(
3747             value=value,
3748             bounds=(
3749                 (self._bound_min, self._bound_max)
3750                 if bounds is None else bounds
3751             ),
3752             impl=self.tag if impl is None else impl,
3753             expl=self._expl if expl is None else expl,
3754             default=self.default if default is None else default,
3755             optional=self.optional if optional is None else optional,
3756         )
3757
3758     def _encode(self):
3759         self._assert_ready()
3760         return b"".join((
3761             self.tag,
3762             len_encode(len(self._value)),
3763             self._value,
3764         ))
3765
3766     def _encode1st(self, state):
3767         self._assert_ready()
3768         l = len(self._value)
3769         return len(self.tag) + len_size(l) + l, state
3770
3771     def _encode2nd(self, writer, state_iter):
3772         value = self._value
3773         write_full(writer, self.tag + len_encode(len(value)))
3774         write_full(writer, value)
3775
3776     def _encode_cer(self, writer):
3777         octets = self._value
3778         if len(octets) <= 1000:
3779             write_full(writer, self._encode())
3780             return
3781         write_full(writer, self.tag_constructed)
3782         write_full(writer, LENINDEF)
3783         for offset in range(0, (len(octets) // 1000) * 1000, 1000):
3784             write_full(writer, b"".join((
3785                 OctetString.tag_default,
3786                 LEN1K,
3787                 octets[offset:offset + 1000],
3788             )))
3789         tail = octets[offset + 1000:]
3790         if len(tail) > 0:
3791             write_full(writer, b"".join((
3792                 OctetString.tag_default,
3793                 len_encode(len(tail)),
3794                 tail,
3795             )))
3796         write_full(writer, EOC)
3797
3798     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3799         try:
3800             t, tlen, lv = tag_strip(tlv)
3801         except DecodeError as err:
3802             raise err.__class__(
3803                 msg=err.msg,
3804                 klass=self.__class__,
3805                 decode_path=decode_path,
3806                 offset=offset,
3807             )
3808         if t == self.tag:
3809             if tag_only:
3810                 yield None
3811                 return
3812             try:
3813                 l, llen, v = len_decode(lv)
3814             except DecodeError as err:
3815                 raise err.__class__(
3816                     msg=err.msg,
3817                     klass=self.__class__,
3818                     decode_path=decode_path,
3819                     offset=offset,
3820                 )
3821             if l > len(v):
3822                 raise NotEnoughData(
3823                     "encoded length is longer than data",
3824                     klass=self.__class__,
3825                     decode_path=decode_path,
3826                     offset=offset,
3827                 )
3828             v, tail = v[:l], v[l:]
3829             if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3830                 raise DecodeError(
3831                     msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3832                     klass=self.__class__,
3833                     decode_path=decode_path,
3834                     offset=offset,
3835                 )
3836             try:
3837                 obj = self.__class__(
3838                     value=(
3839                         None if (evgen_mode and self.evgen_mode_skip_value)
3840                         else v.tobytes()
3841                     ),
3842                     bounds=(self._bound_min, self._bound_max),
3843                     impl=self.tag,
3844                     expl=self._expl,
3845                     default=self.default,
3846                     optional=self.optional,
3847                     _decoded=(offset, llen, l),
3848                     ctx=ctx,
3849                 )
3850             except DecodeError as err:
3851                 raise DecodeError(
3852                     msg=err.msg,
3853                     klass=self.__class__,
3854                     decode_path=decode_path,
3855                     offset=offset,
3856                 )
3857             except BoundsError as err:
3858                 raise DecodeError(
3859                     msg=str(err),
3860                     klass=self.__class__,
3861                     decode_path=decode_path,
3862                     offset=offset,
3863                 )
3864             yield decode_path, obj, tail
3865             return
3866         if t != self.tag_constructed:
3867             raise TagMismatch(
3868                 klass=self.__class__,
3869                 decode_path=decode_path,
3870                 offset=offset,
3871             )
3872         if not ctx.get("bered", False):
3873             raise DecodeError(
3874                 "unallowed BER constructed encoding",
3875                 klass=self.__class__,
3876                 decode_path=decode_path,
3877                 offset=offset,
3878             )
3879         if tag_only:
3880             yield None
3881             return
3882         lenindef = False
3883         try:
3884             l, llen, v = len_decode(lv)
3885         except LenIndefForm:
3886             llen, l, v = 1, 0, lv[1:]
3887             lenindef = True
3888         except DecodeError as err:
3889             raise err.__class__(
3890                 msg=err.msg,
3891                 klass=self.__class__,
3892                 decode_path=decode_path,
3893                 offset=offset,
3894             )
3895         if l > len(v):
3896             raise NotEnoughData(
3897                 "encoded length is longer than data",
3898                 klass=self.__class__,
3899                 decode_path=decode_path,
3900                 offset=offset,
3901             )
3902         chunks = []
3903         chunks_count = 0
3904         sub_offset = offset + tlen + llen
3905         vlen = 0
3906         payload_len = 0
3907         while True:
3908             if lenindef:
3909                 if v[:EOC_LEN].tobytes() == EOC:
3910                     break
3911             else:
3912                 if vlen == l:
3913                     break
3914                 if vlen > l:
3915                     raise DecodeError(
3916                         "chunk out of bounds",
3917                         klass=self.__class__,
3918                         decode_path=decode_path + (str(len(chunks) - 1),),
3919                         offset=chunks[-1].offset,
3920                     )
3921             try:
3922                 if evgen_mode:
3923                     sub_decode_path = decode_path + (str(chunks_count),)
3924                     for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3925                             v,
3926                             offset=sub_offset,
3927                             decode_path=sub_decode_path,
3928                             leavemm=True,
3929                             ctx=ctx,
3930                             _ctx_immutable=False,
3931                     ):
3932                         yield _decode_path, chunk, v_tail
3933                         if not chunk.ber_encoded:
3934                             payload_len += chunk.vlen
3935                     chunks_count += 1
3936                 else:
3937                     sub_decode_path = decode_path + (str(len(chunks)),)
3938                     _, chunk, v_tail = next(OctetString().decode_evgen(
3939                         v,
3940                         offset=sub_offset,
3941                         decode_path=sub_decode_path,
3942                         leavemm=True,
3943                         ctx=ctx,
3944                         _ctx_immutable=False,
3945                         _evgen_mode=False,
3946                     ))
3947                     chunks.append(chunk)
3948             except TagMismatch:
3949                 raise DecodeError(
3950                     "expected OctetString encoded chunk",
3951                     klass=self.__class__,
3952                     decode_path=sub_decode_path,
3953                     offset=sub_offset,
3954                 )
3955             sub_offset += chunk.tlvlen
3956             vlen += chunk.tlvlen
3957             v = v_tail
3958         if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
3959             raise DecodeError(
3960                 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
3961                 klass=self.__class__,
3962                 decode_path=decode_path,
3963                 offset=offset,
3964             )
3965         try:
3966             obj = self.__class__(
3967                 value=(
3968                     None if evgen_mode else
3969                     b"".join(bytes(chunk) for chunk in chunks)
3970                 ),
3971                 bounds=(self._bound_min, self._bound_max),
3972                 impl=self.tag,
3973                 expl=self._expl,
3974                 default=self.default,
3975                 optional=self.optional,
3976                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3977                 ctx=ctx,
3978             )
3979         except DecodeError as err:
3980             raise DecodeError(
3981                 msg=err.msg,
3982                 klass=self.__class__,
3983                 decode_path=decode_path,
3984                 offset=offset,
3985             )
3986         except BoundsError as err:
3987             raise DecodeError(
3988                 msg=str(err),
3989                 klass=self.__class__,
3990                 decode_path=decode_path,
3991                 offset=offset,
3992             )
3993         obj.lenindef = lenindef
3994         obj.ber_encoded = True
3995         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3996
3997     def __repr__(self):
3998         return pp_console_row(next(self.pps()))
3999
4000     def pps(self, decode_path=()):
4001         yield _pp(
4002             obj=self,
4003             asn1_type_name=self.asn1_type_name,
4004             obj_name=self.__class__.__name__,
4005             decode_path=decode_path,
4006             value=("%d bytes" % len(self._value)) if self.ready else None,
4007             blob=self._value if self.ready else None,
4008             optional=self.optional,
4009             default=self == self.default,
4010             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4011             expl=None if self._expl is None else tag_decode(self._expl),
4012             offset=self.offset,
4013             tlen=self.tlen,
4014             llen=self.llen,
4015             vlen=self.vlen,
4016             expl_offset=self.expl_offset if self.expled else None,
4017             expl_tlen=self.expl_tlen if self.expled else None,
4018             expl_llen=self.expl_llen if self.expled else None,
4019             expl_vlen=self.expl_vlen if self.expled else None,
4020             expl_lenindef=self.expl_lenindef,
4021             lenindef=self.lenindef,
4022             ber_encoded=self.ber_encoded,
4023             bered=self.bered,
4024         )
4025         defined_by, defined = self.defined or (None, None)
4026         if defined_by is not None:
4027             yield defined.pps(
4028                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4029             )
4030         for pp in self.pps_lenindef(decode_path):
4031             yield pp
4032
4033
4034 def agg_octet_string(evgens, decode_path, raw, writer):
4035     """Aggregate constructed string (OctetString and its derivatives)
4036
4037     :param evgens: iterator of generated events
4038     :param decode_path: points to the string we want to decode
4039     :param raw: slicebable (memoryview, bytearray, etc) with
4040                 the data evgens are generated on
4041     :param writer: buffer.write where string is going to be saved
4042     :param writer: where string is going to be saved. Must comply
4043                    with ``io.RawIOBase.write`` behaviour
4044
4045     .. seealso:: :ref:`agg_octet_string`
4046     """
4047     decode_path_len = len(decode_path)
4048     for dp, obj, _ in evgens:
4049         if dp[:decode_path_len] != decode_path:
4050             continue
4051         if not obj.ber_encoded:
4052             write_full(writer, raw[
4053                 obj.offset + obj.tlen + obj.llen:
4054                 obj.offset + obj.tlen + obj.llen + obj.vlen -
4055                 (EOC_LEN if obj.expl_lenindef else 0)
4056             ])
4057         if len(dp) == decode_path_len:
4058             break
4059
4060
4061 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4062
4063
4064 class Null(Obj):
4065     """``NULL`` null object
4066
4067     >>> n = Null()
4068     NULL
4069     >>> n.ready
4070     True
4071     """
4072     __slots__ = ()
4073     tag_default = tag_encode(5)
4074     asn1_type_name = "NULL"
4075
4076     def __init__(
4077             self,
4078             value=None,  # unused, but Sequence passes it
4079             impl=None,
4080             expl=None,
4081             optional=False,
4082             _decoded=(0, 0, 0),
4083     ):
4084         """
4085         :param bytes impl: override default tag with ``IMPLICIT`` one
4086         :param bytes expl: override default tag with ``EXPLICIT`` one
4087         :param bool optional: is object ``OPTIONAL`` in sequence
4088         """
4089         super().__init__(impl, expl, None, optional, _decoded)
4090         self.default = None
4091
4092     @property
4093     def ready(self):
4094         return True
4095
4096     def __getstate__(self):
4097         return NullState(
4098             __version__,
4099             self.tag,
4100             self._tag_order,
4101             self._expl,
4102             self.default,
4103             self.optional,
4104             self.offset,
4105             self.llen,
4106             self.vlen,
4107             self.expl_lenindef,
4108             self.lenindef,
4109             self.ber_encoded,
4110         )
4111
4112     def __eq__(self, their):
4113         if not issubclass(their.__class__, Null):
4114             return False
4115         return (
4116             self.tag == their.tag and
4117             self._expl == their._expl
4118         )
4119
4120     def __call__(
4121             self,
4122             value=None,
4123             impl=None,
4124             expl=None,
4125             optional=None,
4126     ):
4127         return self.__class__(
4128             impl=self.tag if impl is None else impl,
4129             expl=self._expl if expl is None else expl,
4130             optional=self.optional if optional is None else optional,
4131         )
4132
4133     def _encode(self):
4134         return self.tag + LEN0
4135
4136     def _encode1st(self, state):
4137         return len(self.tag) + 1, state
4138
4139     def _encode2nd(self, writer, state_iter):
4140         write_full(writer, self.tag + LEN0)
4141
4142     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4143         try:
4144             t, _, lv = tag_strip(tlv)
4145         except DecodeError as err:
4146             raise err.__class__(
4147                 msg=err.msg,
4148                 klass=self.__class__,
4149                 decode_path=decode_path,
4150                 offset=offset,
4151             )
4152         if t != self.tag:
4153             raise TagMismatch(
4154                 klass=self.__class__,
4155                 decode_path=decode_path,
4156                 offset=offset,
4157             )
4158         if tag_only:  # pragma: no cover
4159             yield None
4160             return
4161         try:
4162             l, _, v = len_decode(lv)
4163         except DecodeError as err:
4164             raise err.__class__(
4165                 msg=err.msg,
4166                 klass=self.__class__,
4167                 decode_path=decode_path,
4168                 offset=offset,
4169             )
4170         if l != 0:
4171             raise InvalidLength(
4172                 "Null must have zero length",
4173                 klass=self.__class__,
4174                 decode_path=decode_path,
4175                 offset=offset,
4176             )
4177         obj = self.__class__(
4178             impl=self.tag,
4179             expl=self._expl,
4180             optional=self.optional,
4181             _decoded=(offset, 1, 0),
4182         )
4183         yield decode_path, obj, v
4184
4185     def __repr__(self):
4186         return pp_console_row(next(self.pps()))
4187
4188     def pps(self, decode_path=()):
4189         yield _pp(
4190             obj=self,
4191             asn1_type_name=self.asn1_type_name,
4192             obj_name=self.__class__.__name__,
4193             decode_path=decode_path,
4194             optional=self.optional,
4195             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4196             expl=None if self._expl is None else tag_decode(self._expl),
4197             offset=self.offset,
4198             tlen=self.tlen,
4199             llen=self.llen,
4200             vlen=self.vlen,
4201             expl_offset=self.expl_offset if self.expled else None,
4202             expl_tlen=self.expl_tlen if self.expled else None,
4203             expl_llen=self.expl_llen if self.expled else None,
4204             expl_vlen=self.expl_vlen if self.expled else None,
4205             expl_lenindef=self.expl_lenindef,
4206             bered=self.bered,
4207         )
4208         for pp in self.pps_lenindef(decode_path):
4209             yield pp
4210
4211
4212 ObjectIdentifierState = namedtuple(
4213     "ObjectIdentifierState",
4214     BasicState._fields + ("value", "defines"),
4215     **NAMEDTUPLE_KWARGS
4216 )
4217
4218
4219 class ObjectIdentifier(Obj):
4220     """``OBJECT IDENTIFIER`` OID type
4221
4222     >>> oid = ObjectIdentifier((1, 2, 3))
4223     OBJECT IDENTIFIER 1.2.3
4224     >>> oid == ObjectIdentifier("1.2.3")
4225     True
4226     >>> tuple(oid)
4227     (1, 2, 3)
4228     >>> str(oid)
4229     '1.2.3'
4230     >>> oid + (4, 5) + ObjectIdentifier("1.7")
4231     OBJECT IDENTIFIER 1.2.3.4.5.1.7
4232
4233     >>> str(ObjectIdentifier((3, 1)))
4234     Traceback (most recent call last):
4235     pyderasn.InvalidOID: unacceptable first arc value
4236     """
4237     __slots__ = ("defines",)
4238     tag_default = tag_encode(6)
4239     asn1_type_name = "OBJECT IDENTIFIER"
4240
4241     def __init__(
4242             self,
4243             value=None,
4244             defines=(),
4245             impl=None,
4246             expl=None,
4247             default=None,
4248             optional=False,
4249             _decoded=(0, 0, 0),
4250     ):
4251         """
4252         :param value: set the value. Either tuples of integers,
4253                       string of "."-concatenated integers, or
4254                       :py:class:`pyderasn.ObjectIdentifier` object
4255         :param defines: sequence of tuples. Each tuple has two elements.
4256                         First one is relative to current one decode
4257                         path, aiming to the field defined by that OID.
4258                         Read about relative path in
4259                         :py:func:`pyderasn.abs_decode_path`. Second
4260                         tuple element is ``{OID: pyderasn.Obj()}``
4261                         dictionary, mapping between current OID value
4262                         and structure applied to defined field.
4263
4264                         .. seealso:: :ref:`definedby`
4265
4266         :param bytes impl: override default tag with ``IMPLICIT`` one
4267         :param bytes expl: override default tag with ``EXPLICIT`` one
4268         :param default: set default value. Type same as in ``value``
4269         :param bool optional: is object ``OPTIONAL`` in sequence
4270         """
4271         super().__init__(impl, expl, default, optional, _decoded)
4272         self._value = value
4273         if value is not None:
4274             self._value = self._value_sanitize(value)
4275         if default is not None:
4276             default = self._value_sanitize(default)
4277             self.default = self.__class__(
4278                 value=default,
4279                 impl=self.tag,
4280                 expl=self._expl,
4281             )
4282             if self._value is None:
4283                 self._value = default
4284         self.defines = defines
4285
4286     def __add__(self, their):
4287         if their.__class__ == tuple:
4288             return self.__class__(self._value + array("L", their))
4289         if isinstance(their, self.__class__):
4290             return self.__class__(self._value + their._value)
4291         raise InvalidValueType((self.__class__, tuple))
4292
4293     def _value_sanitize(self, value):
4294         if issubclass(value.__class__, ObjectIdentifier):
4295             return value._value
4296         if isinstance(value, str):
4297             try:
4298                 value = array("L", (pureint(arc) for arc in value.split(".")))
4299             except ValueError:
4300                 raise InvalidOID("unacceptable arcs values")
4301         if value.__class__ == tuple:
4302             try:
4303                 value = array("L", value)
4304             except OverflowError as err:
4305                 raise InvalidOID(repr(err))
4306         if value.__class__ is array:
4307             if len(value) < 2:
4308                 raise InvalidOID("less than 2 arcs")
4309             first_arc = value[0]
4310             if first_arc in (0, 1):
4311                 if not (0 <= value[1] <= 39):
4312                     raise InvalidOID("second arc is too wide")
4313             elif first_arc == 2:
4314                 pass
4315             else:
4316                 raise InvalidOID("unacceptable first arc value")
4317             if not all(arc >= 0 for arc in value):
4318                 raise InvalidOID("negative arc value")
4319             return value
4320         raise InvalidValueType((self.__class__, str, tuple))
4321
4322     @property
4323     def ready(self):
4324         return self._value is not None
4325
4326     def __getstate__(self):
4327         return ObjectIdentifierState(
4328             __version__,
4329             self.tag,
4330             self._tag_order,
4331             self._expl,
4332             self.default,
4333             self.optional,
4334             self.offset,
4335             self.llen,
4336             self.vlen,
4337             self.expl_lenindef,
4338             self.lenindef,
4339             self.ber_encoded,
4340             self._value,
4341             self.defines,
4342         )
4343
4344     def __setstate__(self, state):
4345         super().__setstate__(state)
4346         self._value = state.value
4347         self.defines = state.defines
4348
4349     def __iter__(self):
4350         self._assert_ready()
4351         return iter(self._value)
4352
4353     def __str__(self):
4354         return ".".join(str(arc) for arc in self._value or ())
4355
4356     def __hash__(self):
4357         self._assert_ready()
4358         return hash(b"".join((
4359             self.tag,
4360             bytes(self._expl or b""),
4361             str(self._value).encode("ascii"),
4362         )))
4363
4364     def __eq__(self, their):
4365         if their.__class__ == tuple:
4366             return self._value == array("L", their)
4367         if not issubclass(their.__class__, ObjectIdentifier):
4368             return False
4369         return (
4370             self.tag == their.tag and
4371             self._expl == their._expl and
4372             self._value == their._value
4373         )
4374
4375     def __lt__(self, their):
4376         return self._value < their._value
4377
4378     def __call__(
4379             self,
4380             value=None,
4381             defines=None,
4382             impl=None,
4383             expl=None,
4384             default=None,
4385             optional=None,
4386     ):
4387         return self.__class__(
4388             value=value,
4389             defines=self.defines if defines is None else defines,
4390             impl=self.tag if impl is None else impl,
4391             expl=self._expl if expl is None else expl,
4392             default=self.default if default is None else default,
4393             optional=self.optional if optional is None else optional,
4394         )
4395
4396     def _encode_octets(self):
4397         self._assert_ready()
4398         value = self._value
4399         first_value = value[1]
4400         first_arc = value[0]
4401         if first_arc == 0:
4402             pass
4403         elif first_arc == 1:
4404             first_value += 40
4405         elif first_arc == 2:
4406             first_value += 80
4407         else:  # pragma: no cover
4408             raise RuntimeError("invalid arc is stored")
4409         octets = [zero_ended_encode(first_value)]
4410         for arc in value[2:]:
4411             octets.append(zero_ended_encode(arc))
4412         return b"".join(octets)
4413
4414     def _encode(self):
4415         v = self._encode_octets()
4416         return b"".join((self.tag, len_encode(len(v)), v))
4417
4418     def _encode1st(self, state):
4419         l = len(self._encode_octets())
4420         return len(self.tag) + len_size(l) + l, state
4421
4422     def _encode2nd(self, writer, state_iter):
4423         write_full(writer, self._encode())
4424
4425     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4426         try:
4427             t, _, lv = tag_strip(tlv)
4428         except DecodeError as err:
4429             raise err.__class__(
4430                 msg=err.msg,
4431                 klass=self.__class__,
4432                 decode_path=decode_path,
4433                 offset=offset,
4434             )
4435         if t != self.tag:
4436             raise TagMismatch(
4437                 klass=self.__class__,
4438                 decode_path=decode_path,
4439                 offset=offset,
4440             )
4441         if tag_only:  # pragma: no cover
4442             yield None
4443             return
4444         try:
4445             l, llen, v = len_decode(lv)
4446         except DecodeError as err:
4447             raise err.__class__(
4448                 msg=err.msg,
4449                 klass=self.__class__,
4450                 decode_path=decode_path,
4451                 offset=offset,
4452             )
4453         if l > len(v):
4454             raise NotEnoughData(
4455                 "encoded length is longer than data",
4456                 klass=self.__class__,
4457                 decode_path=decode_path,
4458                 offset=offset,
4459             )
4460         if l == 0:
4461             raise NotEnoughData(
4462                 "zero length",
4463                 klass=self.__class__,
4464                 decode_path=decode_path,
4465                 offset=offset,
4466             )
4467         v, tail = v[:l], v[l:]
4468         arcs = array("L")
4469         ber_encoded = False
4470         while len(v) > 0:
4471             i = 0
4472             arc = 0
4473             while True:
4474                 octet = v[i]
4475                 if i == 0 and octet == 0x80:
4476                     if ctx.get("bered", False):
4477                         ber_encoded = True
4478                     else:
4479                         raise DecodeError(
4480                             "non normalized arc encoding",
4481                             klass=self.__class__,
4482                             decode_path=decode_path,
4483                             offset=offset,
4484                         )
4485                 arc = (arc << 7) | (octet & 0x7F)
4486                 if octet & 0x80 == 0:
4487                     try:
4488                         arcs.append(arc)
4489                     except OverflowError:
4490                         raise DecodeError(
4491                             "too huge value for local unsigned long",
4492                             klass=self.__class__,
4493                             decode_path=decode_path,
4494                             offset=offset,
4495                         )
4496                     v = v[i + 1:]
4497                     break
4498                 i += 1
4499                 if i == len(v):
4500                     raise DecodeError(
4501                         "unfinished OID",
4502                         klass=self.__class__,
4503                         decode_path=decode_path,
4504                         offset=offset,
4505                     )
4506         first_arc = 0
4507         second_arc = arcs[0]
4508         if 0 <= second_arc <= 39:
4509             first_arc = 0
4510         elif 40 <= second_arc <= 79:
4511             first_arc = 1
4512             second_arc -= 40
4513         else:
4514             first_arc = 2
4515             second_arc -= 80
4516         obj = self.__class__(
4517             value=array("L", (first_arc, second_arc)) + arcs[1:],
4518             impl=self.tag,
4519             expl=self._expl,
4520             default=self.default,
4521             optional=self.optional,
4522             _decoded=(offset, llen, l),
4523         )
4524         if ber_encoded:
4525             obj.ber_encoded = True
4526         yield decode_path, obj, tail
4527
4528     def __repr__(self):
4529         return pp_console_row(next(self.pps()))
4530
4531     def pps(self, decode_path=()):
4532         yield _pp(
4533             obj=self,
4534             asn1_type_name=self.asn1_type_name,
4535             obj_name=self.__class__.__name__,
4536             decode_path=decode_path,
4537             value=str(self) if self.ready else None,
4538             optional=self.optional,
4539             default=self == self.default,
4540             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4541             expl=None if self._expl is None else tag_decode(self._expl),
4542             offset=self.offset,
4543             tlen=self.tlen,
4544             llen=self.llen,
4545             vlen=self.vlen,
4546             expl_offset=self.expl_offset if self.expled else None,
4547             expl_tlen=self.expl_tlen if self.expled else None,
4548             expl_llen=self.expl_llen if self.expled else None,
4549             expl_vlen=self.expl_vlen if self.expled else None,
4550             expl_lenindef=self.expl_lenindef,
4551             ber_encoded=self.ber_encoded,
4552             bered=self.bered,
4553         )
4554         for pp in self.pps_lenindef(decode_path):
4555             yield pp
4556
4557
4558 class Enumerated(Integer):
4559     """``ENUMERATED`` integer type
4560
4561     This type is identical to :py:class:`pyderasn.Integer`, but requires
4562     schema to be specified and does not accept values missing from it.
4563     """
4564     __slots__ = ()
4565     tag_default = tag_encode(10)
4566     asn1_type_name = "ENUMERATED"
4567
4568     def __init__(
4569             self,
4570             value=None,
4571             impl=None,
4572             expl=None,
4573             default=None,
4574             optional=False,
4575             _specs=None,
4576             _decoded=(0, 0, 0),
4577             bounds=None,  # dummy argument, workability for Integer.decode
4578     ):
4579         super().__init__(
4580             value, bounds, impl, expl, default, optional, _specs, _decoded,
4581         )
4582         if len(self.specs) == 0:
4583             raise ValueError("schema must be specified")
4584
4585     def _value_sanitize(self, value):
4586         if isinstance(value, self.__class__):
4587             value = value._value
4588         elif isinstance(value, int):
4589             for _value in self.specs.values():
4590                 if _value == value:
4591                     break
4592             else:
4593                 raise DecodeError(
4594                     "unknown integer value: %s" % value,
4595                     klass=self.__class__,
4596                 )
4597         elif isinstance(value, str):
4598             value = self.specs.get(value)
4599             if value is None:
4600                 raise ObjUnknown("integer value: %s" % value)
4601         else:
4602             raise InvalidValueType((self.__class__, int, str))
4603         return value
4604
4605     def __call__(
4606             self,
4607             value=None,
4608             impl=None,
4609             expl=None,
4610             default=None,
4611             optional=None,
4612             _specs=None,
4613     ):
4614         return self.__class__(
4615             value=value,
4616             impl=self.tag if impl is None else impl,
4617             expl=self._expl if expl is None else expl,
4618             default=self.default if default is None else default,
4619             optional=self.optional if optional is None else optional,
4620             _specs=self.specs,
4621         )
4622
4623
4624 def escape_control_unicode(c):
4625     if unicat(c)[0] == "C":
4626         c = repr(c).lstrip("u").strip("'")
4627     return c
4628
4629
4630 class CommonString(OctetString):
4631     """Common class for all strings
4632
4633     Everything resembles :py:class:`pyderasn.OctetString`, except
4634     ability to deal with unicode text strings.
4635
4636     >>> hexenc("привет Ð¼Ð¸Ñ€".encode("utf-8"))
4637     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4638     >>> UTF8String("привет Ð¼Ð¸Ñ€") == UTF8String(hexdec("d0...80"))
4639     True
4640     >>> s = UTF8String("привет Ð¼Ð¸Ñ€")
4641     UTF8String UTF8String Ð¿Ñ€Ð¸Ð²ÐµÑ‚ Ð¼Ð¸Ñ€
4642     >>> str(s)
4643     'привет Ð¼Ð¸Ñ€'
4644     >>> hexenc(bytes(s))
4645     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4646
4647     >>> PrintableString("привет Ð¼Ð¸Ñ€")
4648     Traceback (most recent call last):
4649     pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4650
4651     >>> BMPString("ада", bounds=(2, 2))
4652     Traceback (most recent call last):
4653     pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4654     >>> s = BMPString("ад", bounds=(2, 2))
4655     >>> s.encoding
4656     'utf-16-be'
4657     >>> hexenc(bytes(s))
4658     '04300434'
4659
4660     .. list-table::
4661        :header-rows: 1
4662
4663        * - Class
4664          - Text Encoding, validation
4665        * - :py:class:`pyderasn.UTF8String`
4666          - utf-8
4667        * - :py:class:`pyderasn.NumericString`
4668          - proper alphabet validation
4669        * - :py:class:`pyderasn.PrintableString`
4670          - proper alphabet validation
4671        * - :py:class:`pyderasn.TeletexString`
4672          - iso-8859-1
4673        * - :py:class:`pyderasn.T61String`
4674          - iso-8859-1
4675        * - :py:class:`pyderasn.VideotexString`
4676          - iso-8859-1
4677        * - :py:class:`pyderasn.IA5String`
4678          - proper alphabet validation
4679        * - :py:class:`pyderasn.GraphicString`
4680          - iso-8859-1
4681        * - :py:class:`pyderasn.VisibleString`, :py:class:`pyderasn.ISO646String`
4682          - proper alphabet validation
4683        * - :py:class:`pyderasn.GeneralString`
4684          - iso-8859-1
4685        * - :py:class:`pyderasn.UniversalString`
4686          - utf-32-be
4687        * - :py:class:`pyderasn.BMPString`
4688          - utf-16-be
4689     """
4690     __slots__ = ()
4691
4692     def _value_sanitize(self, value):
4693         value_raw = None
4694         value_decoded = None
4695         if isinstance(value, self.__class__):
4696             value_raw = value._value
4697         elif value.__class__ == str:
4698             value_decoded = value
4699         elif value.__class__ == bytes:
4700             value_raw = value
4701         else:
4702             raise InvalidValueType((self.__class__, str, bytes))
4703         try:
4704             value_raw = (
4705                 value_decoded.encode(self.encoding)
4706                 if value_raw is None else value_raw
4707             )
4708             value_decoded = (
4709                 value_raw.decode(self.encoding)
4710                 if value_decoded is None else value_decoded
4711             )
4712         except (UnicodeEncodeError, UnicodeDecodeError) as err:
4713             raise DecodeError(str(err))
4714         if not self._bound_min <= len(value_decoded) <= self._bound_max:
4715             raise BoundsError(
4716                 self._bound_min,
4717                 len(value_decoded),
4718                 self._bound_max,
4719             )
4720         return value_raw
4721
4722     def __eq__(self, their):
4723         if their.__class__ == bytes:
4724             return self._value == their
4725         if their.__class__ == str:
4726             return self._value == their.encode(self.encoding)
4727         if not isinstance(their, self.__class__):
4728             return False
4729         return (
4730             self._value == their._value and
4731             self.tag == their.tag and
4732             self._expl == their._expl
4733         )
4734
4735     def __unicode__(self):
4736         if self.ready:
4737             return self._value.decode(self.encoding)
4738         return str(self._value)
4739
4740     def __repr__(self):
4741         return pp_console_row(next(self.pps()))
4742
4743     def pps(self, decode_path=()):
4744         value = None
4745         if self.ready:
4746             value = "".join(escape_control_unicode(c) for c in self.__unicode__())
4747         yield _pp(
4748             obj=self,
4749             asn1_type_name=self.asn1_type_name,
4750             obj_name=self.__class__.__name__,
4751             decode_path=decode_path,
4752             value=value,
4753             optional=self.optional,
4754             default=self == self.default,
4755             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4756             expl=None if self._expl is None else tag_decode(self._expl),
4757             offset=self.offset,
4758             tlen=self.tlen,
4759             llen=self.llen,
4760             vlen=self.vlen,
4761             expl_offset=self.expl_offset if self.expled else None,
4762             expl_tlen=self.expl_tlen if self.expled else None,
4763             expl_llen=self.expl_llen if self.expled else None,
4764             expl_vlen=self.expl_vlen if self.expled else None,
4765             expl_lenindef=self.expl_lenindef,
4766             ber_encoded=self.ber_encoded,
4767             bered=self.bered,
4768         )
4769         for pp in self.pps_lenindef(decode_path):
4770             yield pp
4771
4772
4773 class UTF8String(CommonString):
4774     __slots__ = ()
4775     tag_default = tag_encode(12)
4776     encoding = "utf-8"
4777     asn1_type_name = "UTF8String"
4778
4779
4780 class AllowableCharsMixin:
4781     __slots__ = ()
4782
4783     @property
4784     def allowable_chars(self):
4785         return frozenset(chr(c) for c in self._allowable_chars)
4786
4787     def _value_sanitize(self, value):
4788         value = super()._value_sanitize(value)
4789         if not frozenset(value) <= self._allowable_chars:
4790             raise DecodeError("non satisfying alphabet value")
4791         return value
4792
4793
4794 NUMERIC_ALLOWABLE_CHARS = frozenset(digits.encode("ascii") + b" ")
4795
4796
4797 class NumericString(AllowableCharsMixin, CommonString):
4798     """Numeric string
4799
4800     Its value is properly sanitized: only ASCII digits with spaces can
4801     be stored.
4802
4803     >>> NumericString().allowable_chars
4804     frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4805     """
4806     __slots__ = ("_allowable_chars",)
4807     tag_default = tag_encode(18)
4808     encoding = "ascii"
4809     asn1_type_name = "NumericString"
4810
4811     def __init__(self, *args, **kwargs):
4812         self._allowable_chars = NUMERIC_ALLOWABLE_CHARS
4813         super().__init__(*args, **kwargs)
4814
4815
4816 PrintableStringState = namedtuple(
4817     "PrintableStringState",
4818     OctetStringState._fields + ("allowable_chars",),
4819     **NAMEDTUPLE_KWARGS
4820 )
4821
4822
4823 PRINTABLE_ALLOWABLE_CHARS = frozenset(
4824     (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4825 )
4826
4827
4828 class PrintableString(AllowableCharsMixin, CommonString):
4829     """Printable string
4830
4831     Its value is properly sanitized: see X.680 41.4 table 10.
4832
4833     >>> PrintableString().allowable_chars
4834     frozenset([' ', "'", ..., 'z'])
4835     >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4836     PrintableString PrintableString foo*bar
4837     >>> obj.allow_asterisk, obj.allow_ampersand
4838     (True, False)
4839     """
4840     __slots__ = ("_allowable_chars",)
4841     tag_default = tag_encode(19)
4842     encoding = "ascii"
4843     asn1_type_name = "PrintableString"
4844     _asterisk = frozenset("*".encode("ascii"))
4845     _ampersand = frozenset("&".encode("ascii"))
4846
4847     def __init__(
4848             self,
4849             value=None,
4850             bounds=None,
4851             impl=None,
4852             expl=None,
4853             default=None,
4854             optional=False,
4855             _decoded=(0, 0, 0),
4856             ctx=None,
4857             allow_asterisk=False,
4858             allow_ampersand=False,
4859     ):
4860         """
4861         :param allow_asterisk: allow asterisk character
4862         :param allow_ampersand: allow ampersand character
4863         """
4864         allowable_chars = PRINTABLE_ALLOWABLE_CHARS
4865         if allow_asterisk:
4866             allowable_chars |= self._asterisk
4867         if allow_ampersand:
4868             allowable_chars |= self._ampersand
4869         self._allowable_chars = allowable_chars
4870         super().__init__(
4871             value, bounds, impl, expl, default, optional, _decoded, ctx,
4872         )
4873
4874     @property
4875     def allow_asterisk(self):
4876         """Is asterisk character allowed?
4877         """
4878         return self._asterisk <= self._allowable_chars
4879
4880     @property
4881     def allow_ampersand(self):
4882         """Is ampersand character allowed?
4883         """
4884         return self._ampersand <= self._allowable_chars
4885
4886     def __getstate__(self):
4887         return PrintableStringState(
4888             *super().__getstate__(),
4889             **{"allowable_chars": self._allowable_chars}
4890         )
4891
4892     def __setstate__(self, state):
4893         super().__setstate__(state)
4894         self._allowable_chars = state.allowable_chars
4895
4896     def __call__(
4897             self,
4898             value=None,
4899             bounds=None,
4900             impl=None,
4901             expl=None,
4902             default=None,
4903             optional=None,
4904     ):
4905         return self.__class__(
4906             value=value,
4907             bounds=(
4908                 (self._bound_min, self._bound_max)
4909                 if bounds is None else bounds
4910             ),
4911             impl=self.tag if impl is None else impl,
4912             expl=self._expl if expl is None else expl,
4913             default=self.default if default is None else default,
4914             optional=self.optional if optional is None else optional,
4915             allow_asterisk=self.allow_asterisk,
4916             allow_ampersand=self.allow_ampersand,
4917         )
4918
4919
4920 class TeletexString(CommonString):
4921     __slots__ = ()
4922     tag_default = tag_encode(20)
4923     encoding = "iso-8859-1"
4924     asn1_type_name = "TeletexString"
4925
4926
4927 class T61String(TeletexString):
4928     __slots__ = ()
4929     asn1_type_name = "T61String"
4930
4931
4932 class VideotexString(CommonString):
4933     __slots__ = ()
4934     tag_default = tag_encode(21)
4935     encoding = "iso-8859-1"
4936     asn1_type_name = "VideotexString"
4937
4938
4939 IA5_ALLOWABLE_CHARS = frozenset(b"".join(
4940     chr(c).encode("ascii") for c in range(128)
4941 ))
4942
4943
4944 class IA5String(AllowableCharsMixin, CommonString):
4945     """IA5 string
4946
4947     Its value is properly sanitized: it is a mix of
4948
4949     * http://www.itscj.ipsj.or.jp/iso-ir/006.pdf (G)
4950     * http://www.itscj.ipsj.or.jp/iso-ir/001.pdf (C0)
4951     * DEL character (0x7F)
4952
4953     It is just 7-bit ASCII.
4954
4955     >>> IA5String().allowable_chars
4956     frozenset(["NUL", ... "DEL"])
4957     """
4958     __slots__ = ("_allowable_chars",)
4959     tag_default = tag_encode(22)
4960     encoding = "ascii"
4961     asn1_type_name = "IA5"
4962
4963     def __init__(self, *args, **kwargs):
4964         self._allowable_chars = IA5_ALLOWABLE_CHARS
4965         super().__init__(*args, **kwargs)
4966
4967
4968 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
4969 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
4970 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
4971 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
4972 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
4973 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
4974
4975
4976 VISIBLE_ALLOWABLE_CHARS = frozenset(b"".join(
4977     chr(c).encode("ascii") for c in range(ord(" "), ord("~") + 1)
4978 ))
4979
4980
4981 class VisibleString(AllowableCharsMixin, CommonString):
4982     """Visible string
4983
4984     Its value is properly sanitized. ASCII subset from space to tilde is
4985     allowed: http://www.itscj.ipsj.or.jp/iso-ir/006.pdf
4986
4987     >>> VisibleString().allowable_chars
4988     frozenset([" ", ... "~"])
4989     """
4990     __slots__ = ("_allowable_chars",)
4991     tag_default = tag_encode(26)
4992     encoding = "ascii"
4993     asn1_type_name = "VisibleString"
4994
4995     def __init__(self, *args, **kwargs):
4996         self._allowable_chars = VISIBLE_ALLOWABLE_CHARS
4997         super().__init__(*args, **kwargs)
4998
4999
5000 class ISO646String(VisibleString):
5001     __slots__ = ()
5002     asn1_type_name = "ISO646String"
5003
5004
5005 UTCTimeState = namedtuple(
5006     "UTCTimeState",
5007     OctetStringState._fields + ("ber_raw",),
5008     **NAMEDTUPLE_KWARGS
5009 )
5010
5011
5012 def str_to_time_fractions(value):
5013     v = pureint(value)
5014     year, v = (v // 10**10), (v % 10**10)
5015     month, v = (v // 10**8), (v % 10**8)
5016     day, v = (v // 10**6), (v % 10**6)
5017     hour, v = (v // 10**4), (v % 10**4)
5018     minute, second = (v // 100), (v % 100)
5019     return year, month, day, hour, minute, second
5020
5021
5022 class UTCTime(VisibleString):
5023     """``UTCTime`` datetime type
5024
5025     >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5026     UTCTime UTCTime 2017-09-30T22:07:50
5027     >>> str(t)
5028     '170930220750Z'
5029     >>> bytes(t)
5030     b'170930220750Z'
5031     >>> t.todatetime()
5032     datetime.datetime(2017, 9, 30, 22, 7, 50)
5033     >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5034     datetime.datetime(1957, 9, 30, 22, 7, 50)
5035
5036     If BER encoded value was met, then ``ber_raw`` attribute will hold
5037     its raw representation.
5038
5039     .. warning::
5040
5041        Only **naive** ``datetime`` objects are supported.
5042        Library assumes that all work is done in UTC.
5043
5044     .. warning::
5045
5046        Pay attention that ``UTCTime`` can not hold full year, so all years
5047        having < 50 years are treated as 20xx, 19xx otherwise, according to
5048        X.509 recommendation. Use ``GeneralizedTime`` instead for
5049        removing ambiguity.
5050
5051     .. warning::
5052
5053        No strict validation of UTC offsets are made (only applicable to
5054        **BER**), but very crude:
5055
5056        * minutes are not exceeding 60
5057        * offset value is not exceeding 14 hours
5058     """
5059     __slots__ = ("ber_raw",)
5060     tag_default = tag_encode(23)
5061     encoding = "ascii"
5062     asn1_type_name = "UTCTime"
5063     evgen_mode_skip_value = False
5064
5065     def __init__(
5066             self,
5067             value=None,
5068             impl=None,
5069             expl=None,
5070             default=None,
5071             optional=False,
5072             _decoded=(0, 0, 0),
5073             bounds=None,  # dummy argument, workability for OctetString.decode
5074             ctx=None,
5075     ):
5076         """
5077         :param value: set the value. Either datetime type, or
5078                       :py:class:`pyderasn.UTCTime` object
5079         :param bytes impl: override default tag with ``IMPLICIT`` one
5080         :param bytes expl: override default tag with ``EXPLICIT`` one
5081         :param default: set default value. Type same as in ``value``
5082         :param bool optional: is object ``OPTIONAL`` in sequence
5083         """
5084         super().__init__(None, None, impl, expl, None, optional, _decoded, ctx)
5085         self._value = value
5086         self.ber_raw = None
5087         if value is not None:
5088             self._value, self.ber_raw = self._value_sanitize(value, ctx)
5089             self.ber_encoded = self.ber_raw is not None
5090         if default is not None:
5091             default, _ = self._value_sanitize(default)
5092             self.default = self.__class__(
5093                 value=default,
5094                 impl=self.tag,
5095                 expl=self._expl,
5096             )
5097             if self._value is None:
5098                 self._value = default
5099             optional = True
5100         self.optional = optional
5101
5102     def _strptime_bered(self, value):
5103         year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5104         value = value[10:]
5105         if len(value) == 0:
5106             raise ValueError("no timezone")
5107         year += 2000 if year < 50 else 1900
5108         decoded = datetime(year, month, day, hour, minute)
5109         offset = 0
5110         if value[-1] == "Z":
5111             value = value[:-1]
5112         else:
5113             if len(value) < 5:
5114                 raise ValueError("invalid UTC offset")
5115             if value[-5] == "-":
5116                 sign = -1
5117             elif value[-5] == "+":
5118                 sign = 1
5119             else:
5120                 raise ValueError("invalid UTC offset")
5121             v = pureint(value[-4:])
5122             offset, v = (60 * (v % 100)), v // 100
5123             if offset >= 3600:
5124                 raise ValueError("invalid UTC offset minutes")
5125             offset += 3600 * v
5126             if offset > 14 * 3600:
5127                 raise ValueError("too big UTC offset")
5128             offset *= sign
5129             value = value[:-5]
5130         if len(value) == 0:
5131             return offset, decoded
5132         if len(value) != 2:
5133             raise ValueError("invalid UTC offset seconds")
5134         seconds = pureint(value)
5135         if seconds >= 60:
5136             raise ValueError("invalid seconds value")
5137         return offset, decoded + timedelta(seconds=seconds)
5138
5139     def _strptime(self, value):
5140         # datetime.strptime's format: %y%m%d%H%M%SZ
5141         if len(value) != LEN_YYMMDDHHMMSSZ:
5142             raise ValueError("invalid UTCTime length")
5143         if value[-1] != "Z":
5144             raise ValueError("non UTC timezone")
5145         year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
5146         year += 2000 if year < 50 else 1900
5147         return datetime(year, month, day, hour, minute, second)
5148
5149     def _dt_sanitize(self, value):
5150         if value.year < 1950 or value.year > 2049:
5151             raise ValueError("UTCTime can hold only 1950-2049 years")
5152         return value.replace(microsecond=0)
5153
5154     def _value_sanitize(self, value, ctx=None):
5155         if value.__class__ == bytes:
5156             try:
5157                 value_decoded = value.decode("ascii")
5158             except (UnicodeEncodeError, UnicodeDecodeError) as err:
5159                 raise DecodeError("invalid UTCTime encoding: %r" % err)
5160             err = None
5161             try:
5162                 return self._strptime(value_decoded), None
5163             except (TypeError, ValueError) as _err:
5164                 err = _err
5165                 if (ctx is not None) and ctx.get("bered", False):
5166                     try:
5167                         offset, _value = self._strptime_bered(value_decoded)
5168                         _value = _value - timedelta(seconds=offset)
5169                         return self._dt_sanitize(_value), value
5170                     except (TypeError, ValueError, OverflowError) as _err:
5171                         err = _err
5172             raise DecodeError(
5173                 "invalid %s format: %r" % (self.asn1_type_name, err),
5174                 klass=self.__class__,
5175             )
5176         if isinstance(value, self.__class__):
5177             return value._value, None
5178         if value.__class__ == datetime:
5179             if value.tzinfo is not None:
5180                 raise ValueError("only naive datetime supported")
5181             return self._dt_sanitize(value), None
5182         raise InvalidValueType((self.__class__, datetime))
5183
5184     def _pp_value(self):
5185         if self.ready:
5186             value = self._value.isoformat()
5187             if self.ber_encoded:
5188                 value += " (%s)" % self.ber_raw
5189             return value
5190         return None
5191
5192     def __unicode__(self):
5193         if self.ready:
5194             value = self._value.isoformat()
5195             if self.ber_encoded:
5196                 value += " (%s)" % self.ber_raw
5197             return value
5198         return str(self._pp_value())
5199
5200     def __getstate__(self):
5201         return UTCTimeState(*super().__getstate__(), **{"ber_raw": self.ber_raw})
5202
5203     def __setstate__(self, state):
5204         super().__setstate__(state)
5205         self.ber_raw = state.ber_raw
5206
5207     def __bytes__(self):
5208         self._assert_ready()
5209         return self._encode_time()
5210
5211     def __eq__(self, their):
5212         if their.__class__ == bytes:
5213             return self._encode_time() == their
5214         if their.__class__ == datetime:
5215             return self.todatetime() == their
5216         if not isinstance(their, self.__class__):
5217             return False
5218         return (
5219             self._value == their._value and
5220             self.tag == their.tag and
5221             self._expl == their._expl
5222         )
5223
5224     def _encode_time(self):
5225         return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5226
5227     def _encode(self):
5228         self._assert_ready()
5229         return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5230
5231     def _encode1st(self, state):
5232         return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
5233
5234     def _encode2nd(self, writer, state_iter):
5235         self._assert_ready()
5236         write_full(writer, self._encode())
5237
5238     def _encode_cer(self, writer):
5239         write_full(writer, self._encode())
5240
5241     def todatetime(self):
5242         return self._value
5243
5244     def __repr__(self):
5245         return pp_console_row(next(self.pps()))
5246
5247     def pps(self, decode_path=()):
5248         yield _pp(
5249             obj=self,
5250             asn1_type_name=self.asn1_type_name,
5251             obj_name=self.__class__.__name__,
5252             decode_path=decode_path,
5253             value=self._pp_value(),
5254             optional=self.optional,
5255             default=self == self.default,
5256             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5257             expl=None if self._expl is None else tag_decode(self._expl),
5258             offset=self.offset,
5259             tlen=self.tlen,
5260             llen=self.llen,
5261             vlen=self.vlen,
5262             expl_offset=self.expl_offset if self.expled else None,
5263             expl_tlen=self.expl_tlen if self.expled else None,
5264             expl_llen=self.expl_llen if self.expled else None,
5265             expl_vlen=self.expl_vlen if self.expled else None,
5266             expl_lenindef=self.expl_lenindef,
5267             ber_encoded=self.ber_encoded,
5268             bered=self.bered,
5269         )
5270         for pp in self.pps_lenindef(decode_path):
5271             yield pp
5272
5273
5274 class GeneralizedTime(UTCTime):
5275     """``GeneralizedTime`` datetime type
5276
5277     This type is similar to :py:class:`pyderasn.UTCTime`.
5278
5279     >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5280     GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5281     >>> str(t)
5282     '20170930220750.000123Z'
5283     >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5284     GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5285
5286     .. warning::
5287
5288        Only **naive** datetime objects are supported.
5289        Library assumes that all work is done in UTC.
5290
5291     .. warning::
5292
5293        Only **microsecond** fractions are supported in DER encoding.
5294        :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5295        higher precision values.
5296
5297     .. warning::
5298
5299        **BER** encoded data can loss information (accuracy) during
5300        decoding because of float transformations.
5301
5302     .. warning::
5303
5304        **Zero** year is unsupported.
5305     """
5306     __slots__ = ()
5307     tag_default = tag_encode(24)
5308     asn1_type_name = "GeneralizedTime"
5309
5310     def _dt_sanitize(self, value):
5311         return value
5312
5313     def _strptime_bered(self, value):
5314         if len(value) < 4 + 3 * 2:
5315             raise ValueError("invalid GeneralizedTime")
5316         year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5317         decoded = datetime(year, month, day, hour)
5318         offset, value = 0, value[10:]
5319         if len(value) == 0:
5320             return offset, decoded
5321         if value[-1] == "Z":
5322             value = value[:-1]
5323         else:
5324             for char, sign in (("-", -1), ("+", 1)):
5325                 idx = value.rfind(char)
5326                 if idx == -1:
5327                     continue
5328                 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5329                 v = pureint(offset_raw)
5330                 if len(offset_raw) == 4:
5331                     offset, v = (60 * (v % 100)), v // 100
5332                     if offset >= 3600:
5333                         raise ValueError("invalid UTC offset minutes")
5334                 elif len(offset_raw) == 2:
5335                     pass
5336                 else:
5337                     raise ValueError("invalid UTC offset")
5338                 offset += 3600 * v
5339                 if offset > 14 * 3600:
5340                     raise ValueError("too big UTC offset")
5341                 offset *= sign
5342                 break
5343         if len(value) == 0:
5344             return offset, decoded
5345         if value[0] in DECIMAL_SIGNS:
5346             return offset, (
5347                 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5348             )
5349         if len(value) < 2:
5350             raise ValueError("stripped minutes")
5351         decoded += timedelta(seconds=60 * pureint(value[:2]))
5352         value = value[2:]
5353         if len(value) == 0:
5354             return offset, decoded
5355         if value[0] in DECIMAL_SIGNS:
5356             return offset, (
5357                 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5358             )
5359         if len(value) < 2:
5360             raise ValueError("stripped seconds")
5361         decoded += timedelta(seconds=pureint(value[:2]))
5362         value = value[2:]
5363         if len(value) == 0:
5364             return offset, decoded
5365         if value[0] not in DECIMAL_SIGNS:
5366             raise ValueError("invalid format after seconds")
5367         return offset, (
5368             decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5369         )
5370
5371     def _strptime(self, value):
5372         l = len(value)
5373         if l == LEN_YYYYMMDDHHMMSSZ:
5374             # datetime.strptime's format: %Y%m%d%H%M%SZ
5375             if value[-1] != "Z":
5376                 raise ValueError("non UTC timezone")
5377             return datetime(*str_to_time_fractions(value[:-1]))
5378         if l >= LEN_YYYYMMDDHHMMSSDMZ:
5379             # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5380             if value[-1] != "Z":
5381                 raise ValueError("non UTC timezone")
5382             if value[14] != ".":
5383                 raise ValueError("no fractions separator")
5384             us = value[15:-1]
5385             if us[-1] == "0":
5386                 raise ValueError("trailing zero")
5387             us_len = len(us)
5388             if us_len > 6:
5389                 raise ValueError("only microsecond fractions are supported")
5390             us = pureint(us + ("0" * (6 - us_len)))
5391             year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5392             return datetime(year, month, day, hour, minute, second, us)
5393         raise ValueError("invalid GeneralizedTime length")
5394
5395     def _encode_time(self):
5396         value = self._value
5397         encoded = value.strftime("%Y%m%d%H%M%S")
5398         if value.microsecond > 0:
5399             encoded += (".%06d" % value.microsecond).rstrip("0")
5400         return (encoded + "Z").encode("ascii")
5401
5402     def _encode(self):
5403         self._assert_ready()
5404         value = self._value
5405         if value.microsecond > 0:
5406             encoded = self._encode_time()
5407             return b"".join((self.tag, len_encode(len(encoded)), encoded))
5408         return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5409
5410     def _encode1st(self, state):
5411         self._assert_ready()
5412         vlen = len(self._encode_time())
5413         return len(self.tag) + len_size(vlen) + vlen, state
5414
5415     def _encode2nd(self, writer, state_iter):
5416         write_full(writer, self._encode())
5417
5418
5419 class GraphicString(CommonString):
5420     __slots__ = ()
5421     tag_default = tag_encode(25)
5422     encoding = "iso-8859-1"
5423     asn1_type_name = "GraphicString"
5424
5425
5426 class GeneralString(CommonString):
5427     __slots__ = ()
5428     tag_default = tag_encode(27)
5429     encoding = "iso-8859-1"
5430     asn1_type_name = "GeneralString"
5431
5432
5433 class UniversalString(CommonString):
5434     __slots__ = ()
5435     tag_default = tag_encode(28)
5436     encoding = "utf-32-be"
5437     asn1_type_name = "UniversalString"
5438
5439
5440 class BMPString(CommonString):
5441     __slots__ = ()
5442     tag_default = tag_encode(30)
5443     encoding = "utf-16-be"
5444     asn1_type_name = "BMPString"
5445
5446
5447 ChoiceState = namedtuple(
5448     "ChoiceState",
5449     BasicState._fields + ("specs", "value",),
5450     **NAMEDTUPLE_KWARGS
5451 )
5452
5453
5454 class Choice(Obj):
5455     """``CHOICE`` special type
5456
5457     ::
5458
5459         class GeneralName(Choice):
5460             schema = (
5461                 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5462                 ("dNSName", IA5String(impl=tag_ctxp(2))),
5463             )
5464
5465     >>> gn = GeneralName()
5466     GeneralName CHOICE
5467     >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5468     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5469     >>> gn["dNSName"] = IA5String("bar.baz")
5470     GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5471     >>> gn["rfc822Name"]
5472     None
5473     >>> gn["dNSName"]
5474     [2] IA5String IA5 bar.baz
5475     >>> gn.choice
5476     'dNSName'
5477     >>> gn.value == gn["dNSName"]
5478     True
5479     >>> gn.specs
5480     OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5481
5482     >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5483     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5484     """
5485     __slots__ = ("specs",)
5486     tag_default = None
5487     asn1_type_name = "CHOICE"
5488
5489     def __init__(
5490             self,
5491             value=None,
5492             schema=None,
5493             impl=None,
5494             expl=None,
5495             default=None,
5496             optional=False,
5497             _decoded=(0, 0, 0),
5498     ):
5499         """
5500         :param value: set the value. Either ``(choice, value)`` tuple, or
5501                       :py:class:`pyderasn.Choice` object
5502         :param bytes impl: can not be set, do **not** use it
5503         :param bytes expl: override default tag with ``EXPLICIT`` one
5504         :param default: set default value. Type same as in ``value``
5505         :param bool optional: is object ``OPTIONAL`` in sequence
5506         """
5507         if impl is not None:
5508             raise ValueError("no implicit tag allowed for CHOICE")
5509         super().__init__(None, expl, default, optional, _decoded)
5510         if schema is None:
5511             schema = getattr(self, "schema", ())
5512         if len(schema) == 0:
5513             raise ValueError("schema must be specified")
5514         self.specs = (
5515             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5516         )
5517         self._value = None
5518         if value is not None:
5519             self._value = self._value_sanitize(value)
5520         if default is not None:
5521             default_value = self._value_sanitize(default)
5522             default_obj = self.__class__(impl=self.tag, expl=self._expl)
5523             default_obj.specs = self.specs
5524             default_obj._value = default_value
5525             self.default = default_obj
5526             if value is None:
5527                 self._value = copy(default_obj._value)
5528         if self._expl is not None:
5529             tag_class, _, tag_num = tag_decode(self._expl)
5530             self._tag_order = (tag_class, tag_num)
5531
5532     def _value_sanitize(self, value):
5533         if (value.__class__ == tuple) and len(value) == 2:
5534             choice, obj = value
5535             spec = self.specs.get(choice)
5536             if spec is None:
5537                 raise ObjUnknown(choice)
5538             if not isinstance(obj, spec.__class__):
5539                 raise InvalidValueType((spec,))
5540             return (choice, spec(obj))
5541         if isinstance(value, self.__class__):
5542             return value._value
5543         raise InvalidValueType((self.__class__, tuple))
5544
5545     @property
5546     def ready(self):
5547         return self._value is not None and self._value[1].ready
5548
5549     @property
5550     def bered(self):
5551         return self.expl_lenindef or (
5552             (self._value is not None) and
5553             self._value[1].bered
5554         )
5555
5556     def __getstate__(self):
5557         return ChoiceState(
5558             __version__,
5559             self.tag,
5560             self._tag_order,
5561             self._expl,
5562             self.default,
5563             self.optional,
5564             self.offset,
5565             self.llen,
5566             self.vlen,
5567             self.expl_lenindef,
5568             self.lenindef,
5569             self.ber_encoded,
5570             self.specs,
5571             copy(self._value),
5572         )
5573
5574     def __setstate__(self, state):
5575         super().__setstate__(state)
5576         self.specs = state.specs
5577         self._value = state.value
5578
5579     def __eq__(self, their):
5580         if (their.__class__ == tuple) and len(their) == 2:
5581             return self._value == their
5582         if not isinstance(their, self.__class__):
5583             return False
5584         return (
5585             self.specs == their.specs and
5586             self._value == their._value
5587         )
5588
5589     def __call__(
5590             self,
5591             value=None,
5592             expl=None,
5593             default=None,
5594             optional=None,
5595     ):
5596         return self.__class__(
5597             value=value,
5598             schema=self.specs,
5599             expl=self._expl if expl is None else expl,
5600             default=self.default if default is None else default,
5601             optional=self.optional if optional is None else optional,
5602         )
5603
5604     @property
5605     def choice(self):
5606         """Name of the choice
5607         """
5608         self._assert_ready()
5609         return self._value[0]
5610
5611     @property
5612     def value(self):
5613         """Value of underlying choice
5614         """
5615         self._assert_ready()
5616         return self._value[1]
5617
5618     @property
5619     def tag_order(self):
5620         self._assert_ready()
5621         return self._value[1].tag_order if self._tag_order is None else self._tag_order
5622
5623     @property
5624     def tag_order_cer(self):
5625         return min(v.tag_order_cer for v in self.specs.values())
5626
5627     def __getitem__(self, key):
5628         if key not in self.specs:
5629             raise ObjUnknown(key)
5630         if self._value is None:
5631             return None
5632         choice, value = self._value
5633         if choice != key:
5634             return None
5635         return value
5636
5637     def __setitem__(self, key, value):
5638         spec = self.specs.get(key)
5639         if spec is None:
5640             raise ObjUnknown(key)
5641         if not isinstance(value, spec.__class__):
5642             raise InvalidValueType((spec.__class__,))
5643         self._value = (key, spec(value))
5644
5645     @property
5646     def tlen(self):
5647         return 0
5648
5649     @property
5650     def decoded(self):
5651         return self._value[1].decoded if self.ready else False
5652
5653     def _encode(self):
5654         self._assert_ready()
5655         return self._value[1].encode()
5656
5657     def _encode1st(self, state):
5658         self._assert_ready()
5659         return self._value[1].encode1st(state)
5660
5661     def _encode2nd(self, writer, state_iter):
5662         self._value[1].encode2nd(writer, state_iter)
5663
5664     def _encode_cer(self, writer):
5665         self._assert_ready()
5666         self._value[1].encode_cer(writer)
5667
5668     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5669         for choice, spec in self.specs.items():
5670             sub_decode_path = decode_path + (choice,)
5671             try:
5672                 spec.decode(
5673                     tlv,
5674                     offset=offset,
5675                     leavemm=True,
5676                     decode_path=sub_decode_path,
5677                     ctx=ctx,
5678                     tag_only=True,
5679                     _ctx_immutable=False,
5680                 )
5681             except TagMismatch:
5682                 continue
5683             break
5684         else:
5685             raise TagMismatch(
5686                 klass=self.__class__,
5687                 decode_path=decode_path,
5688                 offset=offset,
5689             )
5690         if tag_only:  # pragma: no cover
5691             yield None
5692             return
5693         if evgen_mode:
5694             for _decode_path, value, tail in spec.decode_evgen(
5695                     tlv,
5696                     offset=offset,
5697                     leavemm=True,
5698                     decode_path=sub_decode_path,
5699                     ctx=ctx,
5700                     _ctx_immutable=False,
5701             ):
5702                 yield _decode_path, value, tail
5703         else:
5704             _, value, tail = next(spec.decode_evgen(
5705                 tlv,
5706                 offset=offset,
5707                 leavemm=True,
5708                 decode_path=sub_decode_path,
5709                 ctx=ctx,
5710                 _ctx_immutable=False,
5711                 _evgen_mode=False,
5712             ))
5713         obj = self.__class__(
5714             schema=self.specs,
5715             expl=self._expl,
5716             default=self.default,
5717             optional=self.optional,
5718             _decoded=(offset, 0, value.fulllen),
5719         )
5720         obj._value = (choice, value)
5721         yield decode_path, obj, tail
5722
5723     def __repr__(self):
5724         value = pp_console_row(next(self.pps()))
5725         if self.ready:
5726             value = "%s[%r]" % (value, self.value)
5727         return value
5728
5729     def pps(self, decode_path=()):
5730         yield _pp(
5731             obj=self,
5732             asn1_type_name=self.asn1_type_name,
5733             obj_name=self.__class__.__name__,
5734             decode_path=decode_path,
5735             value=self.choice if self.ready else None,
5736             optional=self.optional,
5737             default=self == self.default,
5738             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5739             expl=None if self._expl is None else tag_decode(self._expl),
5740             offset=self.offset,
5741             tlen=self.tlen,
5742             llen=self.llen,
5743             vlen=self.vlen,
5744             expl_lenindef=self.expl_lenindef,
5745             bered=self.bered,
5746         )
5747         if self.ready:
5748             yield self.value.pps(decode_path=decode_path + (self.choice,))
5749         for pp in self.pps_lenindef(decode_path):
5750             yield pp
5751
5752
5753 class PrimitiveTypes(Choice):
5754     """Predefined ``CHOICE`` for all generic primitive types
5755
5756     It could be useful for general decoding of some unspecified values:
5757
5758     >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5759     OCTET STRING 3 bytes 666f6f
5760     >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5761     INTEGER 1193046
5762     """
5763     __slots__ = ()
5764     schema = tuple((klass.__name__, klass()) for klass in (
5765         Boolean,
5766         Integer,
5767         BitString,
5768         OctetString,
5769         Null,
5770         ObjectIdentifier,
5771         UTF8String,
5772         NumericString,
5773         PrintableString,
5774         TeletexString,
5775         VideotexString,
5776         IA5String,
5777         UTCTime,
5778         GeneralizedTime,
5779         GraphicString,
5780         VisibleString,
5781         ISO646String,
5782         GeneralString,
5783         UniversalString,
5784         BMPString,
5785     ))
5786
5787
5788 AnyState = namedtuple(
5789     "AnyState",
5790     BasicState._fields + ("value", "defined"),
5791     **NAMEDTUPLE_KWARGS
5792 )
5793
5794
5795 class Any(Obj):
5796     """``ANY`` special type
5797
5798     >>> Any(Integer(-123))
5799     ANY INTEGER -123 (0X:7B)
5800     >>> a = Any(OctetString(b"hello world").encode())
5801     ANY 040b68656c6c6f20776f726c64
5802     >>> hexenc(bytes(a))
5803     b'0x040x0bhello world'
5804     """
5805     __slots__ = ("defined",)
5806     tag_default = tag_encode(0)
5807     asn1_type_name = "ANY"
5808
5809     def __init__(
5810             self,
5811             value=None,
5812             expl=None,
5813             optional=False,
5814             _decoded=(0, 0, 0),
5815     ):
5816         """
5817         :param value: set the value. Either any kind of pyderasn's
5818                       **ready** object, or bytes. Pay attention that
5819                       **no** validation is performed if raw binary value
5820                       is valid TLV, except just tag decoding
5821         :param bytes expl: override default tag with ``EXPLICIT`` one
5822         :param bool optional: is object ``OPTIONAL`` in sequence
5823         """
5824         super().__init__(None, expl, None, optional, _decoded)
5825         if value is None:
5826             self._value = None
5827         else:
5828             value = self._value_sanitize(value)
5829             self._value = value
5830             if self._expl is None:
5831                 if value.__class__ == bytes:
5832                     tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5833                 else:
5834                     tag_class, tag_num = value.tag_order
5835             else:
5836                 tag_class, _, tag_num = tag_decode(self._expl)
5837             self._tag_order = (tag_class, tag_num)
5838         self.defined = None
5839
5840     def _value_sanitize(self, value):
5841         if value.__class__ == bytes:
5842             if len(value) == 0:
5843                 raise ValueError("%s value can not be empty" % self.__class__.__name__)
5844             return value
5845         if isinstance(value, self.__class__):
5846             return value._value
5847         if not isinstance(value, Obj):
5848             raise InvalidValueType((self.__class__, Obj, bytes))
5849         return value
5850
5851     @property
5852     def ready(self):
5853         return self._value is not None
5854
5855     @property
5856     def tag_order(self):
5857         self._assert_ready()
5858         return self._tag_order
5859
5860     @property
5861     def bered(self):
5862         if self.expl_lenindef or self.lenindef:
5863             return True
5864         if self.defined is None:
5865             return False
5866         return self.defined[1].bered
5867
5868     def __getstate__(self):
5869         return AnyState(
5870             __version__,
5871             self.tag,
5872             self._tag_order,
5873             self._expl,
5874             None,
5875             self.optional,
5876             self.offset,
5877             self.llen,
5878             self.vlen,
5879             self.expl_lenindef,
5880             self.lenindef,
5881             self.ber_encoded,
5882             self._value,
5883             self.defined,
5884         )
5885
5886     def __setstate__(self, state):
5887         super().__setstate__(state)
5888         self._value = state.value
5889         self.defined = state.defined
5890
5891     def __eq__(self, their):
5892         if their.__class__ == bytes:
5893             if self._value.__class__ == bytes:
5894                 return self._value == their
5895             return self._value.encode() == their
5896         if issubclass(their.__class__, Any):
5897             if self.ready and their.ready:
5898                 return bytes(self) == bytes(their)
5899             return self.ready == their.ready
5900         return False
5901
5902     def __call__(
5903             self,
5904             value=None,
5905             expl=None,
5906             optional=None,
5907     ):
5908         return self.__class__(
5909             value=value,
5910             expl=self._expl if expl is None else expl,
5911             optional=self.optional if optional is None else optional,
5912         )
5913
5914     def __bytes__(self):
5915         self._assert_ready()
5916         value = self._value
5917         if value.__class__ == bytes:
5918             return value
5919         return self._value.encode()
5920
5921     @property
5922     def tlen(self):
5923         return 0
5924
5925     def _encode(self):
5926         self._assert_ready()
5927         value = self._value
5928         if value.__class__ == bytes:
5929             return value
5930         return value.encode()
5931
5932     def _encode1st(self, state):
5933         self._assert_ready()
5934         value = self._value
5935         if value.__class__ == bytes:
5936             return len(value), state
5937         return value.encode1st(state)
5938
5939     def _encode2nd(self, writer, state_iter):
5940         value = self._value
5941         if value.__class__ == bytes:
5942             write_full(writer, value)
5943         else:
5944             value.encode2nd(writer, state_iter)
5945
5946     def _encode_cer(self, writer):
5947         self._assert_ready()
5948         value = self._value
5949         if value.__class__ == bytes:
5950             write_full(writer, value)
5951         else:
5952             value.encode_cer(writer)
5953
5954     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5955         try:
5956             t, tlen, lv = tag_strip(tlv)
5957         except DecodeError as err:
5958             raise err.__class__(
5959                 msg=err.msg,
5960                 klass=self.__class__,
5961                 decode_path=decode_path,
5962                 offset=offset,
5963             )
5964         try:
5965             l, llen, v = len_decode(lv)
5966         except LenIndefForm as err:
5967             if not ctx.get("bered", False):
5968                 raise err.__class__(
5969                     msg=err.msg,
5970                     klass=self.__class__,
5971                     decode_path=decode_path,
5972                     offset=offset,
5973                 )
5974             llen, vlen, v = 1, 0, lv[1:]
5975             sub_offset = offset + tlen + llen
5976             chunk_i = 0
5977             while v[:EOC_LEN].tobytes() != EOC:
5978                 chunk, v = Any().decode(
5979                     v,
5980                     offset=sub_offset,
5981                     decode_path=decode_path + (str(chunk_i),),
5982                     leavemm=True,
5983                     ctx=ctx,
5984                     _ctx_immutable=False,
5985                 )
5986                 vlen += chunk.tlvlen
5987                 sub_offset += chunk.tlvlen
5988                 chunk_i += 1
5989             tlvlen = tlen + llen + vlen + EOC_LEN
5990             obj = self.__class__(
5991                 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
5992                 expl=self._expl,
5993                 optional=self.optional,
5994                 _decoded=(offset, 0, tlvlen),
5995             )
5996             obj.lenindef = True
5997             obj.tag = t.tobytes()
5998             yield decode_path, obj, v[EOC_LEN:]
5999             return
6000         except DecodeError as err:
6001             raise err.__class__(
6002                 msg=err.msg,
6003                 klass=self.__class__,
6004                 decode_path=decode_path,
6005                 offset=offset,
6006             )
6007         if l > len(v):
6008             raise NotEnoughData(
6009                 "encoded length is longer than data",
6010                 klass=self.__class__,
6011                 decode_path=decode_path,
6012                 offset=offset,
6013             )
6014         tlvlen = tlen + llen + l
6015         v, tail = tlv[:tlvlen], v[l:]
6016         obj = self.__class__(
6017             value=None if evgen_mode else v.tobytes(),
6018             expl=self._expl,
6019             optional=self.optional,
6020             _decoded=(offset, 0, tlvlen),
6021         )
6022         obj.tag = t.tobytes()
6023         yield decode_path, obj, tail
6024
6025     def __repr__(self):
6026         return pp_console_row(next(self.pps()))
6027
6028     def pps(self, decode_path=()):
6029         value = self._value
6030         if value is None:
6031             pass
6032         elif value.__class__ == bytes:
6033             value = None
6034         else:
6035             value = repr(value)
6036         yield _pp(
6037             obj=self,
6038             asn1_type_name=self.asn1_type_name,
6039             obj_name=self.__class__.__name__,
6040             decode_path=decode_path,
6041             value=value,
6042             blob=self._value if self._value.__class__ == bytes else None,
6043             optional=self.optional,
6044             default=self == self.default,
6045             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6046             expl=None if self._expl is None else tag_decode(self._expl),
6047             offset=self.offset,
6048             tlen=self.tlen,
6049             llen=self.llen,
6050             vlen=self.vlen,
6051             expl_offset=self.expl_offset if self.expled else None,
6052             expl_tlen=self.expl_tlen if self.expled else None,
6053             expl_llen=self.expl_llen if self.expled else None,
6054             expl_vlen=self.expl_vlen if self.expled else None,
6055             expl_lenindef=self.expl_lenindef,
6056             lenindef=self.lenindef,
6057             bered=self.bered,
6058         )
6059         defined_by, defined = self.defined or (None, None)
6060         if defined_by is not None:
6061             yield defined.pps(
6062                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6063             )
6064         for pp in self.pps_lenindef(decode_path):
6065             yield pp
6066
6067
6068 ########################################################################
6069 # ASN.1 constructed types
6070 ########################################################################
6071
6072 def abs_decode_path(decode_path, rel_path):
6073     """Create an absolute decode path from current and relative ones
6074
6075     :param decode_path: current decode path, starting point. Tuple of strings
6076     :param rel_path: relative path to ``decode_path``. Tuple of strings.
6077                      If first tuple's element is "/", then treat it as
6078                      an absolute path, ignoring ``decode_path`` as
6079                      starting point. Also this tuple can contain ".."
6080                      elements, stripping the leading element from
6081                      ``decode_path``
6082
6083     >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
6084     ("foo", "bar", "baz", "whatever")
6085     >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
6086     ("foo", "whatever")
6087     >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
6088     ("baz", "whatever")
6089     """
6090     if rel_path[0] == "/":
6091         return rel_path[1:]
6092     if rel_path[0] == "..":
6093         return abs_decode_path(decode_path[:-1], rel_path[1:])
6094     return decode_path + rel_path
6095
6096
6097 SequenceState = namedtuple(
6098     "SequenceState",
6099     BasicState._fields + ("specs", "value",),
6100     **NAMEDTUPLE_KWARGS
6101 )
6102
6103
6104 class SequenceEncode1stMixin:
6105     __slots__ = ()
6106
6107     def _encode1st(self, state):
6108         state.append(0)
6109         idx = len(state) - 1
6110         vlen = 0
6111         for v in self._values_for_encoding():
6112             l, _ = v.encode1st(state)
6113             vlen += l
6114         state[idx] = vlen
6115         return len(self.tag) + len_size(vlen) + vlen, state
6116
6117
6118 class Sequence(SequenceEncode1stMixin, Obj):
6119     """``SEQUENCE`` structure type
6120
6121     You have to make specification of sequence::
6122
6123         class Extension(Sequence):
6124             schema = (
6125                 ("extnID", ObjectIdentifier()),
6126                 ("critical", Boolean(default=False)),
6127                 ("extnValue", OctetString()),
6128             )
6129
6130     Then, you can work with it as with dictionary.
6131
6132     >>> ext = Extension()
6133     >>> Extension().specs
6134     OrderedDict([
6135         ('extnID', OBJECT IDENTIFIER),
6136         ('critical', BOOLEAN False OPTIONAL DEFAULT),
6137         ('extnValue', OCTET STRING),
6138     ])
6139     >>> ext["extnID"] = "1.2.3"
6140     Traceback (most recent call last):
6141     pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6142     >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6143
6144     You can determine if sequence is ready to be encoded:
6145
6146     >>> ext.ready
6147     False
6148     >>> ext.encode()
6149     Traceback (most recent call last):
6150     pyderasn.ObjNotReady: object is not ready: extnValue
6151     >>> ext["extnValue"] = OctetString(b"foobar")
6152     >>> ext.ready
6153     True
6154
6155     Value you want to assign, must have the same **type** as in
6156     corresponding specification, but it can have different tags,
6157     optional/default attributes -- they will be taken from specification
6158     automatically::
6159
6160         class TBSCertificate(Sequence):
6161             schema = (
6162                 ("version", Version(expl=tag_ctxc(0), default="v1")),
6163             [...]
6164
6165     >>> tbs = TBSCertificate()
6166     >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6167
6168     Assign ``None`` to remove value from sequence.
6169
6170     You can set values in Sequence during its initialization:
6171
6172     >>> AlgorithmIdentifier((
6173         ("algorithm", ObjectIdentifier("1.2.3")),
6174         ("parameters", Any(Null()))
6175     ))
6176     AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6177
6178     You can determine if value exists/set in the sequence and take its value:
6179
6180     >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6181     (True, True, False)
6182     >>> ext["extnID"]
6183     OBJECT IDENTIFIER 1.2.3
6184
6185     But pay attention that if value has default, then it won't be (not
6186     in) in the sequence (because ``DEFAULT`` must not be encoded in
6187     DER), but you can read its value:
6188
6189     >>> "critical" in ext, ext["critical"]
6190     (False, BOOLEAN False)
6191     >>> ext["critical"] = Boolean(True)
6192     >>> "critical" in ext, ext["critical"]
6193     (True, BOOLEAN True)
6194
6195     All defaulted values are always optional.
6196
6197     .. _allow_default_values_ctx:
6198
6199     DER prohibits default value encoding and will raise an error if
6200     default value is unexpectedly met during decode.
6201     If :ref:`bered <bered_ctx>` context option is set, then no error
6202     will be raised, but ``bered`` attribute set. You can disable strict
6203     defaulted values existence validation by setting
6204     ``"allow_default_values": True`` :ref:`context <ctx>` option.
6205
6206     All values with DEFAULT specified are decoded atomically in
6207     :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6208     SEQUENCE, then it will be yielded as a single element, not
6209     disassembled. That is required for DEFAULT existence check.
6210
6211     Two sequences are equal if they have equal specification (schema),
6212     implicit/explicit tagging and the same values.
6213     """
6214     __slots__ = ("specs",)
6215     tag_default = tag_encode(form=TagFormConstructed, num=16)
6216     asn1_type_name = "SEQUENCE"
6217
6218     def __init__(
6219             self,
6220             value=None,
6221             schema=None,
6222             impl=None,
6223             expl=None,
6224             default=None,
6225             optional=False,
6226             _decoded=(0, 0, 0),
6227     ):
6228         super().__init__(impl, expl, default, optional, _decoded)
6229         if schema is None:
6230             schema = getattr(self, "schema", ())
6231         self.specs = (
6232             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6233         )
6234         self._value = {}
6235         if value is not None:
6236             if issubclass(value.__class__, Sequence):
6237                 self._value = value._value
6238             elif hasattr(value, "__iter__"):
6239                 for seq_key, seq_value in value:
6240                     self[seq_key] = seq_value
6241             else:
6242                 raise InvalidValueType((Sequence,))
6243         if default is not None:
6244             if not issubclass(default.__class__, Sequence):
6245                 raise InvalidValueType((Sequence,))
6246             default_value = default._value
6247             default_obj = self.__class__(impl=self.tag, expl=self._expl)
6248             default_obj.specs = self.specs
6249             default_obj._value = default_value
6250             self.default = default_obj
6251             if value is None:
6252                 self._value = copy(default_obj._value)
6253
6254     @property
6255     def ready(self):
6256         for name, spec in self.specs.items():
6257             value = self._value.get(name)
6258             if value is None:
6259                 if spec.optional:
6260                     continue
6261                 return False
6262             if not value.ready:
6263                 return False
6264         return True
6265
6266     @property
6267     def bered(self):
6268         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6269             return True
6270         return any(value.bered for value in self._value.values())
6271
6272     def __getstate__(self):
6273         return SequenceState(
6274             __version__,
6275             self.tag,
6276             self._tag_order,
6277             self._expl,
6278             self.default,
6279             self.optional,
6280             self.offset,
6281             self.llen,
6282             self.vlen,
6283             self.expl_lenindef,
6284             self.lenindef,
6285             self.ber_encoded,
6286             self.specs,
6287             {k: copy(v) for k, v in self._value.items()},
6288         )
6289
6290     def __setstate__(self, state):
6291         super().__setstate__(state)
6292         self.specs = state.specs
6293         self._value = state.value
6294
6295     def __eq__(self, their):
6296         if not isinstance(their, self.__class__):
6297             return False
6298         return (
6299             self.specs == their.specs and
6300             self.tag == their.tag and
6301             self._expl == their._expl and
6302             self._value == their._value
6303         )
6304
6305     def __call__(
6306             self,
6307             value=None,
6308             impl=None,
6309             expl=None,
6310             default=None,
6311             optional=None,
6312     ):
6313         return self.__class__(
6314             value=value,
6315             schema=self.specs,
6316             impl=self.tag if impl is None else impl,
6317             expl=self._expl if expl is None else expl,
6318             default=self.default if default is None else default,
6319             optional=self.optional if optional is None else optional,
6320         )
6321
6322     def __contains__(self, key):
6323         return key in self._value
6324
6325     def __setitem__(self, key, value):
6326         spec = self.specs.get(key)
6327         if spec is None:
6328             raise ObjUnknown(key)
6329         if value is None:
6330             self._value.pop(key, None)
6331             return
6332         if not isinstance(value, spec.__class__):
6333             raise InvalidValueType((spec.__class__,))
6334         value = spec(value=value)
6335         if spec.default is not None and value == spec.default:
6336             self._value.pop(key, None)
6337             return
6338         self._value[key] = value
6339
6340     def __getitem__(self, key):
6341         value = self._value.get(key)
6342         if value is not None:
6343             return value
6344         spec = self.specs.get(key)
6345         if spec is None:
6346             raise ObjUnknown(key)
6347         if spec.default is not None:
6348             return spec.default
6349         return None
6350
6351     def _values_for_encoding(self):
6352         for name, spec in self.specs.items():
6353             value = self._value.get(name)
6354             if value is None:
6355                 if spec.optional:
6356                     continue
6357                 raise ObjNotReady(name)
6358             yield value
6359
6360     def _encode(self):
6361         v = b"".join(v.encode() for v in self._values_for_encoding())
6362         return b"".join((self.tag, len_encode(len(v)), v))
6363
6364     def _encode2nd(self, writer, state_iter):
6365         write_full(writer, self.tag + len_encode(next(state_iter)))
6366         for v in self._values_for_encoding():
6367             v.encode2nd(writer, state_iter)
6368
6369     def _encode_cer(self, writer):
6370         write_full(writer, self.tag + LENINDEF)
6371         for v in self._values_for_encoding():
6372             v.encode_cer(writer)
6373         write_full(writer, EOC)
6374
6375     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6376         try:
6377             t, tlen, lv = tag_strip(tlv)
6378         except DecodeError as err:
6379             raise err.__class__(
6380                 msg=err.msg,
6381                 klass=self.__class__,
6382                 decode_path=decode_path,
6383                 offset=offset,
6384             )
6385         if t != self.tag:
6386             raise TagMismatch(
6387                 klass=self.__class__,
6388                 decode_path=decode_path,
6389                 offset=offset,
6390             )
6391         if tag_only:  # pragma: no cover
6392             yield None
6393             return
6394         lenindef = False
6395         ctx_bered = ctx.get("bered", False)
6396         try:
6397             l, llen, v = len_decode(lv)
6398         except LenIndefForm as err:
6399             if not ctx_bered:
6400                 raise err.__class__(
6401                     msg=err.msg,
6402                     klass=self.__class__,
6403                     decode_path=decode_path,
6404                     offset=offset,
6405                 )
6406             l, llen, v = 0, 1, lv[1:]
6407             lenindef = True
6408         except DecodeError as err:
6409             raise err.__class__(
6410                 msg=err.msg,
6411                 klass=self.__class__,
6412                 decode_path=decode_path,
6413                 offset=offset,
6414             )
6415         if l > len(v):
6416             raise NotEnoughData(
6417                 "encoded length is longer than data",
6418                 klass=self.__class__,
6419                 decode_path=decode_path,
6420                 offset=offset,
6421             )
6422         if not lenindef:
6423             v, tail = v[:l], v[l:]
6424         vlen = 0
6425         sub_offset = offset + tlen + llen
6426         values = {}
6427         ber_encoded = False
6428         ctx_allow_default_values = ctx.get("allow_default_values", False)
6429         for name, spec in self.specs.items():
6430             if spec.optional and (
6431                     (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6432                     len(v) == 0
6433             ):
6434                 continue
6435             spec_defaulted = spec.default is not None
6436             sub_decode_path = decode_path + (name,)
6437             try:
6438                 if evgen_mode and not spec_defaulted:
6439                     for _decode_path, value, v_tail in spec.decode_evgen(
6440                             v,
6441                             sub_offset,
6442                             leavemm=True,
6443                             decode_path=sub_decode_path,
6444                             ctx=ctx,
6445                             _ctx_immutable=False,
6446                     ):
6447                         yield _decode_path, value, v_tail
6448                 else:
6449                     _, value, v_tail = next(spec.decode_evgen(
6450                         v,
6451                         sub_offset,
6452                         leavemm=True,
6453                         decode_path=sub_decode_path,
6454                         ctx=ctx,
6455                         _ctx_immutable=False,
6456                         _evgen_mode=False,
6457                     ))
6458             except TagMismatch as err:
6459                 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6460                     continue
6461                 raise
6462
6463             defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6464             if not evgen_mode and defined is not None:
6465                 defined_by, defined_spec = defined
6466                 if issubclass(value.__class__, SequenceOf):
6467                     for i, _value in enumerate(value):
6468                         sub_sub_decode_path = sub_decode_path + (
6469                             str(i),
6470                             DecodePathDefBy(defined_by),
6471                         )
6472                         defined_value, defined_tail = defined_spec.decode(
6473                             memoryview(bytes(_value)),
6474                             sub_offset + (
6475                                 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6476                                 if value.expled else (value.tlen + value.llen)
6477                             ),
6478                             leavemm=True,
6479                             decode_path=sub_sub_decode_path,
6480                             ctx=ctx,
6481                             _ctx_immutable=False,
6482                         )
6483                         if len(defined_tail) > 0:
6484                             raise DecodeError(
6485                                 "remaining data",
6486                                 klass=self.__class__,
6487                                 decode_path=sub_sub_decode_path,
6488                                 offset=offset,
6489                             )
6490                         _value.defined = (defined_by, defined_value)
6491                 else:
6492                     defined_value, defined_tail = defined_spec.decode(
6493                         memoryview(bytes(value)),
6494                         sub_offset + (
6495                             (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6496                             if value.expled else (value.tlen + value.llen)
6497                         ),
6498                         leavemm=True,
6499                         decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6500                         ctx=ctx,
6501                         _ctx_immutable=False,
6502                     )
6503                     if len(defined_tail) > 0:
6504                         raise DecodeError(
6505                             "remaining data",
6506                             klass=self.__class__,
6507                             decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6508                             offset=offset,
6509                         )
6510                     value.defined = (defined_by, defined_value)
6511
6512             value_len = value.fulllen
6513             vlen += value_len
6514             sub_offset += value_len
6515             v = v_tail
6516             if spec_defaulted:
6517                 if evgen_mode:
6518                     yield sub_decode_path, value, v_tail
6519                 if value == spec.default:
6520                     if ctx_bered or ctx_allow_default_values:
6521                         ber_encoded = True
6522                     else:
6523                         raise DecodeError(
6524                             "DEFAULT value met",
6525                             klass=self.__class__,
6526                             decode_path=sub_decode_path,
6527                             offset=sub_offset,
6528                         )
6529             if not evgen_mode:
6530                 values[name] = value
6531                 spec_defines = getattr(spec, "defines", ())
6532                 if len(spec_defines) == 0:
6533                     defines_by_path = ctx.get("defines_by_path", ())
6534                     if len(defines_by_path) > 0:
6535                         spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6536                 if spec_defines is not None and len(spec_defines) > 0:
6537                     for rel_path, schema in spec_defines:
6538                         defined = schema.get(value, None)
6539                         if defined is not None:
6540                             ctx.setdefault("_defines", []).append((
6541                                 abs_decode_path(sub_decode_path[:-1], rel_path),
6542                                 (value, defined),
6543                             ))
6544         if lenindef:
6545             if v[:EOC_LEN].tobytes() != EOC:
6546                 raise DecodeError(
6547                     "no EOC",
6548                     klass=self.__class__,
6549                     decode_path=decode_path,
6550                     offset=offset,
6551                 )
6552             tail = v[EOC_LEN:]
6553             vlen += EOC_LEN
6554         elif len(v) > 0:
6555             raise DecodeError(
6556                 "remaining data",
6557                 klass=self.__class__,
6558                 decode_path=decode_path,
6559                 offset=offset,
6560             )
6561         obj = self.__class__(
6562             schema=self.specs,
6563             impl=self.tag,
6564             expl=self._expl,
6565             default=self.default,
6566             optional=self.optional,
6567             _decoded=(offset, llen, vlen),
6568         )
6569         obj._value = values
6570         obj.lenindef = lenindef
6571         obj.ber_encoded = ber_encoded
6572         yield decode_path, obj, tail
6573
6574     def __repr__(self):
6575         value = pp_console_row(next(self.pps()))
6576         cols = []
6577         for name in self.specs:
6578             _value = self._value.get(name)
6579             if _value is None:
6580                 continue
6581             cols.append("%s: %s" % (name, repr(_value)))
6582         return "%s[%s]" % (value, "; ".join(cols))
6583
6584     def pps(self, decode_path=()):
6585         yield _pp(
6586             obj=self,
6587             asn1_type_name=self.asn1_type_name,
6588             obj_name=self.__class__.__name__,
6589             decode_path=decode_path,
6590             optional=self.optional,
6591             default=self == self.default,
6592             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6593             expl=None if self._expl is None else tag_decode(self._expl),
6594             offset=self.offset,
6595             tlen=self.tlen,
6596             llen=self.llen,
6597             vlen=self.vlen,
6598             expl_offset=self.expl_offset if self.expled else None,
6599             expl_tlen=self.expl_tlen if self.expled else None,
6600             expl_llen=self.expl_llen if self.expled else None,
6601             expl_vlen=self.expl_vlen if self.expled else None,
6602             expl_lenindef=self.expl_lenindef,
6603             lenindef=self.lenindef,
6604             ber_encoded=self.ber_encoded,
6605             bered=self.bered,
6606         )
6607         for name in self.specs:
6608             value = self._value.get(name)
6609             if value is None:
6610                 continue
6611             yield value.pps(decode_path=decode_path + (name,))
6612         for pp in self.pps_lenindef(decode_path):
6613             yield pp
6614
6615
6616 class Set(Sequence, SequenceEncode1stMixin):
6617     """``SET`` structure type
6618
6619     Its usage is identical to :py:class:`pyderasn.Sequence`.
6620
6621     .. _allow_unordered_set_ctx:
6622
6623     DER prohibits unordered values encoding and will raise an error
6624     during decode. If :ref:`bered <bered_ctx>` context option is set,
6625     then no error will occur. Also you can disable strict values
6626     ordering check by setting ``"allow_unordered_set": True``
6627     :ref:`context <ctx>` option.
6628     """
6629     __slots__ = ()
6630     tag_default = tag_encode(form=TagFormConstructed, num=17)
6631     asn1_type_name = "SET"
6632
6633     def _values_for_encoding(self):
6634         return sorted(super()._values_for_encoding(), key=attrgetter("tag_order"))
6635
6636     def _encode_cer(self, writer):
6637         write_full(writer, self.tag + LENINDEF)
6638         for v in sorted(
6639                 super()._values_for_encoding(),
6640                 key=attrgetter("tag_order_cer"),
6641         ):
6642             v.encode_cer(writer)
6643         write_full(writer, EOC)
6644
6645     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6646         try:
6647             t, tlen, lv = tag_strip(tlv)
6648         except DecodeError as err:
6649             raise err.__class__(
6650                 msg=err.msg,
6651                 klass=self.__class__,
6652                 decode_path=decode_path,
6653                 offset=offset,
6654             )
6655         if t != self.tag:
6656             raise TagMismatch(
6657                 klass=self.__class__,
6658                 decode_path=decode_path,
6659                 offset=offset,
6660             )
6661         if tag_only:
6662             yield None
6663             return
6664         lenindef = False
6665         ctx_bered = ctx.get("bered", False)
6666         try:
6667             l, llen, v = len_decode(lv)
6668         except LenIndefForm as err:
6669             if not ctx_bered:
6670                 raise err.__class__(
6671                     msg=err.msg,
6672                     klass=self.__class__,
6673                     decode_path=decode_path,
6674                     offset=offset,
6675                 )
6676             l, llen, v = 0, 1, lv[1:]
6677             lenindef = True
6678         except DecodeError as err:
6679             raise err.__class__(
6680                 msg=err.msg,
6681                 klass=self.__class__,
6682                 decode_path=decode_path,
6683                 offset=offset,
6684             )
6685         if l > len(v):
6686             raise NotEnoughData(
6687                 "encoded length is longer than data",
6688                 klass=self.__class__,
6689                 offset=offset,
6690             )
6691         if not lenindef:
6692             v, tail = v[:l], v[l:]
6693         vlen = 0
6694         sub_offset = offset + tlen + llen
6695         values = {}
6696         ber_encoded = False
6697         ctx_allow_default_values = ctx.get("allow_default_values", False)
6698         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6699         tag_order_prev = (0, 0)
6700         _specs_items = copy(self.specs)
6701
6702         while len(v) > 0:
6703             if lenindef and v[:EOC_LEN].tobytes() == EOC:
6704                 break
6705             for name, spec in _specs_items.items():
6706                 sub_decode_path = decode_path + (name,)
6707                 try:
6708                     spec.decode(
6709                         v,
6710                         sub_offset,
6711                         leavemm=True,
6712                         decode_path=sub_decode_path,
6713                         ctx=ctx,
6714                         tag_only=True,
6715                         _ctx_immutable=False,
6716                     )
6717                 except TagMismatch:
6718                     continue
6719                 break
6720             else:
6721                 raise TagMismatch(
6722                     klass=self.__class__,
6723                     decode_path=decode_path,
6724                     offset=offset,
6725                 )
6726             spec_defaulted = spec.default is not None
6727             if evgen_mode and not spec_defaulted:
6728                 for _decode_path, value, v_tail in spec.decode_evgen(
6729                         v,
6730                         sub_offset,
6731                         leavemm=True,
6732                         decode_path=sub_decode_path,
6733                         ctx=ctx,
6734                         _ctx_immutable=False,
6735                 ):
6736                     yield _decode_path, value, v_tail
6737             else:
6738                 _, value, v_tail = next(spec.decode_evgen(
6739                     v,
6740                     sub_offset,
6741                     leavemm=True,
6742                     decode_path=sub_decode_path,
6743                     ctx=ctx,
6744                     _ctx_immutable=False,
6745                     _evgen_mode=False,
6746                 ))
6747             value_tag_order = value.tag_order
6748             value_len = value.fulllen
6749             if tag_order_prev >= value_tag_order:
6750                 if ctx_bered or ctx_allow_unordered_set:
6751                     ber_encoded = True
6752                 else:
6753                     raise DecodeError(
6754                         "unordered " + self.asn1_type_name,
6755                         klass=self.__class__,
6756                         decode_path=sub_decode_path,
6757                         offset=sub_offset,
6758                     )
6759             if spec_defaulted:
6760                 if evgen_mode:
6761                     yield sub_decode_path, value, v_tail
6762                 if value != spec.default:
6763                     pass
6764                 elif ctx_bered or ctx_allow_default_values:
6765                     ber_encoded = True
6766                 else:
6767                     raise DecodeError(
6768                         "DEFAULT value met",
6769                         klass=self.__class__,
6770                         decode_path=sub_decode_path,
6771                         offset=sub_offset,
6772                     )
6773             values[name] = value
6774             del _specs_items[name]
6775             tag_order_prev = value_tag_order
6776             sub_offset += value_len
6777             vlen += value_len
6778             v = v_tail
6779
6780         obj = self.__class__(
6781             schema=self.specs,
6782             impl=self.tag,
6783             expl=self._expl,
6784             default=self.default,
6785             optional=self.optional,
6786             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6787         )
6788         if lenindef:
6789             if v[:EOC_LEN].tobytes() != EOC:
6790                 raise DecodeError(
6791                     "no EOC",
6792                     klass=self.__class__,
6793                     decode_path=decode_path,
6794                     offset=offset,
6795                 )
6796             tail = v[EOC_LEN:]
6797             obj.lenindef = True
6798         for name, spec in self.specs.items():
6799             if name not in values and not spec.optional:
6800                 raise DecodeError(
6801                     "%s value is not ready" % name,
6802                     klass=self.__class__,
6803                     decode_path=decode_path,
6804                     offset=offset,
6805                 )
6806         if not evgen_mode:
6807             obj._value = values
6808         obj.ber_encoded = ber_encoded
6809         yield decode_path, obj, tail
6810
6811
6812 SequenceOfState = namedtuple(
6813     "SequenceOfState",
6814     BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6815     **NAMEDTUPLE_KWARGS
6816 )
6817
6818
6819 class SequenceOf(SequenceEncode1stMixin, Obj):
6820     """``SEQUENCE OF`` sequence type
6821
6822     For that kind of type you must specify the object it will carry on
6823     (bounds are for example here, not required)::
6824
6825         class Ints(SequenceOf):
6826             schema = Integer()
6827             bounds = (0, 2)
6828
6829     >>> ints = Ints()
6830     >>> ints.append(Integer(123))
6831     >>> ints.append(Integer(234))
6832     >>> ints
6833     Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6834     >>> [int(i) for i in ints]
6835     [123, 234]
6836     >>> ints.append(Integer(345))
6837     Traceback (most recent call last):
6838     pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6839     >>> ints[1]
6840     INTEGER 234
6841     >>> ints[1] = Integer(345)
6842     >>> ints
6843     Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6844
6845     You can initialize sequence with preinitialized values:
6846
6847     >>> ints = Ints([Integer(123), Integer(234)])
6848
6849     Also you can use iterator as a value:
6850
6851     >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6852
6853     And it won't be iterated until encoding process. Pay attention that
6854     bounds and required schema checks are done only during the encoding
6855     process in that case! After encode was called, then value is zeroed
6856     back to empty list and you have to set it again. That mode is useful
6857     mainly with CER encoding mode, where all objects from the iterable
6858     will be streamed to the buffer, without copying all of them to
6859     memory first.
6860     """
6861     __slots__ = ("spec", "_bound_min", "_bound_max")
6862     tag_default = tag_encode(form=TagFormConstructed, num=16)
6863     asn1_type_name = "SEQUENCE OF"
6864
6865     def __init__(
6866             self,
6867             value=None,
6868             schema=None,
6869             bounds=None,
6870             impl=None,
6871             expl=None,
6872             default=None,
6873             optional=False,
6874             _decoded=(0, 0, 0),
6875     ):
6876         super().__init__(impl, expl, default, optional, _decoded)
6877         if schema is None:
6878             schema = getattr(self, "schema", None)
6879         if schema is None:
6880             raise ValueError("schema must be specified")
6881         self.spec = schema
6882         self._bound_min, self._bound_max = getattr(
6883             self,
6884             "bounds",
6885             (0, float("+inf")),
6886         ) if bounds is None else bounds
6887         self._value = []
6888         if value is not None:
6889             self._value = self._value_sanitize(value)
6890         if default is not None:
6891             default_value = self._value_sanitize(default)
6892             default_obj = self.__class__(
6893                 schema=schema,
6894                 impl=self.tag,
6895                 expl=self._expl,
6896             )
6897             default_obj._value = default_value
6898             self.default = default_obj
6899             if value is None:
6900                 self._value = copy(default_obj._value)
6901
6902     def _value_sanitize(self, value):
6903         iterator = False
6904         if issubclass(value.__class__, SequenceOf):
6905             value = value._value
6906         elif hasattr(value, NEXT_ATTR_NAME):
6907             iterator = True
6908         elif hasattr(value, "__iter__"):
6909             value = list(value)
6910         else:
6911             raise InvalidValueType((self.__class__, iter, "iterator"))
6912         if not iterator:
6913             if not self._bound_min <= len(value) <= self._bound_max:
6914                 raise BoundsError(self._bound_min, len(value), self._bound_max)
6915             class_expected = self.spec.__class__
6916             for v in value:
6917                 if not isinstance(v, class_expected):
6918                     raise InvalidValueType((class_expected,))
6919         return value
6920
6921     @property
6922     def ready(self):
6923         if hasattr(self._value, NEXT_ATTR_NAME):
6924             return True
6925         if self._bound_min > 0 and len(self._value) == 0:
6926             return False
6927         return all(v.ready for v in self._value)
6928
6929     @property
6930     def bered(self):
6931         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6932             return True
6933         return any(v.bered for v in self._value)
6934
6935     def __getstate__(self):
6936         if hasattr(self._value, NEXT_ATTR_NAME):
6937             raise ValueError("can not pickle SequenceOf with iterator")
6938         return SequenceOfState(
6939             __version__,
6940             self.tag,
6941             self._tag_order,
6942             self._expl,
6943             self.default,
6944             self.optional,
6945             self.offset,
6946             self.llen,
6947             self.vlen,
6948             self.expl_lenindef,
6949             self.lenindef,
6950             self.ber_encoded,
6951             self.spec,
6952             [copy(v) for v in self._value],
6953             self._bound_min,
6954             self._bound_max,
6955         )
6956
6957     def __setstate__(self, state):
6958         super().__setstate__(state)
6959         self.spec = state.spec
6960         self._value = state.value
6961         self._bound_min = state.bound_min
6962         self._bound_max = state.bound_max
6963
6964     def __eq__(self, their):
6965         if isinstance(their, self.__class__):
6966             return (
6967                 self.spec == their.spec and
6968                 self.tag == their.tag and
6969                 self._expl == their._expl and
6970                 self._value == their._value
6971             )
6972         if hasattr(their, "__iter__"):
6973             return self._value == list(their)
6974         return False
6975
6976     def __call__(
6977             self,
6978             value=None,
6979             bounds=None,
6980             impl=None,
6981             expl=None,
6982             default=None,
6983             optional=None,
6984     ):
6985         return self.__class__(
6986             value=value,
6987             schema=self.spec,
6988             bounds=(
6989                 (self._bound_min, self._bound_max)
6990                 if bounds is None else bounds
6991             ),
6992             impl=self.tag if impl is None else impl,
6993             expl=self._expl if expl is None else expl,
6994             default=self.default if default is None else default,
6995             optional=self.optional if optional is None else optional,
6996         )
6997
6998     def __contains__(self, key):
6999         return key in self._value
7000
7001     def append(self, value):
7002         if not isinstance(value, self.spec.__class__):
7003             raise InvalidValueType((self.spec.__class__,))
7004         if len(self._value) + 1 > self._bound_max:
7005             raise BoundsError(
7006                 self._bound_min,
7007                 len(self._value) + 1,
7008                 self._bound_max,
7009             )
7010         self._value.append(value)
7011
7012     def __iter__(self):
7013         return iter(self._value)
7014
7015     def __len__(self):
7016         return len(self._value)
7017
7018     def __setitem__(self, key, value):
7019         if not isinstance(value, self.spec.__class__):
7020             raise InvalidValueType((self.spec.__class__,))
7021         self._value[key] = self.spec(value=value)
7022
7023     def __getitem__(self, key):
7024         return self._value[key]
7025
7026     def _values_for_encoding(self):
7027         return iter(self._value)
7028
7029     def _encode(self):
7030         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7031         if iterator:
7032             values = []
7033             values_append = values.append
7034             class_expected = self.spec.__class__
7035             values_for_encoding = self._values_for_encoding()
7036             self._value = []
7037             for v in values_for_encoding:
7038                 if not isinstance(v, class_expected):
7039                     raise InvalidValueType((class_expected,))
7040                 values_append(v.encode())
7041             if not self._bound_min <= len(values) <= self._bound_max:
7042                 raise BoundsError(self._bound_min, len(values), self._bound_max)
7043             value = b"".join(values)
7044         else:
7045             value = b"".join(v.encode() for v in self._values_for_encoding())
7046         return b"".join((self.tag, len_encode(len(value)), value))
7047
7048     def _encode1st(self, state):
7049         state = super()._encode1st(state)
7050         if hasattr(self._value, NEXT_ATTR_NAME):
7051             self._value = []
7052         return state
7053
7054     def _encode2nd(self, writer, state_iter):
7055         write_full(writer, self.tag + len_encode(next(state_iter)))
7056         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7057         if iterator:
7058             values_count = 0
7059             class_expected = self.spec.__class__
7060             values_for_encoding = self._values_for_encoding()
7061             self._value = []
7062             for v in values_for_encoding:
7063                 if not isinstance(v, class_expected):
7064                     raise InvalidValueType((class_expected,))
7065                 v.encode2nd(writer, state_iter)
7066                 values_count += 1
7067             if not self._bound_min <= values_count <= self._bound_max:
7068                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7069         else:
7070             for v in self._values_for_encoding():
7071                 v.encode2nd(writer, state_iter)
7072
7073     def _encode_cer(self, writer):
7074         write_full(writer, self.tag + LENINDEF)
7075         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7076         if iterator:
7077             class_expected = self.spec.__class__
7078             values_count = 0
7079             values_for_encoding = self._values_for_encoding()
7080             self._value = []
7081             for v in values_for_encoding:
7082                 if not isinstance(v, class_expected):
7083                     raise InvalidValueType((class_expected,))
7084                 v.encode_cer(writer)
7085                 values_count += 1
7086             if not self._bound_min <= values_count <= self._bound_max:
7087                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7088         else:
7089             for v in self._values_for_encoding():
7090                 v.encode_cer(writer)
7091         write_full(writer, EOC)
7092
7093     def _decode(
7094             self,
7095             tlv,
7096             offset,
7097             decode_path,
7098             ctx,
7099             tag_only,
7100             evgen_mode,
7101             ordering_check=False,
7102     ):
7103         try:
7104             t, tlen, lv = tag_strip(tlv)
7105         except DecodeError as err:
7106             raise err.__class__(
7107                 msg=err.msg,
7108                 klass=self.__class__,
7109                 decode_path=decode_path,
7110                 offset=offset,
7111             )
7112         if t != self.tag:
7113             raise TagMismatch(
7114                 klass=self.__class__,
7115                 decode_path=decode_path,
7116                 offset=offset,
7117             )
7118         if tag_only:
7119             yield None
7120             return
7121         lenindef = False
7122         ctx_bered = ctx.get("bered", False)
7123         try:
7124             l, llen, v = len_decode(lv)
7125         except LenIndefForm as err:
7126             if not ctx_bered:
7127                 raise err.__class__(
7128                     msg=err.msg,
7129                     klass=self.__class__,
7130                     decode_path=decode_path,
7131                     offset=offset,
7132                 )
7133             l, llen, v = 0, 1, lv[1:]
7134             lenindef = True
7135         except DecodeError as err:
7136             raise err.__class__(
7137                 msg=err.msg,
7138                 klass=self.__class__,
7139                 decode_path=decode_path,
7140                 offset=offset,
7141             )
7142         if l > len(v):
7143             raise NotEnoughData(
7144                 "encoded length is longer than data",
7145                 klass=self.__class__,
7146                 decode_path=decode_path,
7147                 offset=offset,
7148             )
7149         if not lenindef:
7150             v, tail = v[:l], v[l:]
7151         vlen = 0
7152         sub_offset = offset + tlen + llen
7153         _value = []
7154         _value_count = 0
7155         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7156         value_prev = memoryview(v[:0])
7157         ber_encoded = False
7158         spec = self.spec
7159         while len(v) > 0:
7160             if lenindef and v[:EOC_LEN].tobytes() == EOC:
7161                 break
7162             sub_decode_path = decode_path + (str(_value_count),)
7163             if evgen_mode:
7164                 for _decode_path, value, v_tail in spec.decode_evgen(
7165                         v,
7166                         sub_offset,
7167                         leavemm=True,
7168                         decode_path=sub_decode_path,
7169                         ctx=ctx,
7170                         _ctx_immutable=False,
7171                 ):
7172                     yield _decode_path, value, v_tail
7173             else:
7174                 _, value, v_tail = next(spec.decode_evgen(
7175                     v,
7176                     sub_offset,
7177                     leavemm=True,
7178                     decode_path=sub_decode_path,
7179                     ctx=ctx,
7180                     _ctx_immutable=False,
7181                     _evgen_mode=False,
7182                 ))
7183             value_len = value.fulllen
7184             if ordering_check:
7185                 if value_prev.tobytes() > v[:value_len].tobytes():
7186                     if ctx_bered or ctx_allow_unordered_set:
7187                         ber_encoded = True
7188                     else:
7189                         raise DecodeError(
7190                             "unordered " + self.asn1_type_name,
7191                             klass=self.__class__,
7192                             decode_path=sub_decode_path,
7193                             offset=sub_offset,
7194                         )
7195                 value_prev = v[:value_len]
7196             _value_count += 1
7197             if not evgen_mode:
7198                 _value.append(value)
7199             sub_offset += value_len
7200             vlen += value_len
7201             v = v_tail
7202         if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7203             raise DecodeError(
7204                 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7205                 klass=self.__class__,
7206                 decode_path=decode_path,
7207                 offset=offset,
7208             )
7209         try:
7210             obj = self.__class__(
7211                 value=None if evgen_mode else _value,
7212                 schema=spec,
7213                 bounds=(self._bound_min, self._bound_max),
7214                 impl=self.tag,
7215                 expl=self._expl,
7216                 default=self.default,
7217                 optional=self.optional,
7218                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7219             )
7220         except BoundsError as err:
7221             raise DecodeError(
7222                 msg=str(err),
7223                 klass=self.__class__,
7224                 decode_path=decode_path,
7225                 offset=offset,
7226             )
7227         if lenindef:
7228             if v[:EOC_LEN].tobytes() != EOC:
7229                 raise DecodeError(
7230                     "no EOC",
7231                     klass=self.__class__,
7232                     decode_path=decode_path,
7233                     offset=offset,
7234                 )
7235             obj.lenindef = True
7236             tail = v[EOC_LEN:]
7237         obj.ber_encoded = ber_encoded
7238         yield decode_path, obj, tail
7239
7240     def __repr__(self):
7241         return "%s[%s]" % (
7242             pp_console_row(next(self.pps())),
7243             ", ".join(repr(v) for v in self._value),
7244         )
7245
7246     def pps(self, decode_path=()):
7247         yield _pp(
7248             obj=self,
7249             asn1_type_name=self.asn1_type_name,
7250             obj_name=self.__class__.__name__,
7251             decode_path=decode_path,
7252             optional=self.optional,
7253             default=self == self.default,
7254             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7255             expl=None if self._expl is None else tag_decode(self._expl),
7256             offset=self.offset,
7257             tlen=self.tlen,
7258             llen=self.llen,
7259             vlen=self.vlen,
7260             expl_offset=self.expl_offset if self.expled else None,
7261             expl_tlen=self.expl_tlen if self.expled else None,
7262             expl_llen=self.expl_llen if self.expled else None,
7263             expl_vlen=self.expl_vlen if self.expled else None,
7264             expl_lenindef=self.expl_lenindef,
7265             lenindef=self.lenindef,
7266             ber_encoded=self.ber_encoded,
7267             bered=self.bered,
7268         )
7269         for i, value in enumerate(self._value):
7270             yield value.pps(decode_path=decode_path + (str(i),))
7271         for pp in self.pps_lenindef(decode_path):
7272             yield pp
7273
7274
7275 class SetOf(SequenceOf):
7276     """``SET OF`` sequence type
7277
7278     Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7279     """
7280     __slots__ = ()
7281     tag_default = tag_encode(form=TagFormConstructed, num=17)
7282     asn1_type_name = "SET OF"
7283
7284     def _value_sanitize(self, value):
7285         value = super()._value_sanitize(value)
7286         if hasattr(value, NEXT_ATTR_NAME):
7287             raise ValueError(
7288                 "SetOf does not support iterator values, as no sense in them"
7289             )
7290         return value
7291
7292     def _encode(self):
7293         v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7294         return b"".join((self.tag, len_encode(len(v)), v))
7295
7296     def _encode2nd(self, writer, state_iter):
7297         write_full(writer, self.tag + len_encode(next(state_iter)))
7298         values = []
7299         for v in self._values_for_encoding():
7300             buf = BytesIO()
7301             v.encode2nd(buf.write, state_iter)
7302             values.append(buf.getvalue())
7303         values.sort()
7304         for v in values:
7305             write_full(writer, v)
7306
7307     def _encode_cer(self, writer):
7308         write_full(writer, self.tag + LENINDEF)
7309         for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7310             write_full(writer, v)
7311         write_full(writer, EOC)
7312
7313     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7314         return super()._decode(
7315             tlv,
7316             offset,
7317             decode_path,
7318             ctx,
7319             tag_only,
7320             evgen_mode,
7321             ordering_check=True,
7322         )
7323
7324
7325 def obj_by_path(pypath):  # pragma: no cover
7326     """Import object specified as string Python path
7327
7328     Modules must be separated from classes/functions with ``:``.
7329
7330     >>> obj_by_path("foo.bar:Baz")
7331     <class 'foo.bar.Baz'>
7332     >>> obj_by_path("foo.bar:Baz.boo")
7333     <classmethod 'foo.bar.Baz.boo'>
7334     """
7335     mod, objs = pypath.rsplit(":", 1)
7336     from importlib import import_module
7337     obj = import_module(mod)
7338     for obj_name in objs.split("."):
7339         obj = getattr(obj, obj_name)
7340     return obj
7341
7342
7343 def generic_decoder():  # pragma: no cover
7344     # All of this below is a big hack with self references
7345     choice = PrimitiveTypes()
7346     choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7347     choice.specs["SetOf"] = SetOf(schema=choice)
7348     for i in range(31):
7349         choice.specs["SequenceOf%d" % i] = SequenceOf(
7350             schema=choice,
7351             expl=tag_ctxc(i),
7352         )
7353     choice.specs["Any"] = Any()
7354
7355     # Class name equals to type name, to omit it from output
7356     class SEQUENCEOF(SequenceOf):
7357         __slots__ = ()
7358         schema = choice
7359
7360     def pprint_any(
7361             obj,
7362             oid_maps=(),
7363             with_colours=False,
7364             with_decode_path=False,
7365             decode_path_only=(),
7366             decode_path=(),
7367     ):
7368         def _pprint_pps(pps):
7369             for pp in pps:
7370                 if hasattr(pp, "_fields"):
7371                     if (
7372                             decode_path_only != () and
7373                             pp.decode_path[:len(decode_path_only)] != decode_path_only
7374                     ):
7375                         continue
7376                     if pp.asn1_type_name == Choice.asn1_type_name:
7377                         continue
7378                     pp_kwargs = pp._asdict()
7379                     pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7380                     pp = _pp(**pp_kwargs)
7381                     yield pp_console_row(
7382                         pp,
7383                         oid_maps=oid_maps,
7384                         with_offsets=True,
7385                         with_blob=False,
7386                         with_colours=with_colours,
7387                         with_decode_path=with_decode_path,
7388                         decode_path_len_decrease=len(decode_path_only),
7389                     )
7390                     for row in pp_console_blob(
7391                             pp,
7392                             decode_path_len_decrease=len(decode_path_only),
7393                     ):
7394                         yield row
7395                 else:
7396                     for row in _pprint_pps(pp):
7397                         yield row
7398         return "\n".join(_pprint_pps(obj.pps(decode_path)))
7399     return SEQUENCEOF(), pprint_any
7400
7401
7402 def ascii_visualize(ba):
7403     """Output only ASCII printable characters, like in hexdump -C
7404
7405     Example output for given binary string (right part)::
7406
7407         92 2b 39 20 65 91 e6 8e  95 93 1a 58 df 02 78 ea  |.+9 e......X..x.|
7408                                                            ^^^^^^^^^^^^^^^^
7409     """
7410     return "".join((chr(b) if 0x20 <= b <= 0x7E else ".") for b in ba)
7411
7412
7413 def hexdump(raw):
7414     """Generate ``hexdump -C`` like output
7415
7416     Rendered example::
7417
7418         00000000  30 80 30 80 a0 80 02 01  02 00 00 02 14 54 a5 18  |0.0..........T..|
7419         00000010  69 ef 8b 3f 15 fd ea ad  bd 47 e0 94 81 6b 06 6a  |i..?.....G...k.j|
7420
7421     Result of that function is a generator of lines, where each line is
7422     a list of columns::
7423
7424         [
7425             [...],
7426             ["00000010 ", " 69", " ef", " 8b", " 3f", " 15", " fd", " ea", " ad ",
7427                           " bd", " 47", " e0", " 94", " 81", " 6b", " 06", " 6a ",
7428                           " |i..?.....G...k.j|"]
7429             [...],
7430         ]
7431     """
7432     hexed = hexenc(raw).upper()
7433     addr, cols = 0, ["%08x " % 0]
7434     for i in range(0, len(hexed), 2):
7435         if i != 0 and i // 2 % 8 == 0:
7436             cols[-1] += " "
7437         if i != 0 and i // 2 % 16 == 0:
7438             cols.append(" |%s|" % ascii_visualize(bytes(raw[addr:addr + 16])))
7439             yield cols
7440             addr += 16
7441             cols = ["%08x " % addr]
7442         cols.append(" " + hexed[i:i + 2])
7443     if len(cols) > 0:
7444         cols.append(" |%s|" % ascii_visualize(bytes(raw[addr:])))
7445         yield cols
7446
7447
7448 def browse(raw, obj, oid_maps=()):
7449     """Interactive browser
7450
7451     :param bytes raw: binary data you decoded
7452     :param obj: decoded :py:class:`pyderasn.Obj`
7453     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
7454                      Its human readable form is printed when OID is met
7455
7456     .. note:: `urwid <http://urwid.org/>`__ dependency required
7457
7458     This browser is an interactive terminal application for browsing
7459     structures of your decoded ASN.1 objects. You can quit it with **q**
7460     key. It consists of three windows:
7461
7462     :tree:
7463      View of ASN.1 elements hierarchy. You can navigate it using **Up**,
7464      **Down**, **PageUp**, **PageDown**, **Home**, **End** keys.
7465      **Left** key goes to constructed element above. **Plus**/**Minus**
7466      keys collapse/uncollapse constructed elements. **Space** toggles it
7467     :info:
7468      window with various information about element. You can scroll it
7469      with **h**/**l** (down, up) (**H**/**L** for triple speed) keys
7470     :hexdump:
7471      window with raw data hexdump and highlighted current element's
7472      contents. It automatically focuses on element's data. You can
7473      scroll it with **j**/**k** (down, up) (**J**/**K** for triple
7474      speed) keys. If element has explicit tag, then it also will be
7475      highlighted with different colour
7476
7477     Window's header contains current decode path and progress bars with
7478     position in *info* and *hexdump* windows.
7479
7480     If you press **d**, then current element will be saved in the
7481     current directory under its decode path name (adding ".0", ".1", etc
7482     suffix if such file already exists). **D** will save it with explicit tag.
7483
7484     You can also invoke it with ``--browse`` command line argument.
7485     """
7486     from copy import deepcopy
7487     from os.path import exists as path_exists
7488     import urwid
7489
7490     class TW(urwid.TreeWidget):
7491         def __init__(self, state, *args, **kwargs):
7492             self.state = state
7493             self.scrolled = {"info": False, "hexdump": False}
7494             super().__init__(*args, **kwargs)
7495
7496         def _get_pp(self):
7497             pp = self.get_node().get_value()
7498             constructed = len(pp) > 1
7499             return (pp if hasattr(pp, "_fields") else pp[0]), constructed
7500
7501         def _state_update(self):
7502             pp, _ = self._get_pp()
7503             self.state["decode_path"].set_text(
7504                 ":".join(str(p) for p in pp.decode_path)
7505             )
7506             lines = deepcopy(self.state["hexed"])
7507
7508             def attr_set(i, attr):
7509                 line = lines[i // 16]
7510                 idx = 1 + (i - 16 * (i // 16))
7511                 line[idx] = (attr, line[idx])
7512
7513             if pp.expl_offset is not None:
7514                 for i in range(
7515                         pp.expl_offset,
7516                         pp.expl_offset + pp.expl_tlen + pp.expl_llen,
7517                 ):
7518                     attr_set(i, "select-expl")
7519             for i in range(pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen):
7520                 attr_set(i, "select-value")
7521             self.state["hexdump"]._set_body([urwid.Text(line) for line in lines])
7522             self.state["hexdump"].set_focus(pp.offset // 16)
7523             self.state["hexdump"].set_focus_valign("middle")
7524             self.state["hexdump_bar"].set_completion(
7525                 (100 * pp.offset // 16) //
7526                 len(self.state["hexdump"]._body.positions())
7527             )
7528
7529             lines = [
7530                 [("header", "Name: "), pp.obj_name],
7531                 [("header", "Type: "), pp.asn1_type_name],
7532                 [("header", "Offset: "), "%d (0x%x)" % (pp.offset, pp.offset)],
7533                 [("header", "[TLV]len: "), "%d/%d/%d" % (
7534                     pp.tlen, pp.llen, pp.vlen,
7535                 )],
7536                 [("header", "TLVlen: "), "%d" % sum((
7537                     pp.tlen, pp.llen, pp.vlen,
7538                 ))],
7539                 [("header", "Slice: "), "[%d:%d]" % (
7540                     pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen,
7541                 )],
7542             ]
7543             if pp.lenindef:
7544                 lines.append([("warning", "LENINDEF")])
7545             if pp.ber_encoded:
7546                 lines.append([("warning", "BER encoded")])
7547             if pp.bered:
7548                 lines.append([("warning", "BERed")])
7549             if pp.expl is not None:
7550                 lines.append([("header", "EXPLICIT")])
7551                 klass, _, num = pp.expl
7552                 lines.append(["  Tag: %s%d" % (TagClassReprs[klass], num)])
7553                 if pp.expl_offset is not None:
7554                     lines.append(["  Offset: %d" % pp.expl_offset])
7555                     lines.append(["  [TLV]len: %d/%d/%d" % (
7556                         pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7557                     )])
7558                     lines.append(["  TLVlen: %d" % sum((
7559                         pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7560                     ))])
7561                     lines.append(["  Slice: [%d:%d]" % (
7562                         pp.expl_offset,
7563                         pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen,
7564                     )])
7565             if pp.impl is not None:
7566                 klass, _, num = pp.impl
7567                 lines.append([
7568                     ("header", "IMPLICIT: "), "%s%d" % (TagClassReprs[klass], num),
7569                 ])
7570             if pp.optional:
7571                 lines.append(["OPTIONAL"])
7572             if pp.default:
7573                 lines.append(["DEFAULT"])
7574             if len(pp.decode_path) > 0:
7575                 ent = pp.decode_path[-1]
7576                 if isinstance(ent, DecodePathDefBy):
7577                     lines.append([""])
7578                     value = str(ent.defined_by)
7579                     oid_name = find_oid_name(
7580                         ent.defined_by.asn1_type_name, oid_maps, value,
7581                     )
7582                     lines.append([("header", "DEFINED BY: "), "%s" % (
7583                         value if oid_name is None
7584                         else "%s (%s)" % (oid_name, value)
7585                     )])
7586             lines.append([""])
7587             if pp.value is not None:
7588                 lines.append([("header", "Value: "), pp.value])
7589                 if (
7590                         len(oid_maps) > 0 and
7591                         pp.asn1_type_name == ObjectIdentifier.asn1_type_name
7592                 ):
7593                     for oid_map in oid_maps:
7594                         oid_name = oid_map.get(pp.value)
7595                         if oid_name is not None:
7596                             lines.append([("header", "Human: "), oid_name])
7597                             break
7598                 if pp.asn1_type_name == Integer.asn1_type_name:
7599                     lines.append([
7600                         ("header", "Decimal: "), "%d" % int(pp.obj),
7601                     ])
7602                     lines.append([
7603                         ("header", "Hexadecimal: "), colonize_hex(pp.obj.tohex()),
7604                     ])
7605             if pp.blob.__class__ == bytes:
7606                 blob = hexenc(pp.blob).upper()
7607                 for i in range(0, len(blob), 32):
7608                     lines.append([colonize_hex(blob[i:i + 32])])
7609             elif pp.blob.__class__ == tuple:
7610                 lines.append([", ".join(pp.blob)])
7611             self.state["info"]._set_body([urwid.Text(line) for line in lines])
7612             self.state["info_bar"].set_completion(0)
7613
7614         def selectable(self):
7615             if self.state["widget_current"] != self:
7616                 self.state["widget_current"] = self
7617                 self.scrolled["info"] = False
7618                 self.scrolled["hexdump"] = False
7619                 self._state_update()
7620             return super().selectable()
7621
7622         def _get_display_text_without_offset(self):
7623             pp, constructed = self._get_pp()
7624             style = "constructed" if constructed else ""
7625             if len(pp.decode_path) == 0:
7626                 return (style, pp.obj_name)
7627             if pp.asn1_type_name == "EOC":
7628                 return ("eoc", "EOC")
7629             ent = pp.decode_path[-1]
7630             if isinstance(ent, DecodePathDefBy):
7631                 value = str(ent.defined_by)
7632                 oid_name = find_oid_name(
7633                     ent.defined_by.asn1_type_name, oid_maps, value,
7634                 )
7635                 return ("defby", "DEFBY:" + (
7636                     value if oid_name is None else oid_name
7637                 ))
7638             return (style, ent)
7639
7640         def get_display_text(self):
7641             pp, _ = self._get_pp()
7642             style, ent = self._get_display_text_without_offset()
7643             return [(style, ent), " [%d]" % pp.offset]
7644
7645         def _scroll(self, what, step):
7646             self.state[what]._invalidate()
7647             pos = self.state[what].focus_position
7648             if not self.scrolled[what]:
7649                 self.scrolled[what] = True
7650                 pos -= 2
7651             pos = max(0, pos + step)
7652             pos = min(pos, len(self.state[what]._body.positions()) - 1)
7653             self.state[what].set_focus(pos)
7654             self.state[what].set_focus_valign("top")
7655             self.state[what + "_bar"].set_completion(
7656                 (100 * pos) // len(self.state[what]._body.positions())
7657             )
7658
7659         def keypress(self, size, key):
7660             if key == "q":
7661                 raise urwid.ExitMainLoop()
7662
7663             if key == " ":
7664                 self.expanded = not self.expanded
7665                 self.update_expanded_icon()
7666                 return None
7667
7668             hexdump_steps = {"j": 1, "k": -1, "J": 5, "K": -5}
7669             if key in hexdump_steps:
7670                 self._scroll("hexdump", hexdump_steps[key])
7671                 return None
7672
7673             info_steps = {"h": 1, "l": -1, "H": 5, "L": -5}
7674             if key in info_steps:
7675                 self._scroll("info", info_steps[key])
7676                 return None
7677
7678             if key in ("d", "D"):
7679                 pp, _ = self._get_pp()
7680                 dp = ":".join(str(p) for p in pp.decode_path)
7681                 dp = dp.replace(" ", "_")
7682                 if dp == "":
7683                     dp = "root"
7684                 if key == "d" or pp.expl_offset is None:
7685                     data = self.state["raw"][pp.offset:(
7686                         pp.offset + pp.tlen + pp.llen + pp.vlen
7687                     )]
7688                 else:
7689                     data = self.state["raw"][pp.expl_offset:(
7690                         pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen
7691                     )]
7692                 ctr = 0
7693
7694                 def duplicate_path(dp, ctr):
7695                     if ctr == 0:
7696                         return dp
7697                     return "%s.%d" % (dp, ctr)
7698
7699                 while True:
7700                     if not path_exists(duplicate_path(dp, ctr)):
7701                         break
7702                     ctr += 1
7703                 dp = duplicate_path(dp, ctr)
7704                 with open(dp, "wb") as fd:
7705                     fd.write(data)
7706                 self.state["decode_path"].set_text(
7707                     ("warning", "Saved to: " + dp)
7708                 )
7709                 return None
7710             return super().keypress(size, key)
7711
7712     class PN(urwid.ParentNode):
7713         def __init__(self, state, value, *args, **kwargs):
7714             self.state = state
7715             if not hasattr(value, "_fields"):
7716                 value = list(value)
7717             super().__init__(value, *args, **kwargs)
7718
7719         def load_widget(self):
7720             return TW(self.state, self)
7721
7722         def load_child_keys(self):
7723             value = self.get_value()
7724             if hasattr(value, "_fields"):
7725                 return []
7726             return range(len(value[1:]))
7727
7728         def load_child_node(self, key):
7729             return PN(
7730                 self.state,
7731                 self.get_value()[key + 1],
7732                 parent=self,
7733                 key=key,
7734                 depth=self.get_depth() + 1,
7735             )
7736
7737     class LabeledPG(urwid.ProgressBar):
7738         def __init__(self, label, *args, **kwargs):
7739             self.label = label
7740             super().__init__(*args, **kwargs)
7741
7742         def get_text(self):
7743             return "%s: %s" % (self.label, super().get_text())
7744
7745     WinHexdump = urwid.ListBox([urwid.Text("")])
7746     WinInfo = urwid.ListBox([urwid.Text("")])
7747     WinDecodePath = urwid.Text("", "center")
7748     WinInfoBar = LabeledPG("info", "pg-normal", "pg-complete")
7749     WinHexdumpBar = LabeledPG("hexdump", "pg-normal", "pg-complete")
7750     WinTree = urwid.TreeListBox(urwid.TreeWalker(PN(
7751         {
7752             "raw": raw,
7753             "hexed": list(hexdump(raw)),
7754             "widget_current": None,
7755             "info": WinInfo,
7756             "info_bar": WinInfoBar,
7757             "hexdump": WinHexdump,
7758             "hexdump_bar": WinHexdumpBar,
7759             "decode_path": WinDecodePath,
7760         },
7761         list(obj.pps()),
7762     )))
7763     help_text = " ".join((
7764         "q:quit",
7765         "space:(un)collapse",
7766         "(pg)up/down/home/end:nav",
7767         "jkJK:hexdump hlHL:info",
7768         "dD:dump",
7769     ))
7770     urwid.MainLoop(
7771         urwid.Frame(
7772             urwid.Columns([
7773                 ("weight", 1, WinTree),
7774                 ("weight", 2, urwid.Pile([
7775                     urwid.LineBox(WinInfo),
7776                     urwid.LineBox(WinHexdump),
7777                 ])),
7778             ]),
7779             header=urwid.Columns([
7780                 ("weight", 2, urwid.AttrWrap(WinDecodePath, "header")),
7781                 ("weight", 1, WinInfoBar),
7782                 ("weight", 1, WinHexdumpBar),
7783             ]),
7784             footer=urwid.AttrWrap(urwid.Text(help_text), "help")
7785         ),
7786         [
7787             ("header", "bold", ""),
7788             ("constructed", "bold", ""),
7789             ("help", "light magenta", ""),
7790             ("warning", "light red", ""),
7791             ("defby", "light red", ""),
7792             ("eoc", "dark red", ""),
7793             ("select-value", "light green", ""),
7794             ("select-expl", "light red", ""),
7795             ("pg-normal", "", "light blue"),
7796             ("pg-complete", "black", "yellow"),
7797         ],
7798     ).run()
7799
7800
7801 def main():  # pragma: no cover
7802     import argparse
7803     parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7804     parser.add_argument(
7805         "--skip",
7806         type=int,
7807         default=0,
7808         help="Skip that number of bytes from the beginning",
7809     )
7810     parser.add_argument(
7811         "--oids",
7812         help="Python paths to dictionary with OIDs, comma separated",
7813     )
7814     parser.add_argument(
7815         "--schema",
7816         help="Python path to schema definition to use",
7817     )
7818     parser.add_argument(
7819         "--defines-by-path",
7820         help="Python path to decoder's defines_by_path",
7821     )
7822     parser.add_argument(
7823         "--nobered",
7824         action="store_true",
7825         help="Disallow BER encoding",
7826     )
7827     parser.add_argument(
7828         "--print-decode-path",
7829         action="store_true",
7830         help="Print decode paths",
7831     )
7832     parser.add_argument(
7833         "--decode-path-only",
7834         help="Print only specified decode path",
7835     )
7836     parser.add_argument(
7837         "--allow-expl-oob",
7838         action="store_true",
7839         help="Allow explicit tag out-of-bound",
7840     )
7841     parser.add_argument(
7842         "--evgen",
7843         action="store_true",
7844         help="Turn on event generation mode",
7845     )
7846     parser.add_argument(
7847         "--browse",
7848         action="store_true",
7849         help="Start ASN.1 browser",
7850     )
7851     parser.add_argument(
7852         "RAWFile",
7853         type=argparse.FileType("rb"),
7854         help="Path to BER/CER/DER file you want to decode",
7855     )
7856     args = parser.parse_args()
7857     try:
7858         raw = file_mmaped(args.RAWFile)[args.skip:]
7859     except:
7860         args.RAWFile.seek(args.skip)
7861         raw = memoryview(args.RAWFile.read())
7862         args.RAWFile.close()
7863     oid_maps = (
7864         [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7865         if args.oids else ()
7866     )
7867     from functools import partial
7868     if args.schema:
7869         schema = obj_by_path(args.schema)
7870         pprinter = partial(pprint, big_blobs=True)
7871     else:
7872         schema, pprinter = generic_decoder()
7873     ctx = {
7874         "bered": not args.nobered,
7875         "allow_expl_oob": args.allow_expl_oob,
7876     }
7877     if args.defines_by_path is not None:
7878         ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7879     if args.browse:
7880         obj, _ = schema().decode(raw, ctx=ctx)
7881         browse(raw, obj, oid_maps)
7882         from sys import exit as sys_exit
7883         sys_exit(0)
7884     from os import environ
7885     pprinter = partial(
7886         pprinter,
7887         oid_maps=oid_maps,
7888         with_colours=environ.get("NO_COLOR") is None,
7889         with_decode_path=args.print_decode_path,
7890         decode_path_only=(
7891             () if args.decode_path_only is None else
7892             tuple(args.decode_path_only.split(":"))
7893         ),
7894     )
7895     if args.evgen:
7896         for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7897             print(pprinter(obj, decode_path=decode_path))
7898     else:
7899         obj, tail = schema().decode(raw, ctx=ctx)
7900         print(pprinter(obj))
7901     if tail != b"":
7902         print("\nTrailing data: %s" % hexenc(tail))
7903
7904
7905 if __name__ == "__main__":
7906     from pyderasn import *
7907     main()