]> Cypherpunks.ru repositories - pyderasn.git/blob - pyderasn.py
82021efab0391507dfda7e3d53402d71850cc037
[pyderasn.git] / pyderasn.py
1 #!/usr/bin/env python
2 # coding: utf-8
3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2021 Sergey Matveev <stargrave@stargrave.org>
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU Lesser General Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/CER/BER codec with abstract structures
21
22 This library allows you to marshal various structures in ASN.1 DER/CER
23 format, unmarshal BER/CER/DER ones.
24
25     >>> i = Integer(123)
26     >>> raw = i.encode()
27     >>> Integer().decod(raw) == i
28     True
29
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
62
63 Common for most types
64 ---------------------
65
66 Tags
67 ____
68
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
75 tags simultaneously.
76
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
80 number.
81
82 .. note::
83
84    EXPLICIT tags always have **constructed** tag. PyDERASN does not
85    explicitly check correctness of schema input here.
86
87 .. note::
88
89    Implicit tags have **primitive** (``tag_ctxp``) encoding for
90    primitive values.
91
92 ::
93
94     >>> Integer(impl=tag_ctxp(1))
95     [1] INTEGER
96     >>> Integer(expl=tag_ctxc(2))
97     [2] EXPLICIT INTEGER
98
99 Implicit tag is not explicitly shown.
100
101 Two objects of the same type, but with different implicit/explicit tags
102 are **not** equal.
103
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
106 function::
107
108     >>> tag_decode(tag_ctxc(123))
109     (128, 32, 123)
110     >>> klass, form, num = tag_decode(tag_ctxc(123))
111     >>> klass == TagClassContext
112     True
113     >>> form == TagFormConstructed
114     True
115
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
118
119 Default/optional
120 ________________
121
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
125
126     >>> Integer(optional=True, default=123)
127     INTEGER 123 OPTIONAL DEFAULT
128
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
132 ``version`` field::
133
134     class Version(Integer):
135         schema = (
136             ("v1", 0),
137             ("v2", 1),
138             ("v3", 2),
139         )
140     class TBSCertificate(Sequence):
141         schema = (
142             ("version", Version(expl=tag_ctxc(0), default="v1")),
143         [...]
144
145 When default argument is used and value is not specified, then it equals
146 to default one.
147
148 .. _bounds:
149
150 Size constraints
151 ________________
152
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
156
157     class X(...):
158         bounds = (MIN, MAX)
159
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
161
162 For simplicity you can also set bounds the following way::
163
164     bounded_x = X(bounds=(MIN, MAX))
165
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
167 raised.
168
169 Common methods
170 ______________
171
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
175
176 All objects are friendly to ``copy.copy()`` and copied objects can be
177 safely mutated.
178
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
181
182 .. _decoding:
183
184 Decoding
185 --------
186
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
193
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
196
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
199
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
205
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
208
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
211 * ``expl_tlen``,
212 * ``expl_llen``
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
215   ``offset`` otherwise
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
217   ``tlvlen`` otherwise
218
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
220
221 .. _ctx:
222
223 Context
224 _______
225
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
229
230 Currently available context options:
231
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
238
239 .. _pprinting:
240
241 Pretty printing
242 ---------------
243
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
248
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
252
253     >>> from pyderasn import pprint
254     >>> encoded = Integer(-12345).encode()
255     >>> obj, tail = Integer().decode(encoded)
256     >>> print(pprint(obj))
257         0   [1,1,   2] INTEGER -12345
258
259 .. _pprint_example:
260
261 Example certificate::
262
263     >>> print(pprint(crt))
264         0   [1,3,1604] Certificate SEQUENCE
265         4   [1,3,1453]  . tbsCertificate: TBSCertificate SEQUENCE
266        10-2 [1,1,   1]  . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267        13   [1,1,   3]  . . serialNumber: CertificateSerialNumber INTEGER 61595
268        18   [1,1,  13]  . . signature: AlgorithmIdentifier SEQUENCE
269        20   [1,1,   9]  . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270        31   [0,0,   2]  . . . parameters: [UNIV 5] ANY OPTIONAL
271                         . . . . 05:00
272        33   [0,0, 278]  . . issuer: Name CHOICE rdnSequence
273        33   [1,3, 274]  . . . rdnSequence: RDNSequence SEQUENCE OF
274        37   [1,1,  11]  . . . . 0: RelativeDistinguishedName SET OF
275        39   [1,1,   9]  . . . . . 0: AttributeTypeAndValue SEQUENCE
276        41   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277        46   [0,0,   4]  . . . . . . value: [UNIV 19] AttributeValue ANY
278                         . . . . . . . 13:02:45:53
279     [...]
280      1461   [1,1,  13]  . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281      1463   [1,1,   9]  . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282      1474   [0,0,   2]  . . parameters: [UNIV 5] ANY OPTIONAL
283                         . . . 05:00
284      1476   [1,2, 129]  . signatureValue: BIT STRING 1024 bits
285                         . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286                         . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
287      [...]
288
289     Trailing data: 0a
290
291 Let's parse that output, human::
292
293        10-2 [1,1,   1]    . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294        ^  ^  ^ ^    ^     ^   ^        ^            ^       ^       ^  ^
295        0  1  2 3    4     5   6        7            8       9       10 11
296
297 ::
298
299        20   [1,1,   9]    . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
300        ^     ^ ^    ^     ^     ^          ^                 ^
301        0     2 3    4     5     6          9                 10
302
303 ::
304
305        33   [0,0, 278]    . . issuer: Name CHOICE rdnSequence
306        ^     ^ ^    ^     ^   ^       ^    ^      ^
307        0     2 3    4     5   6       8    9      10
308
309 ::
310
311        52-2∞ B [1,1,1054]∞  . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
312              ^           ^                                 ^   ^            ^
313             12          13                                14   9            10
314
315 :0:
316  Offset of the object, where its DER/BER encoding begins.
317  Pay attention that it does **not** include explicit tag.
318 :1:
319  If explicit tag exists, then this is its length (tag + encoded length).
320 :2:
321  Length of object's tag. For example CHOICE does not have its own tag,
322  so it is zero.
323 :3:
324  Length of encoded length.
325 :4:
326  Length of encoded value.
327 :5:
328  Visual indentation to show the depth of object in the hierarchy.
329 :6:
330  Object's name inside SEQUENCE/CHOICE.
331 :7:
332  If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333  here. "IMPLICIT" is omitted.
334 :8:
335  Object's class name, if set. Omitted if it is just an ordinary simple
336  value (like with ``algorithm`` in example above).
337 :9:
338  Object's ASN.1 type.
339 :10:
340  Object's value, if set. Can consist of multiple words (like OCTET/BIT
341  STRINGs above). We see ``v3`` value in Version, because it is named.
342  ``rdnSequence`` is the choice of CHOICE type.
343 :11:
344  Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345  default one, specified in the schema.
346 :12:
347  Shows does object contains any kind of BER encoded data (possibly
348  Sequence holding BER-encoded underlying value).
349 :13:
350  Only applicable to BER encoded data. Indefinite length encoding mark.
351 :14:
352  Only applicable to BER encoded data. If object has BER-specific
353  encoding, then ``BER`` will be shown. It does not depend on indefinite
354  length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355  (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
356  could be BERed.
357
358 Also it could be helpful to add quick ASN.1 pprinting command in your
359 pdb's configuration file::
360
361     alias pp1 import pyderasn ;; print(pyderasn.pprint(%1, oid_maps=(locals().get("OID_STR_TO_NAME", {}),)))
362
363 .. _definedby:
364
365 DEFINED BY
366 ----------
367
368 ASN.1 structures often have ANY and OCTET STRING fields, that are
369 DEFINED BY some previously met ObjectIdentifier. This library provides
370 ability to specify mapping between some OID and field that must be
371 decoded with specific specification.
372
373 .. _defines:
374
375 defines kwarg
376 _____________
377
378 :py:class:`pyderasn.ObjectIdentifier` field inside
379 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
380 necessary for decoding structures. For example, CMS (:rfc:`5652`)
381 container::
382
383     class ContentInfo(Sequence):
384         schema = (
385             ("contentType", ContentType(defines=((("content",), {
386                 id_digestedData: DigestedData(),
387                 id_signedData: SignedData(),
388             }),))),
389             ("content", Any(expl=tag_ctxc(0))),
390         )
391
392 ``contentType`` field tells that it defines that ``content`` must be
393 decoded with ``SignedData`` specification, if ``contentType`` equals to
394 ``id-signedData``. The same applies to ``DigestedData``. If
395 ``contentType`` contains unknown OID, then no automatic decoding is
396 done.
397
398 You can specify multiple fields, that will be autodecoded -- that is why
399 ``defines`` kwarg is a sequence. You can specify defined field
400 relatively or absolutely to current decode path. For example ``defines``
401 for AlgorithmIdentifier of X.509's
402 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
403
404         (
405             (("parameters",), {
406                 id_ecPublicKey: ECParameters(),
407                 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
408             }),
409             (("..", "subjectPublicKey"), {
410                 id_rsaEncryption: RSAPublicKey(),
411                 id_GostR3410_2001: OctetString(),
412             }),
413         ),
414
415 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
416 autodecode its parameters inside SPKI's algorithm and its public key
417 itself.
418
419 Following types can be automatically decoded (DEFINED BY):
420
421 * :py:class:`pyderasn.Any`
422 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
423 * :py:class:`pyderasn.OctetString`
424 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
425   ``Any``/``BitString``/``OctetString``-s
426
427 When any of those fields is automatically decoded, then ``.defined``
428 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
429 was defined, ``value`` contains corresponding decoded value. For example
430 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
431
432 .. _defines_by_path_ctx:
433
434 defines_by_path context option
435 ______________________________
436
437 Sometimes you either can not or do not want to explicitly set *defines*
438 in the schema. You can dynamically apply those definitions when calling
439 :py:meth:`pyderasn.Obj.decode` method.
440
441 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
442 value must be sequence of following tuples::
443
444     (decode_path, defines)
445
446 where ``decode_path`` is a tuple holding so-called decode path to the
447 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
448 ``defines``, holding exactly the same value as accepted in its
449 :ref:`keyword argument <defines>`.
450
451 For example, again for CMS, you want to automatically decode
452 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
453 structures it may hold. Also, automatically decode ``controlSequence``
454 of ``PKIResponse``::
455
456     content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
457         (
458             ("contentType",),
459             ((("content",), {id_signedData: SignedData()}),),
460         ),
461         (
462             (
463                 "content",
464                 DecodePathDefBy(id_signedData),
465                 "encapContentInfo",
466                 "eContentType",
467             ),
468             ((("eContent",), {
469                 id_cct_PKIData: PKIData(),
470                 id_cct_PKIResponse: PKIResponse(),
471             })),
472         ),
473         (
474             (
475                 "content",
476                 DecodePathDefBy(id_signedData),
477                 "encapContentInfo",
478                 "eContent",
479                 DecodePathDefBy(id_cct_PKIResponse),
480                 "controlSequence",
481                 any,
482                 "attrType",
483             ),
484             ((("attrValues",), {
485                 id_cmc_recipientNonce: RecipientNonce(),
486                 id_cmc_senderNonce: SenderNonce(),
487                 id_cmc_statusInfoV2: CMCStatusInfoV2(),
488                 id_cmc_transactionId: TransactionId(),
489             })),
490         ),
491     )})
492
493 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
494 First function is useful for path construction when some automatic
495 decoding is already done. ``any`` means literally any value it meet --
496 useful for SEQUENCE/SET OF-s.
497
498 .. _bered_ctx:
499
500 BER encoding
501 ------------
502
503 By default PyDERASN accepts only DER encoded data. By default it encodes
504 to DER. But you can optionally enable BER decoding with setting
505 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
506 constructed primitive types should be parsed successfully.
507
508 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
509   attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
510   STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
511   ``UTCTime``, ``GeneralizedTime`` can contain it.
512 * If object has an indefinite length encoding, then its ``lenindef``
513   attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
514   ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
515   contain it.
516 * If object has an indefinite length encoded explicit tag, then
517   ``expl_lenindef`` is set to True.
518 * If object has either any of BER-related encoding (explicit tag
519   indefinite length, object's indefinite length, BER-encoding) or any
520   underlying component has that kind of encoding, then ``bered``
521   attribute is set to True. For example SignedData CMS can have
522   ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
523   ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
524
525 EOC (end-of-contents) token's length is taken in advance in object's
526 value length.
527
528 .. _allow_expl_oob_ctx:
529
530 Allow explicit tag out-of-bound
531 -------------------------------
532
533 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
534 one value, more than one object. If you set ``allow_expl_oob`` context
535 option to True, then no error will be raised and that invalid encoding
536 will be silently further processed. But pay attention that offsets and
537 lengths will be invalid in that case.
538
539 .. warning::
540
541    This option should be used only for skipping some decode errors, just
542    to see the decoded structure somehow.
543
544 .. _streaming:
545
546 Streaming and dealing with huge structures
547 ------------------------------------------
548
549 .. _evgen_mode:
550
551 evgen mode
552 __________
553
554 ASN.1 structures can be huge, they can hold millions of objects inside
555 (for example Certificate Revocation Lists (CRL), holding revocation
556 state for every previously issued X.509 certificate). CACert.org's 8 MiB
557 CRL file takes more than half a gigabyte of memory to hold the decoded
558 structure.
559
560 If you just simply want to check the signature over the ``tbsCertList``,
561 you can create specialized schema with that field represented as
562 OctetString for example::
563
564     class TBSCertListFast(Sequence):
565         schema = (
566             [...]
567             ("revokedCertificates", OctetString(
568                 impl=SequenceOf.tag_default,
569                 optional=True,
570             )),
571             [...]
572         )
573
574 This allows you to quickly decode a few fields and check the signature
575 over the ``tbsCertList`` bytes.
576
577 But how can you get all certificate's serial number from it, after you
578 trust that CRL after signature validation? You can use so called
579 ``evgen`` (event generation) mode, to catch the events/facts of some
580 successful object decoding. Let's use command line capabilities::
581
582     $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
583          10     [1,1,   1]   . . version: Version INTEGER v2 (01) OPTIONAL
584          15     [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
585          26     [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL
586          13     [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE
587          34     [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
588          39     [0,0,   9]   . . . . . . value: [UNIV 19] AttributeValue ANY
589          32     [1,1,  14]   . . . . . 0: AttributeTypeAndValue SEQUENCE
590          30     [1,1,  16]   . . . . 0: RelativeDistinguishedName SET OF
591     [...]
592         188     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
593         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
594         191     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
595         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
596         186     [1,1,  18]   . . . 0: RevokedCertificate SEQUENCE
597         208     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
598         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
599         211     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
600         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
601         206     [1,1,  18]   . . . 1: RevokedCertificate SEQUENCE
602     [...]
603     9144992     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
604     9144992     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
605     9144985     [1,1,  20]   . . . 415755: RevokedCertificate SEQUENCE
606       181     [1,4,9144821]   . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
607         5     [1,4,9144997]   . tbsCertList: TBSCertList SEQUENCE
608     9145009     [1,1,   9]   . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
609     9145020     [0,0,   2]   . . parameters: [UNIV 5] ANY OPTIONAL
610     9145007     [1,1,  13]   . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
611     9145022     [1,3, 513]   . signatureValue: BIT STRING 4096 bits
612         0     [1,4,9145534]  CertificateList SEQUENCE
613
614 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
615 decodes underlying values. It can not tell if SEQUENCE is decoded, so
616 the event of the upper level SEQUENCE is the last one we see.
617 ``version`` field is just a single INTEGER -- it is decoded and event is
618 fired immediately. Then we see that ``algorithm`` and ``parameters``
619 fields are decoded and only after them the ``signature`` SEQUENCE is
620 fired as a successfully decoded. There are 4 events for each revoked
621 certificate entry in that CRL: ``userCertificate`` serial number,
622 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
623 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
624
625 We can do that in our ordinary Python code and understand where we are
626 by looking at deterministically generated decode paths (do not forget
627 about useful ``--print-decode-path`` CLI option). We must use
628 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
629 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
630 obj, tail)`` tuples::
631
632     for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
633         if (
634             len(decode_path) == 4 and
635             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
636             decode_path[3] == "userCertificate"
637         ):
638             print("serial number:", int(obj))
639
640 Virtually it does not take any memory except at least needed for single
641 object storage. You can easily use that mode to determine required
642 object ``.offset`` and ``.*len`` to be able to decode it separately, or
643 maybe verify signature upon it just by taking bytes by ``.offset`` and
644 ``.tlvlen``.
645
646 .. _evgen_mode_upto_ctx:
647
648 evgen_mode_upto
649 _______________
650
651 There is full ability to get any kind of data from the CRL in the
652 example above. However it is not too convenient to get the whole
653 ``RevokedCertificate`` structure, that is pretty lightweight and one may
654 do not want to disassemble it. You can use ``evgen_mode_upto``
655 :ref:`ctx <ctx>` option that semantically equals to
656 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
657 mapped to any non-None value. If specified decode path is met, then any
658 subsequent objects won't be decoded in evgen mode. That allows us to
659 parse the CRL above with fully assembled ``RevokedCertificate``::
660
661     for decode_path, obj, _ in CertificateList().decode_evgen(
662         crl_raw,
663         ctx={"evgen_mode_upto": (
664             (("tbsCertList", "revokedCertificates", any), True),
665         )},
666     ):
667         if (
668             len(decode_path) == 3 and
669             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
670         ):
671             print("serial number:", int(obj["userCertificate"]))
672
673 .. note::
674
675    SEQUENCE/SET values with DEFAULT specified are automatically decoded
676    without evgen mode.
677
678 .. _mmap:
679
680 mmap-ed file
681 ____________
682
683 POSIX compliant systems have ``mmap`` syscall, giving ability to work
684 the memory mapped file. You can deal with the file like it was an
685 ordinary binary string, allowing you not to load it to the memory first.
686 Also you can use them as an input for OCTET STRING, taking no Python
687 memory for their storage.
688
689 There is convenient :py:func:`pyderasn.file_mmaped` function that
690 creates read-only memoryview on the file contents::
691
692     with open("huge", "rb") as fd:
693         raw = file_mmaped(fd)
694         obj = Something.decode(raw)
695
696 .. warning::
697
698    mmap maps the **whole** file. So it plays no role if you seek-ed it
699    before. Take the slice of the resulting memoryview with required
700    offset instead.
701
702 .. note::
703
704    If you use ZFS as underlying storage, then pay attention that
705    currently most platforms does not deal good with ZFS ARC and ordinary
706    page cache used for mmaps. It can take twice the necessary size in
707    the memory: both in page cache and ZFS ARC.
708
709 CER encoding
710 ____________
711
712 We can parse any kind of data now, but how can we produce files
713 streamingly, without storing their encoded representation in memory?
714 SEQUENCE by default encodes in memory all its values, joins them in huge
715 binary string, just to know the exact size of SEQUENCE's value for
716 encoding it in TLV. DER requires you to know all exact sizes of the
717 objects.
718
719 You can use CER encoding mode, that slightly differs from the DER, but
720 does not require exact sizes knowledge, allowing streaming encoding
721 directly to some writer/buffer. Just use
722 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
723 encoded data will flow::
724
725     with open("result", "wb") as fd:
726         obj.encode_cer(fd.write)
727
728 ::
729
730     buf = io.BytesIO()
731     obj.encode_cer(buf.write)
732
733 If you do not want to create in-memory buffer every time, then you can
734 use :py:func:`pyderasn.encode_cer` function::
735
736     data = encode_cer(obj)
737
738 Remember that CER is **not valid** DER in most cases, so you **have to**
739 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
740 decoding. Also currently there is **no** validation that provided CER is
741 valid one -- you are sure that it has only valid BER encoding.
742
743 .. warning::
744
745    SET OF values can not be streamingly encoded, because they are
746    required to be sorted byte-by-byte. Big SET OF values still will take
747    much memory. Use neither SET nor SET OF values, as modern ASN.1
748    also recommends too.
749
750 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
751 OCTET STRINGs! They will be streamingly copied from underlying file to
752 the buffer using 1 KB chunks.
753
754 Some structures require that some of the elements have to be forcefully
755 DER encoded. For example ``SignedData`` CMS requires you to encode
756 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
757 encode everything else in BER. You can tell any of the structures to be
758 forcefully encoded in DER during CER encoding, by specifying
759 ``der_forced=True`` attribute::
760
761     class Certificate(Sequence):
762         schema = (...)
763         der_forced = True
764
765     class SignedAttributes(SetOf):
766         schema = Attribute()
767         bounds = (1, float("+inf"))
768         der_forced = True
769
770 .. _agg_octet_string:
771
772 agg_octet_string
773 ________________
774
775 In most cases, huge quantity of binary data is stored as OCTET STRING.
776 CER encoding splits it on 1 KB chunks. BER allows splitting on various
777 levels of chunks inclusion::
778
779     SOME STRING[CONSTRUCTED]
780         OCTET STRING[CONSTRUCTED]
781             OCTET STRING[PRIMITIVE]
782                 DATA CHUNK
783             OCTET STRING[PRIMITIVE]
784                 DATA CHUNK
785             OCTET STRING[PRIMITIVE]
786                 DATA CHUNK
787         OCTET STRING[PRIMITIVE]
788             DATA CHUNK
789         OCTET STRING[CONSTRUCTED]
790             OCTET STRING[PRIMITIVE]
791                 DATA CHUNK
792             OCTET STRING[PRIMITIVE]
793                 DATA CHUNK
794         OCTET STRING[CONSTRUCTED]
795             OCTET STRING[CONSTRUCTED]
796                 OCTET STRING[PRIMITIVE]
797                     DATA CHUNK
798
799 You can not just take the offset and some ``.vlen`` of the STRING and
800 treat it as the payload. If you decode it without
801 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
802 and ``bytes()`` will give the whole payload contents.
803
804 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
805 small memory footprint. There is convenient
806 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
807 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
808 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
809 SHA512 digest of its ``eContent``::
810
811     fd = open("data.p7m", "rb")
812     raw = file_mmaped(fd)
813     ctx = {"bered": True}
814     for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
815         if decode_path == ("content",):
816             content = obj
817             break
818     else:
819         raise ValueError("no content found")
820     hasher_state = sha512()
821     def hasher(data):
822         hasher_state.update(data)
823         return len(data)
824     evgens = SignedData().decode_evgen(
825         raw[content.offset:],
826         offset=content.offset,
827         ctx=ctx,
828     )
829     agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
830     fd.close()
831     digest = hasher_state.digest()
832
833 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
834 copy the payload (without BER/CER encoding interleaved overhead) in it.
835 Virtually it won't take memory more than for keeping small structures
836 and 1 KB binary chunks.
837
838 .. _seqof-iterators:
839
840 SEQUENCE OF iterators
841 _____________________
842
843 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
844 classes. The only difference with providing the full list of objects, is
845 that type and bounds checking is done during encoding process. Also
846 sequence's value will be emptied after encoding, forcing you to set its
847 value again.
848
849 This is very useful when you have to create some huge objects, like
850 CRLs, with thousands and millions of entities inside. You can write the
851 generator taking necessary data from the database and giving the
852 ``RevokedCertificate`` objects. Only binary representation of that
853 objects will take memory during DER encoding.
854
855 2-pass DER encoding
856 -------------------
857
858 There is ability to do 2-pass encoding to DER, writing results directly
859 to specified writer (buffer, file, whatever). It could be 1.5+ times
860 slower than ordinary encoding, but it takes little memory for 1st pass
861 state storing. For example, 1st pass state for CACert.org's CRL with
862 ~416K of certificate entries takes nearly 3.5 MB of memory.
863 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
864 nearly 0.5 KB of memory.
865
866 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
867 iterators <seqof-iterators>` and write directly to opened file, then
868 there is very small memory footprint.
869
870 1st pass traverses through all the objects of the structure and returns
871 the size of DER encoded structure, together with 1st pass state object.
872 That state contains precalculated lengths for various objects inside the
873 structure.
874
875 ::
876
877     fulllen, state = obj.encode1st()
878
879 2nd pass takes the writer and 1st pass state. It traverses through all
880 the objects again, but writes their encoded representation to the writer.
881
882 ::
883
884     with open("result", "wb") as fd:
885         obj.encode2nd(fd.write, iter(state))
886
887 .. warning::
888
889    You **MUST NOT** use 1st pass state if anything is changed in the
890    objects. It is intended to be used immediately after 1st pass is
891    done!
892
893 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
894 have to reinitialize the values after the 1st pass. And you **have to**
895 be sure that the iterator gives exactly the same values as previously.
896 Yes, you have to run your iterator twice -- because this is two pass
897 encoding mode.
898
899 If you want to encode to the memory, then you can use convenient
900 :py:func:`pyderasn.encode2pass` helper.
901
902 .. _browser:
903
904 ASN.1 browser
905 -------------
906 .. autofunction:: pyderasn.browse
907
908 Base Obj
909 --------
910 .. autoclass:: pyderasn.Obj
911    :members:
912
913 Primitive types
914 ---------------
915
916 Boolean
917 _______
918 .. autoclass:: pyderasn.Boolean
919    :members: __init__
920
921 Integer
922 _______
923 .. autoclass:: pyderasn.Integer
924    :members: __init__, named, tohex
925
926 BitString
927 _________
928 .. autoclass:: pyderasn.BitString
929    :members: __init__, bit_len, named
930
931 OctetString
932 ___________
933 .. autoclass:: pyderasn.OctetString
934    :members: __init__
935
936 Null
937 ____
938 .. autoclass:: pyderasn.Null
939    :members: __init__
940
941 ObjectIdentifier
942 ________________
943 .. autoclass:: pyderasn.ObjectIdentifier
944    :members: __init__
945
946 Enumerated
947 __________
948 .. autoclass:: pyderasn.Enumerated
949
950 CommonString
951 ____________
952 .. autoclass:: pyderasn.CommonString
953
954 NumericString
955 _____________
956 .. autoclass:: pyderasn.NumericString
957
958 PrintableString
959 _______________
960 .. autoclass:: pyderasn.PrintableString
961    :members: __init__, allow_asterisk, allow_ampersand
962
963 IA5String
964 _________
965 .. autoclass:: pyderasn.IA5String
966
967 VisibleString
968 _____________
969 .. autoclass:: pyderasn.VisibleString
970
971 UTCTime
972 _______
973 .. autoclass:: pyderasn.UTCTime
974    :members: __init__, todatetime
975
976 GeneralizedTime
977 _______________
978 .. autoclass:: pyderasn.GeneralizedTime
979    :members: __init__, todatetime
980
981 Special types
982 -------------
983
984 Choice
985 ______
986 .. autoclass:: pyderasn.Choice
987    :members: __init__, choice, value
988
989 PrimitiveTypes
990 ______________
991 .. autoclass:: PrimitiveTypes
992
993 Any
994 ___
995 .. autoclass:: pyderasn.Any
996    :members: __init__
997
998 Constructed types
999 -----------------
1000
1001 Sequence
1002 ________
1003 .. autoclass:: pyderasn.Sequence
1004    :members: __init__
1005
1006 Set
1007 ___
1008 .. autoclass:: pyderasn.Set
1009    :members: __init__
1010
1011 SequenceOf
1012 __________
1013 .. autoclass:: pyderasn.SequenceOf
1014    :members: __init__
1015
1016 SetOf
1017 _____
1018 .. autoclass:: pyderasn.SetOf
1019    :members: __init__
1020
1021 Various
1022 -------
1023
1024 .. autofunction:: pyderasn.abs_decode_path
1025 .. autofunction:: pyderasn.agg_octet_string
1026 .. autofunction:: pyderasn.ascii_visualize
1027 .. autofunction:: pyderasn.colonize_hex
1028 .. autofunction:: pyderasn.encode2pass
1029 .. autofunction:: pyderasn.encode_cer
1030 .. autofunction:: pyderasn.file_mmaped
1031 .. autofunction:: pyderasn.hexenc
1032 .. autofunction:: pyderasn.hexdec
1033 .. autofunction:: pyderasn.hexdump
1034 .. autofunction:: pyderasn.tag_encode
1035 .. autofunction:: pyderasn.tag_decode
1036 .. autofunction:: pyderasn.tag_ctxp
1037 .. autofunction:: pyderasn.tag_ctxc
1038 .. autoclass:: pyderasn.DecodeError
1039    :members: __init__
1040 .. autoclass:: pyderasn.NotEnoughData
1041 .. autoclass:: pyderasn.ExceedingData
1042 .. autoclass:: pyderasn.LenIndefForm
1043 .. autoclass:: pyderasn.TagMismatch
1044 .. autoclass:: pyderasn.InvalidLength
1045 .. autoclass:: pyderasn.InvalidOID
1046 .. autoclass:: pyderasn.ObjUnknown
1047 .. autoclass:: pyderasn.ObjNotReady
1048 .. autoclass:: pyderasn.InvalidValueType
1049 .. autoclass:: pyderasn.BoundsError
1050
1051 .. _cmdline:
1052
1053 Command-line usage
1054 ------------------
1055
1056 You can decode DER/BER files using command line abilities::
1057
1058     $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1059
1060 If there is no schema for your file, then you can try parsing it without,
1061 but of course IMPLICIT tags will often make it impossible. But result is
1062 good enough for the certificate above::
1063
1064     $ python -m pyderasn path/to/file
1065         0   [1,3,1604]  . >: SEQUENCE OF
1066         4   [1,3,1453]  . . >: SEQUENCE OF
1067         8   [0,0,   5]  . . . . >: [0] ANY
1068                         . . . . . A0:03:02:01:02
1069        13   [1,1,   3]  . . . . >: INTEGER 61595
1070        18   [1,1,  13]  . . . . >: SEQUENCE OF
1071        20   [1,1,   9]  . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1072        31   [1,1,   0]  . . . . . . >: NULL
1073        33   [1,3, 274]  . . . . >: SEQUENCE OF
1074        37   [1,1,  11]  . . . . . . >: SET OF
1075        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1076        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1077        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1078     [...]
1079      1409   [1,1,  50]  . . . . . . >: SEQUENCE OF
1080      1411   [1,1,   8]  . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1081      1421   [1,1,  38]  . . . . . . . . >: OCTET STRING 38 bytes
1082                         . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1083                         . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1084                         . . . . . . . . . 61:2E:63:6F:6D:2F
1085      1461   [1,1,  13]  . . >: SEQUENCE OF
1086      1463   [1,1,   9]  . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1087      1474   [1,1,   0]  . . . . >: NULL
1088      1476   [1,2, 129]  . . >: BIT STRING 1024 bits
1089                         . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1090                         . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1091     [...]
1092
1093 Human readable OIDs
1094 ___________________
1095
1096 If you have got dictionaries with ObjectIdentifiers, like example one
1097 from ``tests/test_crts.py``::
1098
1099     stroid2name = {
1100         "1.2.840.113549.1.1.1": "id-rsaEncryption",
1101         "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1102         [...]
1103         "2.5.4.10": "id-at-organizationName",
1104         "2.5.4.11": "id-at-organizationalUnitName",
1105     }
1106
1107 then you can pass it to pretty printer to see human readable OIDs::
1108
1109     $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1110     [...]
1111        37   [1,1,  11]  . . . . . . >: SET OF
1112        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1113        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1114        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1115        50   [1,1,  18]  . . . . . . >: SET OF
1116        52   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1117        54   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1118        59   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1119        70   [1,1,  18]  . . . . . . >: SET OF
1120        72   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1121        74   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1122        79   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1123     [...]
1124
1125 Decode paths
1126 ____________
1127
1128 Each decoded element has so-called decode path: sequence of structure
1129 names it is passing during the decode process. Each element has its own
1130 unique path inside the whole ASN.1 tree. You can print it out with
1131 ``--print-decode-path`` option::
1132
1133     $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1134        0    [1,3,1604]  Certificate SEQUENCE []
1135        4    [1,3,1453]   . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1136       10-2  [1,1,   1]   . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1137       13    [1,1,   3]   . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1138       18    [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1139       20    [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1140       31    [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1141                          . . . . 05:00
1142       33    [0,0, 278]   . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1143       33    [1,3, 274]   . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1144       37    [1,1,  11]   . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1145       39    [1,1,   9]   . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1146       41    [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1147       46    [0,0,   4]   . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1148                          . . . . . . . 13:02:45:53
1149       46    [1,1,   2]   . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1150     [...]
1151
1152 Now you can print only the specified tree, for example signature algorithm::
1153
1154     $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1155       18    [1,1,  13]  AlgorithmIdentifier SEQUENCE
1156       20    [1,1,   9]   . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1157       31    [0,0,   2]   . parameters: [UNIV 5] ANY OPTIONAL
1158                          . . 05:00
1159 """
1160
1161 from array import array
1162 from collections import namedtuple
1163 from collections import OrderedDict
1164 from copy import copy
1165 from datetime import datetime
1166 from datetime import timedelta
1167 from io import BytesIO
1168 from math import ceil
1169 from operator import attrgetter
1170 from string import ascii_letters
1171 from string import digits
1172 from struct import Struct as struct_Struct
1173 from sys import maxsize as sys_maxsize
1174 from sys import version_info
1175 from unicodedata import category as unicat
1176
1177 try:
1178     from termcolor import colored
1179 except ImportError:  # pragma: no cover
1180     def colored(what, *args, **kwargs):
1181         return what
1182
1183 try:
1184     from dateutil.tz import tzutc
1185 except ImportError:  # pragma: no cover
1186     tzutc = None
1187
1188
1189 __version__ = "9.1"
1190
1191 __all__ = (
1192     "agg_octet_string",
1193     "Any",
1194     "BitString",
1195     "BMPString",
1196     "Boolean",
1197     "BoundsError",
1198     "Choice",
1199     "colonize_hex",
1200     "DecodeError",
1201     "DecodePathDefBy",
1202     "encode2pass",
1203     "encode_cer",
1204     "Enumerated",
1205     "ExceedingData",
1206     "file_mmaped",
1207     "GeneralizedTime",
1208     "GeneralString",
1209     "GraphicString",
1210     "hexdec",
1211     "hexenc",
1212     "IA5String",
1213     "Integer",
1214     "InvalidLength",
1215     "InvalidOID",
1216     "InvalidValueType",
1217     "ISO646String",
1218     "LenIndefForm",
1219     "NotEnoughData",
1220     "Null",
1221     "NumericString",
1222     "obj_by_path",
1223     "ObjectIdentifier",
1224     "ObjNotReady",
1225     "ObjUnknown",
1226     "OctetString",
1227     "PrimitiveTypes",
1228     "PrintableString",
1229     "Sequence",
1230     "SequenceOf",
1231     "Set",
1232     "SetOf",
1233     "T61String",
1234     "tag_ctxc",
1235     "tag_ctxp",
1236     "tag_decode",
1237     "TagClassApplication",
1238     "TagClassContext",
1239     "TagClassPrivate",
1240     "TagClassUniversal",
1241     "TagFormConstructed",
1242     "TagFormPrimitive",
1243     "TagMismatch",
1244     "TeletexString",
1245     "UniversalString",
1246     "UTCTime",
1247     "UTF8String",
1248     "VideotexString",
1249     "VisibleString",
1250 )
1251
1252 TagClassUniversal = 0
1253 TagClassApplication = 1 << 6
1254 TagClassContext = 1 << 7
1255 TagClassPrivate = 1 << 6 | 1 << 7
1256 TagFormPrimitive = 0
1257 TagFormConstructed = 1 << 5
1258 TagClassReprs = {
1259     TagClassContext: "",
1260     TagClassApplication: "APPLICATION ",
1261     TagClassPrivate: "PRIVATE ",
1262     TagClassUniversal: "UNIV ",
1263 }
1264 EOC = b"\x00\x00"
1265 EOC_LEN = len(EOC)
1266 LENINDEF = b"\x80"  # length indefinite mark
1267 LENINDEF_PP_CHAR = "∞"
1268 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1269 SET01 = frozenset("01")
1270 DECIMALS = frozenset(digits)
1271 DECIMAL_SIGNS = ".,"
1272 NEXT_ATTR_NAME = "__next__"
1273
1274
1275 def file_mmaped(fd):
1276     """Make mmap-ed memoryview for reading from file
1277
1278     :param fd: file object
1279     :returns: memoryview over read-only mmap-ing of the whole file
1280
1281     .. warning::
1282
1283        It does not work under Windows.
1284     """
1285     import mmap
1286     return memoryview(mmap.mmap(fd.fileno(), length=0, prot=mmap.PROT_READ))
1287
1288
1289 def pureint(value):
1290     if not set(value) <= DECIMALS:
1291         raise ValueError("non-pure integer")
1292     return int(value)
1293
1294
1295 def fractions2float(fractions_raw):
1296     pureint(fractions_raw)
1297     return float("0." + fractions_raw)
1298
1299
1300 def get_def_by_path(defines_by_path, sub_decode_path):
1301     """Get define by decode path
1302     """
1303     for path, define in defines_by_path:
1304         if len(path) != len(sub_decode_path):
1305             continue
1306         for p1, p2 in zip(path, sub_decode_path):
1307             if (p1 is not any) and (p1 != p2):
1308                 break
1309         else:
1310             return define
1311
1312
1313 ########################################################################
1314 # Errors
1315 ########################################################################
1316
1317 class ASN1Error(ValueError):
1318     pass
1319
1320
1321 class DecodeError(ASN1Error):
1322     def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1323         """
1324         :param str msg: reason of decode failing
1325         :param klass: optional exact DecodeError inherited class (like
1326                       :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1327                       :py:exc:`InvalidLength`)
1328         :param decode_path: tuple of strings. It contains human
1329                             readable names of the fields through which
1330                             decoding process has passed
1331         :param int offset: binary offset where failure happened
1332         """
1333         super().__init__()
1334         self.msg = msg
1335         self.klass = klass
1336         self.decode_path = decode_path
1337         self.offset = offset
1338
1339     def __str__(self):
1340         return " ".join(
1341             c for c in (
1342                 "" if self.klass is None else self.klass.__name__,
1343                 (
1344                     ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1345                     if len(self.decode_path) > 0 else ""
1346                 ),
1347                 ("(at %d)" % self.offset) if self.offset > 0 else "",
1348                 self.msg,
1349             ) if c != ""
1350         )
1351
1352     def __repr__(self):
1353         return "%s(%s)" % (self.__class__.__name__, self)
1354
1355
1356 class NotEnoughData(DecodeError):
1357     pass
1358
1359
1360 class ExceedingData(ASN1Error):
1361     def __init__(self, nbytes):
1362         super().__init__()
1363         self.nbytes = nbytes
1364
1365     def __str__(self):
1366         return "%d trailing bytes" % self.nbytes
1367
1368     def __repr__(self):
1369         return "%s(%s)" % (self.__class__.__name__, self)
1370
1371
1372 class LenIndefForm(DecodeError):
1373     pass
1374
1375
1376 class TagMismatch(DecodeError):
1377     pass
1378
1379
1380 class InvalidLength(DecodeError):
1381     pass
1382
1383
1384 class InvalidOID(DecodeError):
1385     pass
1386
1387
1388 class ObjUnknown(ASN1Error):
1389     def __init__(self, name):
1390         super().__init__()
1391         self.name = name
1392
1393     def __str__(self):
1394         return "object is unknown: %s" % self.name
1395
1396     def __repr__(self):
1397         return "%s(%s)" % (self.__class__.__name__, self)
1398
1399
1400 class ObjNotReady(ASN1Error):
1401     def __init__(self, name):
1402         super().__init__()
1403         self.name = name
1404
1405     def __str__(self):
1406         return "object is not ready: %s" % self.name
1407
1408     def __repr__(self):
1409         return "%s(%s)" % (self.__class__.__name__, self)
1410
1411
1412 class InvalidValueType(ASN1Error):
1413     def __init__(self, expected_types):
1414         super().__init__()
1415         self.expected_types = expected_types
1416
1417     def __str__(self):
1418         return "invalid value type, expected: %s" % ", ".join(
1419             [repr(t) for t in self.expected_types]
1420         )
1421
1422     def __repr__(self):
1423         return "%s(%s)" % (self.__class__.__name__, self)
1424
1425
1426 class BoundsError(ASN1Error):
1427     def __init__(self, bound_min, value, bound_max):
1428         super().__init__()
1429         self.bound_min = bound_min
1430         self.value = value
1431         self.bound_max = bound_max
1432
1433     def __str__(self):
1434         return "unsatisfied bounds: %s <= %s <= %s" % (
1435             self.bound_min,
1436             self.value,
1437             self.bound_max,
1438         )
1439
1440     def __repr__(self):
1441         return "%s(%s)" % (self.__class__.__name__, self)
1442
1443
1444 ########################################################################
1445 # Basic coders
1446 ########################################################################
1447
1448 def hexdec(data):
1449     """Binary data to hexadecimal string convert
1450     """
1451     return bytes.fromhex(data)
1452
1453
1454 def hexenc(data):
1455     """Hexadecimal string to binary data convert
1456     """
1457     return data.hex()
1458
1459
1460 def int_bytes_len(num, byte_len=8):
1461     if num == 0:
1462         return 1
1463     return int(ceil(float(num.bit_length()) / byte_len))
1464
1465
1466 def zero_ended_encode(num):
1467     octets = bytearray(int_bytes_len(num, 7))
1468     i = len(octets) - 1
1469     octets[i] = num & 0x7F
1470     num >>= 7
1471     i -= 1
1472     while num > 0:
1473         octets[i] = 0x80 | (num & 0x7F)
1474         num >>= 7
1475         i -= 1
1476     return bytes(octets)
1477
1478
1479 int2byte = struct_Struct(">B").pack
1480
1481
1482 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1483     """Encode tag to binary form
1484
1485     :param int num: tag's number
1486     :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1487                       :py:data:`pyderasn.TagClassContext`,
1488                       :py:data:`pyderasn.TagClassApplication`,
1489                       :py:data:`pyderasn.TagClassPrivate`)
1490     :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1491                      :py:data:`pyderasn.TagFormConstructed`)
1492     """
1493     if num < 31:
1494         # [XX|X|.....]
1495         return int2byte(klass | form | num)
1496     # [XX|X|11111][1.......][1.......] ... [0.......]
1497     return int2byte(klass | form | 31) + zero_ended_encode(num)
1498
1499
1500 def tag_decode(tag):
1501     """Decode tag from binary form
1502
1503     .. warning::
1504
1505        No validation is performed, assuming that it has already passed.
1506
1507     It returns tuple with three integers, as
1508     :py:func:`pyderasn.tag_encode` accepts.
1509     """
1510     first_octet = tag[0]
1511     klass = first_octet & 0xC0
1512     form = first_octet & 0x20
1513     if first_octet & 0x1F < 0x1F:
1514         return (klass, form, first_octet & 0x1F)
1515     num = 0
1516     for octet in tag[1:]:
1517         num <<= 7
1518         num |= octet & 0x7F
1519     return (klass, form, num)
1520
1521
1522 def tag_ctxp(num):
1523     """Create CONTEXT PRIMITIVE tag
1524     """
1525     return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1526
1527
1528 def tag_ctxc(num):
1529     """Create CONTEXT CONSTRUCTED tag
1530     """
1531     return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1532
1533
1534 def tag_strip(data):
1535     """Take off tag from the data
1536
1537     :returns: (encoded tag, tag length, remaining data)
1538     """
1539     if len(data) == 0:
1540         raise NotEnoughData("no data at all")
1541     if data[0] & 0x1F < 31:
1542         return data[:1], 1, data[1:]
1543     i = 0
1544     while True:
1545         i += 1
1546         if i == len(data):
1547             raise DecodeError("unfinished tag")
1548         if data[i] & 0x80 == 0:
1549             break
1550     if i == 1 and data[1] < 0x1F:
1551         raise DecodeError("unexpected long form")
1552     if i > 1 and data[1] & 0x7F == 0:
1553         raise DecodeError("leading zero byte in tag value")
1554     i += 1
1555     return data[:i], i, data[i:]
1556
1557
1558 def len_encode(l):
1559     if l < 0x80:
1560         return int2byte(l)
1561     octets = bytearray(int_bytes_len(l) + 1)
1562     octets[0] = 0x80 | (len(octets) - 1)
1563     for i in range(len(octets) - 1, 0, -1):
1564         octets[i] = l & 0xFF
1565         l >>= 8
1566     return bytes(octets)
1567
1568
1569 def len_decode(data):
1570     """Decode length
1571
1572     :returns: (decoded length, length's length, remaining data)
1573     :raises LenIndefForm: if indefinite form encoding is met
1574     """
1575     if len(data) == 0:
1576         raise NotEnoughData("no data at all")
1577     first_octet = data[0]
1578     if first_octet & 0x80 == 0:
1579         return first_octet, 1, data[1:]
1580     octets_num = first_octet & 0x7F
1581     if octets_num + 1 > len(data):
1582         raise NotEnoughData("encoded length is longer than data")
1583     if octets_num == 0:
1584         raise LenIndefForm()
1585     if data[1] == 0:
1586         raise DecodeError("leading zeros")
1587     l = 0
1588     for v in data[1:1 + octets_num]:
1589         l = (l << 8) | v
1590     if l <= 127:
1591         raise DecodeError("long form instead of short one")
1592     return l, 1 + octets_num, data[1 + octets_num:]
1593
1594
1595 LEN0 = len_encode(0)
1596 LEN1 = len_encode(1)
1597 LEN1K = len_encode(1000)
1598
1599
1600 def len_size(l):
1601     """How many bytes length field will take
1602     """
1603     if l < 128:
1604         return 1
1605     if l < 256:  # 1 << 8
1606         return 2
1607     if l < 65536:  # 1 << 16
1608         return 3
1609     if l < 16777216:  # 1 << 24
1610         return 4
1611     if l < 4294967296:  # 1 << 32
1612         return 5
1613     if l < 1099511627776:  # 1 << 40
1614         return 6
1615     if l < 281474976710656:  # 1 << 48
1616         return 7
1617     if l < 72057594037927936:  # 1 << 56
1618         return 8
1619     raise OverflowError("too big length")
1620
1621
1622 def write_full(writer, data):
1623     """Fully write provided data
1624
1625     :param writer: must comply with ``io.RawIOBase.write`` behaviour
1626
1627     BytesIO does not guarantee that the whole data will be written at
1628     once. That function write everything provided, raising an error if
1629     ``writer`` returns None.
1630     """
1631     data = memoryview(data)
1632     written = 0
1633     while written != len(data):
1634         n = writer(data[written:])
1635         if n is None:
1636             raise ValueError("can not write to buf")
1637         written += n
1638
1639
1640 # If it is 64-bit system, then use compact 64-bit array of unsigned
1641 # longs. Use an ordinary list with universal integers otherwise, that
1642 # is slower.
1643 if sys_maxsize > 2 ** 32:
1644     def state_2pass_new():
1645         return array("L")
1646 else:
1647     def state_2pass_new():
1648         return []
1649
1650
1651 ########################################################################
1652 # Base class
1653 ########################################################################
1654
1655 class AutoAddSlots(type):
1656     def __new__(cls, name, bases, _dict):
1657         _dict["__slots__"] = _dict.get("__slots__", ())
1658         return super().__new__(cls, name, bases, _dict)
1659
1660
1661 BasicState = namedtuple("BasicState", (
1662     "version",
1663     "tag",
1664     "tag_order",
1665     "expl",
1666     "default",
1667     "optional",
1668     "offset",
1669     "llen",
1670     "vlen",
1671     "expl_lenindef",
1672     "lenindef",
1673     "ber_encoded",
1674 ), **NAMEDTUPLE_KWARGS)
1675
1676
1677 class Obj(metaclass=AutoAddSlots):
1678     """Common ASN.1 object class
1679
1680     All ASN.1 types are inherited from it. It has metaclass that
1681     automatically adds ``__slots__`` to all inherited classes.
1682     """
1683     __slots__ = (
1684         "tag",
1685         "_tag_order",
1686         "_value",
1687         "_expl",
1688         "default",
1689         "optional",
1690         "offset",
1691         "llen",
1692         "vlen",
1693         "expl_lenindef",
1694         "lenindef",
1695         "ber_encoded",
1696     )
1697
1698     def __init__(
1699             self,
1700             impl=None,
1701             expl=None,
1702             default=None,
1703             optional=False,
1704             _decoded=(0, 0, 0),
1705     ):
1706         self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1707         self._expl = getattr(self, "expl", None) if expl is None else expl
1708         if self.tag != self.tag_default and self._expl is not None:
1709             raise ValueError("implicit and explicit tags can not be set simultaneously")
1710         if self.tag is None:
1711             self._tag_order = None
1712         else:
1713             tag_class, _, tag_num = tag_decode(
1714                 self.tag if self._expl is None else self._expl
1715             )
1716             self._tag_order = (tag_class, tag_num)
1717         if default is not None:
1718             optional = True
1719         self.optional = optional
1720         self.offset, self.llen, self.vlen = _decoded
1721         self.default = None
1722         self.expl_lenindef = False
1723         self.lenindef = False
1724         self.ber_encoded = False
1725
1726     @property
1727     def ready(self):  # pragma: no cover
1728         """Is object ready to be encoded?
1729         """
1730         raise NotImplementedError()
1731
1732     def _assert_ready(self):
1733         if not self.ready:
1734             raise ObjNotReady(self.__class__.__name__)
1735
1736     @property
1737     def bered(self):
1738         """Is either object or any elements inside is BER encoded?
1739         """
1740         return self.expl_lenindef or self.lenindef or self.ber_encoded
1741
1742     @property
1743     def decoded(self):
1744         """Is object decoded?
1745         """
1746         return (self.llen + self.vlen) > 0
1747
1748     def __getstate__(self):  # pragma: no cover
1749         """Used for making safe to be mutable pickleable copies
1750         """
1751         raise NotImplementedError()
1752
1753     def __setstate__(self, state):
1754         if state.version != __version__:
1755             raise ValueError("data is pickled by different PyDERASN version")
1756         self.tag = state.tag
1757         self._tag_order = state.tag_order
1758         self._expl = state.expl
1759         self.default = state.default
1760         self.optional = state.optional
1761         self.offset = state.offset
1762         self.llen = state.llen
1763         self.vlen = state.vlen
1764         self.expl_lenindef = state.expl_lenindef
1765         self.lenindef = state.lenindef
1766         self.ber_encoded = state.ber_encoded
1767
1768     @property
1769     def tag_order(self):
1770         """Tag's (class, number) used for DER/CER sorting
1771         """
1772         return self._tag_order
1773
1774     @property
1775     def tag_order_cer(self):
1776         return self.tag_order
1777
1778     @property
1779     def tlen(self):
1780         """.. seealso:: :ref:`decoding`
1781         """
1782         return len(self.tag)
1783
1784     @property
1785     def tlvlen(self):
1786         """.. seealso:: :ref:`decoding`
1787         """
1788         return self.tlen + self.llen + self.vlen
1789
1790     def __str__(self):  # pragma: no cover
1791         return self.__unicode__()
1792
1793     def __ne__(self, their):
1794         return not(self == their)
1795
1796     def __gt__(self, their):  # pragma: no cover
1797         return not(self < their)
1798
1799     def __le__(self, their):  # pragma: no cover
1800         return (self == their) or (self < their)
1801
1802     def __ge__(self, their):  # pragma: no cover
1803         return (self == their) or (self > their)
1804
1805     def _encode(self):  # pragma: no cover
1806         raise NotImplementedError()
1807
1808     def _encode_cer(self, writer):
1809         write_full(writer, self._encode())
1810
1811     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):  # pragma: no cover
1812         yield NotImplemented
1813
1814     def _encode1st(self, state):
1815         raise NotImplementedError()
1816
1817     def _encode2nd(self, writer, state_iter):
1818         raise NotImplementedError()
1819
1820     def encode(self):
1821         """DER encode the structure
1822
1823         :returns: DER representation
1824         """
1825         raw = self._encode()
1826         if self._expl is None:
1827             return raw
1828         return b"".join((self._expl, len_encode(len(raw)), raw))
1829
1830     def encode1st(self, state=None):
1831         """Do the 1st pass of 2-pass encoding
1832
1833         :rtype: (int, array("L"))
1834         :returns: full length of encoded data and precalculated various
1835                   objects lengths
1836         """
1837         if state is None:
1838             state = state_2pass_new()
1839         if self._expl is None:
1840             return self._encode1st(state)
1841         state.append(0)
1842         idx = len(state) - 1
1843         vlen, _ = self._encode1st(state)
1844         state[idx] = vlen
1845         fulllen = len(self._expl) + len_size(vlen) + vlen
1846         return fulllen, state
1847
1848     def encode2nd(self, writer, state_iter):
1849         """Do the 2nd pass of 2-pass encoding
1850
1851         :param writer: must comply with ``io.RawIOBase.write`` behaviour
1852         :param state_iter: iterator over the 1st pass state (``iter(state)``)
1853         """
1854         if self._expl is None:
1855             self._encode2nd(writer, state_iter)
1856         else:
1857             write_full(writer, self._expl + len_encode(next(state_iter)))
1858             self._encode2nd(writer, state_iter)
1859
1860     def encode_cer(self, writer):
1861         """CER encode the structure to specified writer
1862
1863         :param writer: must comply with ``io.RawIOBase.write``
1864                        behaviour. It takes slice to be written and
1865                        returns number of bytes processed. If it returns
1866                        None, then exception will be raised
1867         """
1868         if self._expl is not None:
1869             write_full(writer, self._expl + LENINDEF)
1870         if getattr(self, "der_forced", False):
1871             write_full(writer, self._encode())
1872         else:
1873             self._encode_cer(writer)
1874         if self._expl is not None:
1875             write_full(writer, EOC)
1876
1877     def hexencode(self):
1878         """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1879         """
1880         return hexenc(self.encode())
1881
1882     def decode(
1883             self,
1884             data,
1885             offset=0,
1886             leavemm=False,
1887             decode_path=(),
1888             ctx=None,
1889             tag_only=False,
1890             _ctx_immutable=True,
1891     ):
1892         """Decode the data
1893
1894         :param data: either binary or memoryview
1895         :param int offset: initial data's offset
1896         :param bool leavemm: do we need to leave memoryview of remaining
1897                     data as is, or convert it to bytes otherwise
1898         :param decode_path: current decode path (tuples of strings,
1899                             possibly with DecodePathDefBy) with will be
1900                             the root for all underlying objects
1901         :param ctx: optional :ref:`context <ctx>` governing decoding process
1902         :param bool tag_only: decode only the tag, without length and
1903                               contents (used only in Choice and Set
1904                               structures, trying to determine if tag satisfies
1905                               the schema)
1906         :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1907                                     before using it?
1908         :returns: (Obj, remaining data)
1909
1910         .. seealso:: :ref:`decoding`
1911         """
1912         result = next(self.decode_evgen(
1913             data,
1914             offset,
1915             leavemm,
1916             decode_path,
1917             ctx,
1918             tag_only,
1919             _ctx_immutable,
1920             _evgen_mode=False,
1921         ))
1922         if result is None:
1923             return None
1924         _, obj, tail = result
1925         return obj, tail
1926
1927     def decode_evgen(
1928             self,
1929             data,
1930             offset=0,
1931             leavemm=False,
1932             decode_path=(),
1933             ctx=None,
1934             tag_only=False,
1935             _ctx_immutable=True,
1936             _evgen_mode=True,
1937     ):
1938         """Decode with evgen mode on
1939
1940         That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1941         it returns the generator producing ``(decode_path, obj, tail)``
1942         values.
1943         .. seealso:: :ref:`evgen mode <evgen_mode>`.
1944         """
1945         if ctx is None:
1946             ctx = {}
1947         elif _ctx_immutable:
1948             ctx = copy(ctx)
1949         tlv = memoryview(data)
1950         if (
1951                 _evgen_mode and
1952                 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1953         ):
1954             _evgen_mode = False
1955         if self._expl is None:
1956             for result in self._decode(
1957                     tlv,
1958                     offset=offset,
1959                     decode_path=decode_path,
1960                     ctx=ctx,
1961                     tag_only=tag_only,
1962                     evgen_mode=_evgen_mode,
1963             ):
1964                 if tag_only:
1965                     yield None
1966                     return
1967                 _decode_path, obj, tail = result
1968                 if _decode_path is not decode_path:
1969                     yield result
1970         else:
1971             try:
1972                 t, tlen, lv = tag_strip(tlv)
1973             except DecodeError as err:
1974                 raise err.__class__(
1975                     msg=err.msg,
1976                     klass=self.__class__,
1977                     decode_path=decode_path,
1978                     offset=offset,
1979                 )
1980             if t != self._expl:
1981                 raise TagMismatch(
1982                     klass=self.__class__,
1983                     decode_path=decode_path,
1984                     offset=offset,
1985                 )
1986             try:
1987                 l, llen, v = len_decode(lv)
1988             except LenIndefForm as err:
1989                 if not ctx.get("bered", False):
1990                     raise err.__class__(
1991                         msg=err.msg,
1992                         klass=self.__class__,
1993                         decode_path=decode_path,
1994                         offset=offset,
1995                     )
1996                 llen, v = 1, lv[1:]
1997                 offset += tlen + llen
1998                 for result in self._decode(
1999                         v,
2000                         offset=offset,
2001                         decode_path=decode_path,
2002                         ctx=ctx,
2003                         tag_only=tag_only,
2004                         evgen_mode=_evgen_mode,
2005                 ):
2006                     if tag_only:  # pragma: no cover
2007                         yield None
2008                         return
2009                     _decode_path, obj, tail = result
2010                     if _decode_path is not decode_path:
2011                         yield result
2012                 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2013                 if eoc_expected.tobytes() != EOC:
2014                     raise DecodeError(
2015                         "no EOC",
2016                         klass=self.__class__,
2017                         decode_path=decode_path,
2018                         offset=offset,
2019                     )
2020                 obj.vlen += EOC_LEN
2021                 obj.expl_lenindef = True
2022             except DecodeError as err:
2023                 raise err.__class__(
2024                     msg=err.msg,
2025                     klass=self.__class__,
2026                     decode_path=decode_path,
2027                     offset=offset,
2028                 )
2029             else:
2030                 if l > len(v):
2031                     raise NotEnoughData(
2032                         "encoded length is longer than data",
2033                         klass=self.__class__,
2034                         decode_path=decode_path,
2035                         offset=offset,
2036                     )
2037                 for result in self._decode(
2038                         v,
2039                         offset=offset + tlen + llen,
2040                         decode_path=decode_path,
2041                         ctx=ctx,
2042                         tag_only=tag_only,
2043                         evgen_mode=_evgen_mode,
2044                 ):
2045                     if tag_only:  # pragma: no cover
2046                         yield None
2047                         return
2048                     _decode_path, obj, tail = result
2049                     if _decode_path is not decode_path:
2050                         yield result
2051                 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2052                     raise DecodeError(
2053                         "explicit tag out-of-bound, longer than data",
2054                         klass=self.__class__,
2055                         decode_path=decode_path,
2056                         offset=offset,
2057                     )
2058         yield decode_path, obj, (tail if leavemm else tail.tobytes())
2059
2060     def decod(self, data, offset=0, decode_path=(), ctx=None):
2061         """Decode the data, check that tail is empty
2062
2063         :raises ExceedingData: if tail is not empty
2064
2065         This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2066         (decode without tail) that also checks that there is no
2067         trailing data left.
2068         """
2069         obj, tail = self.decode(
2070             data,
2071             offset=offset,
2072             decode_path=decode_path,
2073             ctx=ctx,
2074             leavemm=True,
2075         )
2076         if len(tail) > 0:
2077             raise ExceedingData(len(tail))
2078         return obj
2079
2080     def hexdecode(self, data, *args, **kwargs):
2081         """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2082         """
2083         return self.decode(hexdec(data), *args, **kwargs)
2084
2085     def hexdecod(self, data, *args, **kwargs):
2086         """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2087         """
2088         return self.decod(hexdec(data), *args, **kwargs)
2089
2090     @property
2091     def expled(self):
2092         """.. seealso:: :ref:`decoding`
2093         """
2094         return self._expl is not None
2095
2096     @property
2097     def expl_tag(self):
2098         """.. seealso:: :ref:`decoding`
2099         """
2100         return self._expl
2101
2102     @property
2103     def expl_tlen(self):
2104         """.. seealso:: :ref:`decoding`
2105         """
2106         return len(self._expl)
2107
2108     @property
2109     def expl_llen(self):
2110         """.. seealso:: :ref:`decoding`
2111         """
2112         if self.expl_lenindef:
2113             return 1
2114         return len(len_encode(self.tlvlen))
2115
2116     @property
2117     def expl_offset(self):
2118         """.. seealso:: :ref:`decoding`
2119         """
2120         return self.offset - self.expl_tlen - self.expl_llen
2121
2122     @property
2123     def expl_vlen(self):
2124         """.. seealso:: :ref:`decoding`
2125         """
2126         return self.tlvlen
2127
2128     @property
2129     def expl_tlvlen(self):
2130         """.. seealso:: :ref:`decoding`
2131         """
2132         return self.expl_tlen + self.expl_llen + self.expl_vlen
2133
2134     @property
2135     def fulloffset(self):
2136         """.. seealso:: :ref:`decoding`
2137         """
2138         return self.expl_offset if self.expled else self.offset
2139
2140     @property
2141     def fulllen(self):
2142         """.. seealso:: :ref:`decoding`
2143         """
2144         return self.expl_tlvlen if self.expled else self.tlvlen
2145
2146     def pps_lenindef(self, decode_path):
2147         if self.lenindef and not (
2148                 getattr(self, "defined", None) is not None and
2149                 self.defined[1].lenindef
2150         ):
2151             yield _pp(
2152                 asn1_type_name="EOC",
2153                 obj_name="",
2154                 decode_path=decode_path,
2155                 offset=(
2156                     self.offset + self.tlvlen -
2157                     (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2158                 ),
2159                 tlen=1,
2160                 llen=1,
2161                 vlen=0,
2162                 ber_encoded=True,
2163                 bered=True,
2164             )
2165         if self.expl_lenindef:
2166             yield _pp(
2167                 asn1_type_name="EOC",
2168                 obj_name="EXPLICIT",
2169                 decode_path=decode_path,
2170                 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2171                 tlen=1,
2172                 llen=1,
2173                 vlen=0,
2174                 ber_encoded=True,
2175                 bered=True,
2176             )
2177
2178
2179 def encode_cer(obj):
2180     """Encode to CER in memory buffer
2181
2182     :returns bytes: memory buffer contents
2183     """
2184     buf = BytesIO()
2185     obj.encode_cer(buf.write)
2186     return buf.getvalue()
2187
2188
2189 def encode2pass(obj):
2190     """Encode (2-pass mode) to DER in memory buffer
2191
2192     :returns bytes: memory buffer contents
2193     """
2194     buf = BytesIO()
2195     _, state = obj.encode1st()
2196     obj.encode2nd(buf.write, iter(state))
2197     return buf.getvalue()
2198
2199
2200 class DecodePathDefBy:
2201     """DEFINED BY representation inside decode path
2202     """
2203     __slots__ = ("defined_by",)
2204
2205     def __init__(self, defined_by):
2206         self.defined_by = defined_by
2207
2208     def __ne__(self, their):
2209         return not(self == their)
2210
2211     def __eq__(self, their):
2212         if not isinstance(their, self.__class__):
2213             return False
2214         return self.defined_by == their.defined_by
2215
2216     def __str__(self):
2217         return "DEFINED BY " + str(self.defined_by)
2218
2219     def __repr__(self):
2220         return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2221
2222
2223 ########################################################################
2224 # Pretty printing
2225 ########################################################################
2226
2227 PP = namedtuple("PP", (
2228     "obj",
2229     "asn1_type_name",
2230     "obj_name",
2231     "decode_path",
2232     "value",
2233     "blob",
2234     "optional",
2235     "default",
2236     "impl",
2237     "expl",
2238     "offset",
2239     "tlen",
2240     "llen",
2241     "vlen",
2242     "expl_offset",
2243     "expl_tlen",
2244     "expl_llen",
2245     "expl_vlen",
2246     "expl_lenindef",
2247     "lenindef",
2248     "ber_encoded",
2249     "bered",
2250 ), **NAMEDTUPLE_KWARGS)
2251
2252
2253 def _pp(
2254         obj=None,
2255         asn1_type_name="unknown",
2256         obj_name="unknown",
2257         decode_path=(),
2258         value=None,
2259         blob=None,
2260         optional=False,
2261         default=False,
2262         impl=None,
2263         expl=None,
2264         offset=0,
2265         tlen=0,
2266         llen=0,
2267         vlen=0,
2268         expl_offset=None,
2269         expl_tlen=None,
2270         expl_llen=None,
2271         expl_vlen=None,
2272         expl_lenindef=False,
2273         lenindef=False,
2274         ber_encoded=False,
2275         bered=False,
2276 ):
2277     return PP(
2278         obj,
2279         asn1_type_name,
2280         obj_name,
2281         decode_path,
2282         value,
2283         blob,
2284         optional,
2285         default,
2286         impl,
2287         expl,
2288         offset,
2289         tlen,
2290         llen,
2291         vlen,
2292         expl_offset,
2293         expl_tlen,
2294         expl_llen,
2295         expl_vlen,
2296         expl_lenindef,
2297         lenindef,
2298         ber_encoded,
2299         bered,
2300     )
2301
2302
2303 def _colourize(what, colour, with_colours, attrs=("bold",)):
2304     return colored(what, colour, attrs=attrs) if with_colours else what
2305
2306
2307 def colonize_hex(hexed):
2308     """Separate hexadecimal string with colons
2309     """
2310     return ":".join(hexed[i:i + 2] for i in range(0, len(hexed), 2))
2311
2312
2313 def find_oid_name(asn1_type_name, oid_maps, value):
2314     if len(oid_maps) > 0 and asn1_type_name == ObjectIdentifier.asn1_type_name:
2315         for oid_map in oid_maps:
2316             oid_name = oid_map.get(value)
2317             if oid_name is not None:
2318                 return oid_name
2319     return None
2320
2321
2322 def pp_console_row(
2323         pp,
2324         oid_maps=(),
2325         with_offsets=False,
2326         with_blob=True,
2327         with_colours=False,
2328         with_decode_path=False,
2329         decode_path_len_decrease=0,
2330 ):
2331     cols = []
2332     if with_offsets:
2333         col = "%5d%s%s" % (
2334             pp.offset,
2335             (
2336                 "  " if pp.expl_offset is None else
2337                 ("-%d" % (pp.offset - pp.expl_offset))
2338             ),
2339             LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2340         )
2341         col = _colourize(col, "red", with_colours, ())
2342         col += _colourize("B", "red", with_colours) if pp.bered else " "
2343         cols.append(col)
2344         col = "[%d,%d,%4d]%s" % (
2345             pp.tlen, pp.llen, pp.vlen,
2346             LENINDEF_PP_CHAR if pp.lenindef else " "
2347         )
2348         col = _colourize(col, "green", with_colours, ())
2349         cols.append(col)
2350     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2351     if decode_path_len > 0:
2352         cols.append(" ." * decode_path_len)
2353         ent = pp.decode_path[-1]
2354         if isinstance(ent, DecodePathDefBy):
2355             cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2356             value = str(ent.defined_by)
2357             oid_name = find_oid_name(ent.defined_by.asn1_type_name, oid_maps, value)
2358             if oid_name is None:
2359                 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2360             else:
2361                 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2362         else:
2363             cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2364     if pp.expl is not None:
2365         klass, _, num = pp.expl
2366         col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2367         cols.append(_colourize(col, "blue", with_colours))
2368     if pp.impl is not None:
2369         klass, _, num = pp.impl
2370         col = "[%s%d]" % (TagClassReprs[klass], num)
2371         cols.append(_colourize(col, "blue", with_colours))
2372     if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2373         cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2374     if pp.ber_encoded:
2375         cols.append(_colourize("BER", "red", with_colours))
2376     cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2377     if pp.value is not None:
2378         value = pp.value
2379         cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2380         oid_name = find_oid_name(pp.asn1_type_name, oid_maps, pp.value)
2381         if oid_name is not None:
2382             cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2383         if pp.asn1_type_name == Integer.asn1_type_name:
2384             cols.append(_colourize(
2385                 "(%s)" % colonize_hex(pp.obj.tohex()), "green", with_colours,
2386             ))
2387     if with_blob:
2388         if pp.blob.__class__ == bytes:
2389             cols.append(hexenc(pp.blob))
2390         elif pp.blob.__class__ == tuple:
2391             cols.append(", ".join(pp.blob))
2392     if pp.optional:
2393         cols.append(_colourize("OPTIONAL", "red", with_colours))
2394     if pp.default:
2395         cols.append(_colourize("DEFAULT", "red", with_colours))
2396     if with_decode_path:
2397         cols.append(_colourize(
2398             "[%s]" % ":".join(str(p) for p in pp.decode_path),
2399             "grey",
2400             with_colours,
2401         ))
2402     return " ".join(cols)
2403
2404
2405 def pp_console_blob(pp, decode_path_len_decrease=0):
2406     cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2407     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2408     if decode_path_len > 0:
2409         cols.append(" ." * (decode_path_len + 1))
2410     if pp.blob.__class__ == bytes:
2411         blob = hexenc(pp.blob).upper()
2412         for i in range(0, len(blob), 32):
2413             chunk = blob[i:i + 32]
2414             yield " ".join(cols + [colonize_hex(chunk)])
2415     elif pp.blob.__class__ == tuple:
2416         yield " ".join(cols + [", ".join(pp.blob)])
2417
2418
2419 def pprint(
2420         obj,
2421         oid_maps=(),
2422         big_blobs=False,
2423         with_colours=False,
2424         with_decode_path=False,
2425         decode_path_only=(),
2426         decode_path=(),
2427 ):
2428     """Pretty print object
2429
2430     :param Obj obj: object you want to pretty print
2431     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
2432                      Its human readable form is printed when OID is met
2433     :param big_blobs: if large binary objects are met (like OctetString
2434                       values), do we need to print them too, on separate
2435                       lines
2436     :param with_colours: colourize output, if ``termcolor`` library
2437                          is available
2438     :param with_decode_path: print decode path
2439     :param decode_path_only: print only that specified decode path
2440     """
2441     def _pprint_pps(pps):
2442         for pp in pps:
2443             if hasattr(pp, "_fields"):
2444                 if (
2445                         decode_path_only != () and
2446                         tuple(
2447                             str(p) for p in pp.decode_path[:len(decode_path_only)]
2448                         ) != decode_path_only
2449                 ):
2450                     continue
2451                 if big_blobs:
2452                     yield pp_console_row(
2453                         pp,
2454                         oid_maps=oid_maps,
2455                         with_offsets=True,
2456                         with_blob=False,
2457                         with_colours=with_colours,
2458                         with_decode_path=with_decode_path,
2459                         decode_path_len_decrease=len(decode_path_only),
2460                     )
2461                     for row in pp_console_blob(
2462                             pp,
2463                             decode_path_len_decrease=len(decode_path_only),
2464                     ):
2465                         yield row
2466                 else:
2467                     yield pp_console_row(
2468                         pp,
2469                         oid_maps=oid_maps,
2470                         with_offsets=True,
2471                         with_blob=True,
2472                         with_colours=with_colours,
2473                         with_decode_path=with_decode_path,
2474                         decode_path_len_decrease=len(decode_path_only),
2475                     )
2476             else:
2477                 for row in _pprint_pps(pp):
2478                     yield row
2479     return "\n".join(_pprint_pps(obj.pps(decode_path)))
2480
2481
2482 ########################################################################
2483 # ASN.1 primitive types
2484 ########################################################################
2485
2486 BooleanState = namedtuple(
2487     "BooleanState",
2488     BasicState._fields + ("value",),
2489     **NAMEDTUPLE_KWARGS
2490 )
2491
2492
2493 class Boolean(Obj):
2494     """``BOOLEAN`` boolean type
2495
2496     >>> b = Boolean(True)
2497     BOOLEAN True
2498     >>> b == Boolean(True)
2499     True
2500     >>> bool(b)
2501     True
2502     """
2503     __slots__ = ()
2504     tag_default = tag_encode(1)
2505     asn1_type_name = "BOOLEAN"
2506
2507     def __init__(
2508             self,
2509             value=None,
2510             impl=None,
2511             expl=None,
2512             default=None,
2513             optional=False,
2514             _decoded=(0, 0, 0),
2515     ):
2516         """
2517         :param value: set the value. Either boolean type, or
2518                       :py:class:`pyderasn.Boolean` object
2519         :param bytes impl: override default tag with ``IMPLICIT`` one
2520         :param bytes expl: override default tag with ``EXPLICIT`` one
2521         :param default: set default value. Type same as in ``value``
2522         :param bool optional: is object ``OPTIONAL`` in sequence
2523         """
2524         super().__init__(impl, expl, default, optional, _decoded)
2525         self._value = None if value is None else self._value_sanitize(value)
2526         if default is not None:
2527             default = self._value_sanitize(default)
2528             self.default = self.__class__(
2529                 value=default,
2530                 impl=self.tag,
2531                 expl=self._expl,
2532             )
2533             if value is None:
2534                 self._value = default
2535
2536     def _value_sanitize(self, value):
2537         if value.__class__ == bool:
2538             return value
2539         if issubclass(value.__class__, Boolean):
2540             return value._value
2541         raise InvalidValueType((self.__class__, bool))
2542
2543     @property
2544     def ready(self):
2545         return self._value is not None
2546
2547     def __getstate__(self):
2548         return BooleanState(
2549             __version__,
2550             self.tag,
2551             self._tag_order,
2552             self._expl,
2553             self.default,
2554             self.optional,
2555             self.offset,
2556             self.llen,
2557             self.vlen,
2558             self.expl_lenindef,
2559             self.lenindef,
2560             self.ber_encoded,
2561             self._value,
2562         )
2563
2564     def __setstate__(self, state):
2565         super().__setstate__(state)
2566         self._value = state.value
2567
2568     def __nonzero__(self):
2569         self._assert_ready()
2570         return self._value
2571
2572     def __bool__(self):
2573         self._assert_ready()
2574         return self._value
2575
2576     def __eq__(self, their):
2577         if their.__class__ == bool:
2578             return self._value == their
2579         if not issubclass(their.__class__, Boolean):
2580             return False
2581         return (
2582             self._value == their._value and
2583             self.tag == their.tag and
2584             self._expl == their._expl
2585         )
2586
2587     def __call__(
2588             self,
2589             value=None,
2590             impl=None,
2591             expl=None,
2592             default=None,
2593             optional=None,
2594     ):
2595         return self.__class__(
2596             value=value,
2597             impl=self.tag if impl is None else impl,
2598             expl=self._expl if expl is None else expl,
2599             default=self.default if default is None else default,
2600             optional=self.optional if optional is None else optional,
2601         )
2602
2603     def _encode(self):
2604         self._assert_ready()
2605         return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2606
2607     def _encode1st(self, state):
2608         return len(self.tag) + 2, state
2609
2610     def _encode2nd(self, writer, state_iter):
2611         self._assert_ready()
2612         write_full(writer, self._encode())
2613
2614     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2615         try:
2616             t, _, lv = tag_strip(tlv)
2617         except DecodeError as err:
2618             raise err.__class__(
2619                 msg=err.msg,
2620                 klass=self.__class__,
2621                 decode_path=decode_path,
2622                 offset=offset,
2623             )
2624         if t != self.tag:
2625             raise TagMismatch(
2626                 klass=self.__class__,
2627                 decode_path=decode_path,
2628                 offset=offset,
2629             )
2630         if tag_only:
2631             yield None
2632             return
2633         try:
2634             l, _, v = len_decode(lv)
2635         except DecodeError as err:
2636             raise err.__class__(
2637                 msg=err.msg,
2638                 klass=self.__class__,
2639                 decode_path=decode_path,
2640                 offset=offset,
2641             )
2642         if l != 1:
2643             raise InvalidLength(
2644                 "Boolean's length must be equal to 1",
2645                 klass=self.__class__,
2646                 decode_path=decode_path,
2647                 offset=offset,
2648             )
2649         if l > len(v):
2650             raise NotEnoughData(
2651                 "encoded length is longer than data",
2652                 klass=self.__class__,
2653                 decode_path=decode_path,
2654                 offset=offset,
2655             )
2656         first_octet = v[0]
2657         ber_encoded = False
2658         if first_octet == 0:
2659             value = False
2660         elif first_octet == 0xFF:
2661             value = True
2662         elif ctx.get("bered", False):
2663             value = True
2664             ber_encoded = True
2665         else:
2666             raise DecodeError(
2667                 "unacceptable Boolean value",
2668                 klass=self.__class__,
2669                 decode_path=decode_path,
2670                 offset=offset,
2671             )
2672         obj = self.__class__(
2673             value=value,
2674             impl=self.tag,
2675             expl=self._expl,
2676             default=self.default,
2677             optional=self.optional,
2678             _decoded=(offset, 1, 1),
2679         )
2680         obj.ber_encoded = ber_encoded
2681         yield decode_path, obj, v[1:]
2682
2683     def __repr__(self):
2684         return pp_console_row(next(self.pps()))
2685
2686     def pps(self, decode_path=()):
2687         yield _pp(
2688             obj=self,
2689             asn1_type_name=self.asn1_type_name,
2690             obj_name=self.__class__.__name__,
2691             decode_path=decode_path,
2692             value=str(self._value) if self.ready else None,
2693             optional=self.optional,
2694             default=self == self.default,
2695             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2696             expl=None if self._expl is None else tag_decode(self._expl),
2697             offset=self.offset,
2698             tlen=self.tlen,
2699             llen=self.llen,
2700             vlen=self.vlen,
2701             expl_offset=self.expl_offset if self.expled else None,
2702             expl_tlen=self.expl_tlen if self.expled else None,
2703             expl_llen=self.expl_llen if self.expled else None,
2704             expl_vlen=self.expl_vlen if self.expled else None,
2705             expl_lenindef=self.expl_lenindef,
2706             ber_encoded=self.ber_encoded,
2707             bered=self.bered,
2708         )
2709         for pp in self.pps_lenindef(decode_path):
2710             yield pp
2711
2712
2713 IntegerState = namedtuple(
2714     "IntegerState",
2715     BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2716     **NAMEDTUPLE_KWARGS
2717 )
2718
2719
2720 class Integer(Obj):
2721     """``INTEGER`` integer type
2722
2723     >>> b = Integer(-123)
2724     INTEGER -123
2725     >>> b == Integer(-123)
2726     True
2727     >>> int(b)
2728     -123
2729
2730     >>> Integer(2, bounds=(1, 3))
2731     INTEGER 2
2732     >>> Integer(5, bounds=(1, 3))
2733     Traceback (most recent call last):
2734     pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2735
2736     ::
2737
2738         class Version(Integer):
2739             schema = (
2740                 ("v1", 0),
2741                 ("v2", 1),
2742                 ("v3", 2),
2743             )
2744
2745     >>> v = Version("v1")
2746     Version INTEGER v1
2747     >>> int(v)
2748     0
2749     >>> v.named
2750     'v1'
2751     >>> v.specs
2752     {'v3': 2, 'v1': 0, 'v2': 1}
2753     """
2754     __slots__ = ("specs", "_bound_min", "_bound_max")
2755     tag_default = tag_encode(2)
2756     asn1_type_name = "INTEGER"
2757
2758     def __init__(
2759             self,
2760             value=None,
2761             bounds=None,
2762             impl=None,
2763             expl=None,
2764             default=None,
2765             optional=False,
2766             _specs=None,
2767             _decoded=(0, 0, 0),
2768     ):
2769         """
2770         :param value: set the value. Either integer type, named value
2771                       (if ``schema`` is specified in the class), or
2772                       :py:class:`pyderasn.Integer` object
2773         :param bounds: set ``(MIN, MAX)`` value constraint.
2774                        (-inf, +inf) by default
2775         :param bytes impl: override default tag with ``IMPLICIT`` one
2776         :param bytes expl: override default tag with ``EXPLICIT`` one
2777         :param default: set default value. Type same as in ``value``
2778         :param bool optional: is object ``OPTIONAL`` in sequence
2779         """
2780         super().__init__(impl, expl, default, optional, _decoded)
2781         self._value = value
2782         specs = getattr(self, "schema", {}) if _specs is None else _specs
2783         self.specs = specs if specs.__class__ == dict else dict(specs)
2784         self._bound_min, self._bound_max = getattr(
2785             self,
2786             "bounds",
2787             (float("-inf"), float("+inf")),
2788         ) if bounds is None else bounds
2789         if value is not None:
2790             self._value = self._value_sanitize(value)
2791         if default is not None:
2792             default = self._value_sanitize(default)
2793             self.default = self.__class__(
2794                 value=default,
2795                 impl=self.tag,
2796                 expl=self._expl,
2797                 _specs=self.specs,
2798             )
2799             if self._value is None:
2800                 self._value = default
2801
2802     def _value_sanitize(self, value):
2803         if isinstance(value, int):
2804             pass
2805         elif issubclass(value.__class__, Integer):
2806             value = value._value
2807         elif value.__class__ == str:
2808             value = self.specs.get(value)
2809             if value is None:
2810                 raise ObjUnknown("integer value: %s" % value)
2811         else:
2812             raise InvalidValueType((self.__class__, int, str))
2813         if not self._bound_min <= value <= self._bound_max:
2814             raise BoundsError(self._bound_min, value, self._bound_max)
2815         return value
2816
2817     @property
2818     def ready(self):
2819         return self._value is not None
2820
2821     def __getstate__(self):
2822         return IntegerState(
2823             __version__,
2824             self.tag,
2825             self._tag_order,
2826             self._expl,
2827             self.default,
2828             self.optional,
2829             self.offset,
2830             self.llen,
2831             self.vlen,
2832             self.expl_lenindef,
2833             self.lenindef,
2834             self.ber_encoded,
2835             self.specs,
2836             self._value,
2837             self._bound_min,
2838             self._bound_max,
2839         )
2840
2841     def __setstate__(self, state):
2842         super().__setstate__(state)
2843         self.specs = state.specs
2844         self._value = state.value
2845         self._bound_min = state.bound_min
2846         self._bound_max = state.bound_max
2847
2848     def __int__(self):
2849         self._assert_ready()
2850         return int(self._value)
2851
2852     def tohex(self):
2853         """Hexadecimal representation
2854
2855         Use :py:func:`pyderasn.colonize_hex` for colonizing it.
2856         """
2857         hex_repr = hex(int(self))[2:].upper()
2858         if len(hex_repr) % 2 != 0:
2859             hex_repr = "0" + hex_repr
2860         return hex_repr
2861
2862     def __hash__(self):
2863         self._assert_ready()
2864         return hash(b"".join((
2865             self.tag,
2866             bytes(self._expl or b""),
2867             str(self._value).encode("ascii"),
2868         )))
2869
2870     def __eq__(self, their):
2871         if isinstance(their, int):
2872             return self._value == their
2873         if not issubclass(their.__class__, Integer):
2874             return False
2875         return (
2876             self._value == their._value and
2877             self.tag == their.tag and
2878             self._expl == their._expl
2879         )
2880
2881     def __lt__(self, their):
2882         return self._value < their._value
2883
2884     @property
2885     def named(self):
2886         """Return named representation (if exists) of the value
2887         """
2888         for name, value in self.specs.items():
2889             if value == self._value:
2890                 return name
2891         return None
2892
2893     def __call__(
2894             self,
2895             value=None,
2896             bounds=None,
2897             impl=None,
2898             expl=None,
2899             default=None,
2900             optional=None,
2901     ):
2902         return self.__class__(
2903             value=value,
2904             bounds=(
2905                 (self._bound_min, self._bound_max)
2906                 if bounds is None else bounds
2907             ),
2908             impl=self.tag if impl is None else impl,
2909             expl=self._expl if expl is None else expl,
2910             default=self.default if default is None else default,
2911             optional=self.optional if optional is None else optional,
2912             _specs=self.specs,
2913         )
2914
2915     def _encode_payload(self):
2916         self._assert_ready()
2917         value = self._value
2918         bytes_len = ceil(value.bit_length() / 8) or 1
2919         while True:
2920             try:
2921                 octets = value.to_bytes(bytes_len, byteorder="big", signed=True)
2922             except OverflowError:
2923                 bytes_len += 1
2924             else:
2925                 break
2926         return octets
2927
2928     def _encode(self):
2929         octets = self._encode_payload()
2930         return b"".join((self.tag, len_encode(len(octets)), octets))
2931
2932     def _encode1st(self, state):
2933         l = len(self._encode_payload())
2934         return len(self.tag) + len_size(l) + l, state
2935
2936     def _encode2nd(self, writer, state_iter):
2937         write_full(writer, self._encode())
2938
2939     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2940         try:
2941             t, _, lv = tag_strip(tlv)
2942         except DecodeError as err:
2943             raise err.__class__(
2944                 msg=err.msg,
2945                 klass=self.__class__,
2946                 decode_path=decode_path,
2947                 offset=offset,
2948             )
2949         if t != self.tag:
2950             raise TagMismatch(
2951                 klass=self.__class__,
2952                 decode_path=decode_path,
2953                 offset=offset,
2954             )
2955         if tag_only:
2956             yield None
2957             return
2958         try:
2959             l, llen, v = len_decode(lv)
2960         except DecodeError as err:
2961             raise err.__class__(
2962                 msg=err.msg,
2963                 klass=self.__class__,
2964                 decode_path=decode_path,
2965                 offset=offset,
2966             )
2967         if l > len(v):
2968             raise NotEnoughData(
2969                 "encoded length is longer than data",
2970                 klass=self.__class__,
2971                 decode_path=decode_path,
2972                 offset=offset,
2973             )
2974         if l == 0:
2975             raise NotEnoughData(
2976                 "zero length",
2977                 klass=self.__class__,
2978                 decode_path=decode_path,
2979                 offset=offset,
2980             )
2981         v, tail = v[:l], v[l:]
2982         first_octet = v[0]
2983         if l > 1:
2984             second_octet = v[1]
2985             if (
2986                     ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
2987                     ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
2988             ):
2989                 raise DecodeError(
2990                     "non normalized integer",
2991                     klass=self.__class__,
2992                     decode_path=decode_path,
2993                     offset=offset,
2994                 )
2995         value = int.from_bytes(v, byteorder="big", signed=True)
2996         try:
2997             obj = self.__class__(
2998                 value=value,
2999                 bounds=(self._bound_min, self._bound_max),
3000                 impl=self.tag,
3001                 expl=self._expl,
3002                 default=self.default,
3003                 optional=self.optional,
3004                 _specs=self.specs,
3005                 _decoded=(offset, llen, l),
3006             )
3007         except BoundsError as err:
3008             raise DecodeError(
3009                 msg=str(err),
3010                 klass=self.__class__,
3011                 decode_path=decode_path,
3012                 offset=offset,
3013             )
3014         yield decode_path, obj, tail
3015
3016     def __repr__(self):
3017         return pp_console_row(next(self.pps()))
3018
3019     def pps(self, decode_path=()):
3020         yield _pp(
3021             obj=self,
3022             asn1_type_name=self.asn1_type_name,
3023             obj_name=self.__class__.__name__,
3024             decode_path=decode_path,
3025             value=(self.named or str(self._value)) if self.ready else None,
3026             optional=self.optional,
3027             default=self == self.default,
3028             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3029             expl=None if self._expl is None else tag_decode(self._expl),
3030             offset=self.offset,
3031             tlen=self.tlen,
3032             llen=self.llen,
3033             vlen=self.vlen,
3034             expl_offset=self.expl_offset if self.expled else None,
3035             expl_tlen=self.expl_tlen if self.expled else None,
3036             expl_llen=self.expl_llen if self.expled else None,
3037             expl_vlen=self.expl_vlen if self.expled else None,
3038             expl_lenindef=self.expl_lenindef,
3039             bered=self.bered,
3040         )
3041         for pp in self.pps_lenindef(decode_path):
3042             yield pp
3043
3044
3045 BitStringState = namedtuple(
3046     "BitStringState",
3047     BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3048     **NAMEDTUPLE_KWARGS
3049 )
3050
3051
3052 class BitString(Obj):
3053     """``BIT STRING`` bit string type
3054
3055     >>> BitString(b"hello world")
3056     BIT STRING 88 bits 68656c6c6f20776f726c64
3057     >>> bytes(b)
3058     b'hello world'
3059     >>> b == b"hello world"
3060     True
3061     >>> b.bit_len
3062     88
3063
3064     >>> BitString("'0A3B5F291CD'H")
3065     BIT STRING 44 bits 0a3b5f291cd0
3066     >>> b = BitString("'010110000000'B")
3067     BIT STRING 12 bits 5800
3068     >>> b.bit_len
3069     12
3070     >>> b[0], b[1], b[2], b[3]
3071     (False, True, False, True)
3072     >>> b[1000]
3073     False
3074     >>> [v for v in b]
3075     [False, True, False, True, True, False, False, False, False, False, False, False]
3076
3077     ::
3078
3079         class KeyUsage(BitString):
3080             schema = (
3081                 ("digitalSignature", 0),
3082                 ("nonRepudiation", 1),
3083                 ("keyEncipherment", 2),
3084             )
3085
3086     >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3087     KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3088     >>> b.named
3089     ['nonRepudiation', 'keyEncipherment']
3090     >>> b.specs
3091     {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3092
3093     .. note::
3094
3095        Pay attention that BIT STRING can be encoded both in primitive
3096        and constructed forms. Decoder always checks constructed form tag
3097        additionally to specified primitive one. If BER decoding is
3098        :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3099        of DER restrictions.
3100     """
3101     __slots__ = ("tag_constructed", "specs", "defined")
3102     tag_default = tag_encode(3)
3103     asn1_type_name = "BIT STRING"
3104
3105     def __init__(
3106             self,
3107             value=None,
3108             impl=None,
3109             expl=None,
3110             default=None,
3111             optional=False,
3112             _specs=None,
3113             _decoded=(0, 0, 0),
3114     ):
3115         """
3116         :param value: set the value. Either binary type, tuple of named
3117                       values (if ``schema`` is specified in the class),
3118                       string in ``'XXX...'B`` form, or
3119                       :py:class:`pyderasn.BitString` object
3120         :param bytes impl: override default tag with ``IMPLICIT`` one
3121         :param bytes expl: override default tag with ``EXPLICIT`` one
3122         :param default: set default value. Type same as in ``value``
3123         :param bool optional: is object ``OPTIONAL`` in sequence
3124         """
3125         super().__init__(impl, expl, default, optional, _decoded)
3126         specs = getattr(self, "schema", {}) if _specs is None else _specs
3127         self.specs = specs if specs.__class__ == dict else dict(specs)
3128         self._value = None if value is None else self._value_sanitize(value)
3129         if default is not None:
3130             default = self._value_sanitize(default)
3131             self.default = self.__class__(
3132                 value=default,
3133                 impl=self.tag,
3134                 expl=self._expl,
3135             )
3136             if value is None:
3137                 self._value = default
3138         self.defined = None
3139         tag_klass, _, tag_num = tag_decode(self.tag)
3140         self.tag_constructed = tag_encode(
3141             klass=tag_klass,
3142             form=TagFormConstructed,
3143             num=tag_num,
3144         )
3145
3146     def _bits2octets(self, bits):
3147         if len(self.specs) > 0:
3148             bits = bits.rstrip("0")
3149         bit_len = len(bits)
3150         bits += "0" * ((8 - (bit_len % 8)) % 8)
3151         octets = bytearray(len(bits) // 8)
3152         for i in range(len(octets)):
3153             octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3154         return bit_len, bytes(octets)
3155
3156     def _value_sanitize(self, value):
3157         if isinstance(value, (str, bytes)):
3158             if (
3159                     isinstance(value, str) and
3160                     value.startswith("'")
3161             ):
3162                 if value.endswith("'B"):
3163                     value = value[1:-2]
3164                     if not frozenset(value) <= SET01:
3165                         raise ValueError("B's coding contains unacceptable chars")
3166                     return self._bits2octets(value)
3167                 if value.endswith("'H"):
3168                     value = value[1:-2]
3169                     return (
3170                         len(value) * 4,
3171                         hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3172                     )
3173             if value.__class__ == bytes:
3174                 return (len(value) * 8, value)
3175             raise InvalidValueType((self.__class__, str, bytes))
3176         if value.__class__ == tuple:
3177             if (
3178                     len(value) == 2 and
3179                     isinstance(value[0], int) and
3180                     value[1].__class__ == bytes
3181             ):
3182                 return value
3183             bits = []
3184             for name in value:
3185                 bit = self.specs.get(name)
3186                 if bit is None:
3187                     raise ObjUnknown("BitString value: %s" % name)
3188                 bits.append(bit)
3189             if len(bits) == 0:
3190                 return self._bits2octets("")
3191             bits = frozenset(bits)
3192             return self._bits2octets("".join(
3193                 ("1" if bit in bits else "0")
3194                 for bit in range(max(bits) + 1)
3195             ))
3196         if issubclass(value.__class__, BitString):
3197             return value._value
3198         raise InvalidValueType((self.__class__, bytes, str))
3199
3200     @property
3201     def ready(self):
3202         return self._value is not None
3203
3204     def __getstate__(self):
3205         return BitStringState(
3206             __version__,
3207             self.tag,
3208             self._tag_order,
3209             self._expl,
3210             self.default,
3211             self.optional,
3212             self.offset,
3213             self.llen,
3214             self.vlen,
3215             self.expl_lenindef,
3216             self.lenindef,
3217             self.ber_encoded,
3218             self.specs,
3219             self._value,
3220             self.tag_constructed,
3221             self.defined,
3222         )
3223
3224     def __setstate__(self, state):
3225         super().__setstate__(state)
3226         self.specs = state.specs
3227         self._value = state.value
3228         self.tag_constructed = state.tag_constructed
3229         self.defined = state.defined
3230
3231     def __iter__(self):
3232         self._assert_ready()
3233         for i in range(self._value[0]):
3234             yield self[i]
3235
3236     @property
3237     def bit_len(self):
3238         """Returns number of bits in the string
3239         """
3240         self._assert_ready()
3241         return self._value[0]
3242
3243     def __bytes__(self):
3244         self._assert_ready()
3245         return self._value[1]
3246
3247     def __eq__(self, their):
3248         if their.__class__ == bytes:
3249             return self._value[1] == their
3250         if not issubclass(their.__class__, BitString):
3251             return False
3252         return (
3253             self._value == their._value and
3254             self.tag == their.tag and
3255             self._expl == their._expl
3256         )
3257
3258     @property
3259     def named(self):
3260         """Named representation (if exists) of the bits
3261
3262         :returns: [str(name), ...]
3263         """
3264         return [name for name, bit in self.specs.items() if self[bit]]
3265
3266     def __call__(
3267             self,
3268             value=None,
3269             impl=None,
3270             expl=None,
3271             default=None,
3272             optional=None,
3273     ):
3274         return self.__class__(
3275             value=value,
3276             impl=self.tag if impl is None else impl,
3277             expl=self._expl if expl is None else expl,
3278             default=self.default if default is None else default,
3279             optional=self.optional if optional is None else optional,
3280             _specs=self.specs,
3281         )
3282
3283     def __getitem__(self, key):
3284         if key.__class__ == int:
3285             bit_len, octets = self._value
3286             if key >= bit_len:
3287                 return False
3288             return memoryview(octets)[key // 8] >> (7 - (key % 8)) & 1 == 1
3289         if isinstance(key, str):
3290             value = self.specs.get(key)
3291             if value is None:
3292                 raise ObjUnknown("BitString value: %s" % key)
3293             return self[value]
3294         raise InvalidValueType((int, str))
3295
3296     def _encode(self):
3297         self._assert_ready()
3298         bit_len, octets = self._value
3299         return b"".join((
3300             self.tag,
3301             len_encode(len(octets) + 1),
3302             int2byte((8 - bit_len % 8) % 8),
3303             octets,
3304         ))
3305
3306     def _encode1st(self, state):
3307         self._assert_ready()
3308         _, octets = self._value
3309         l = len(octets) + 1
3310         return len(self.tag) + len_size(l) + l, state
3311
3312     def _encode2nd(self, writer, state_iter):
3313         bit_len, octets = self._value
3314         write_full(writer, b"".join((
3315             self.tag,
3316             len_encode(len(octets) + 1),
3317             int2byte((8 - bit_len % 8) % 8),
3318         )))
3319         write_full(writer, octets)
3320
3321     def _encode_cer(self, writer):
3322         bit_len, octets = self._value
3323         if len(octets) + 1 <= 1000:
3324             write_full(writer, self._encode())
3325             return
3326         write_full(writer, self.tag_constructed)
3327         write_full(writer, LENINDEF)
3328         for offset in range(0, (len(octets) // 999) * 999, 999):
3329             write_full(writer, b"".join((
3330                 BitString.tag_default,
3331                 LEN1K,
3332                 b"\x00",
3333                 octets[offset:offset + 999],
3334             )))
3335         tail = octets[offset + 999:]
3336         if len(tail) > 0:
3337             tail = int2byte((8 - bit_len % 8) % 8) + tail
3338             write_full(writer, b"".join((
3339                 BitString.tag_default,
3340                 len_encode(len(tail)),
3341                 tail,
3342             )))
3343         write_full(writer, EOC)
3344
3345     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3346         try:
3347             t, tlen, lv = tag_strip(tlv)
3348         except DecodeError as err:
3349             raise err.__class__(
3350                 msg=err.msg,
3351                 klass=self.__class__,
3352                 decode_path=decode_path,
3353                 offset=offset,
3354             )
3355         if t == self.tag:
3356             if tag_only:  # pragma: no cover
3357                 yield None
3358                 return
3359             try:
3360                 l, llen, v = len_decode(lv)
3361             except DecodeError as err:
3362                 raise err.__class__(
3363                     msg=err.msg,
3364                     klass=self.__class__,
3365                     decode_path=decode_path,
3366                     offset=offset,
3367                 )
3368             if l > len(v):
3369                 raise NotEnoughData(
3370                     "encoded length is longer than data",
3371                     klass=self.__class__,
3372                     decode_path=decode_path,
3373                     offset=offset,
3374                 )
3375             if l == 0:
3376                 raise NotEnoughData(
3377                     "zero length",
3378                     klass=self.__class__,
3379                     decode_path=decode_path,
3380                     offset=offset,
3381                 )
3382             pad_size = v[0]
3383             if l == 1 and pad_size != 0:
3384                 raise DecodeError(
3385                     "invalid empty value",
3386                     klass=self.__class__,
3387                     decode_path=decode_path,
3388                     offset=offset,
3389                 )
3390             if pad_size > 7:
3391                 raise DecodeError(
3392                     "too big pad",
3393                     klass=self.__class__,
3394                     decode_path=decode_path,
3395                     offset=offset,
3396                 )
3397             if v[l - 1] & ((1 << pad_size) - 1) != 0:
3398                 raise DecodeError(
3399                     "invalid pad",
3400                     klass=self.__class__,
3401                     decode_path=decode_path,
3402                     offset=offset,
3403                 )
3404             v, tail = v[:l], v[l:]
3405             bit_len = (len(v) - 1) * 8 - pad_size
3406             obj = self.__class__(
3407                 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3408                 impl=self.tag,
3409                 expl=self._expl,
3410                 default=self.default,
3411                 optional=self.optional,
3412                 _specs=self.specs,
3413                 _decoded=(offset, llen, l),
3414             )
3415             if evgen_mode:
3416                 obj._value = (bit_len, None)
3417             yield decode_path, obj, tail
3418             return
3419         if t != self.tag_constructed:
3420             raise TagMismatch(
3421                 klass=self.__class__,
3422                 decode_path=decode_path,
3423                 offset=offset,
3424             )
3425         if not ctx.get("bered", False):
3426             raise DecodeError(
3427                 "unallowed BER constructed encoding",
3428                 klass=self.__class__,
3429                 decode_path=decode_path,
3430                 offset=offset,
3431             )
3432         if tag_only:  # pragma: no cover
3433             yield None
3434             return
3435         lenindef = False
3436         try:
3437             l, llen, v = len_decode(lv)
3438         except LenIndefForm:
3439             llen, l, v = 1, 0, lv[1:]
3440             lenindef = True
3441         except DecodeError as err:
3442             raise err.__class__(
3443                 msg=err.msg,
3444                 klass=self.__class__,
3445                 decode_path=decode_path,
3446                 offset=offset,
3447             )
3448         if l > len(v):
3449             raise NotEnoughData(
3450                 "encoded length is longer than data",
3451                 klass=self.__class__,
3452                 decode_path=decode_path,
3453                 offset=offset,
3454             )
3455         if not lenindef and l == 0:
3456             raise NotEnoughData(
3457                 "zero length",
3458                 klass=self.__class__,
3459                 decode_path=decode_path,
3460                 offset=offset,
3461             )
3462         chunks = []
3463         sub_offset = offset + tlen + llen
3464         vlen = 0
3465         while True:
3466             if lenindef:
3467                 if v[:EOC_LEN].tobytes() == EOC:
3468                     break
3469             else:
3470                 if vlen == l:
3471                     break
3472                 if vlen > l:
3473                     raise DecodeError(
3474                         "chunk out of bounds",
3475                         klass=self.__class__,
3476                         decode_path=decode_path + (str(len(chunks) - 1),),
3477                         offset=chunks[-1].offset,
3478                     )
3479             sub_decode_path = decode_path + (str(len(chunks)),)
3480             try:
3481                 if evgen_mode:
3482                     for _decode_path, chunk, v_tail in BitString().decode_evgen(
3483                             v,
3484                             offset=sub_offset,
3485                             decode_path=sub_decode_path,
3486                             leavemm=True,
3487                             ctx=ctx,
3488                             _ctx_immutable=False,
3489                     ):
3490                         yield _decode_path, chunk, v_tail
3491                 else:
3492                     _, chunk, v_tail = next(BitString().decode_evgen(
3493                         v,
3494                         offset=sub_offset,
3495                         decode_path=sub_decode_path,
3496                         leavemm=True,
3497                         ctx=ctx,
3498                         _ctx_immutable=False,
3499                         _evgen_mode=False,
3500                     ))
3501             except TagMismatch:
3502                 raise DecodeError(
3503                     "expected BitString encoded chunk",
3504                     klass=self.__class__,
3505                     decode_path=sub_decode_path,
3506                     offset=sub_offset,
3507                 )
3508             chunks.append(chunk)
3509             sub_offset += chunk.tlvlen
3510             vlen += chunk.tlvlen
3511             v = v_tail
3512         if len(chunks) == 0:
3513             raise DecodeError(
3514                 "no chunks",
3515                 klass=self.__class__,
3516                 decode_path=decode_path,
3517                 offset=offset,
3518             )
3519         values = []
3520         bit_len = 0
3521         for chunk_i, chunk in enumerate(chunks[:-1]):
3522             if chunk.bit_len % 8 != 0:
3523                 raise DecodeError(
3524                     "BitString chunk is not multiple of 8 bits",
3525                     klass=self.__class__,
3526                     decode_path=decode_path + (str(chunk_i),),
3527                     offset=chunk.offset,
3528                 )
3529             if not evgen_mode:
3530                 values.append(bytes(chunk))
3531             bit_len += chunk.bit_len
3532         chunk_last = chunks[-1]
3533         if not evgen_mode:
3534             values.append(bytes(chunk_last))
3535         bit_len += chunk_last.bit_len
3536         obj = self.__class__(
3537             value=None if evgen_mode else (bit_len, b"".join(values)),
3538             impl=self.tag,
3539             expl=self._expl,
3540             default=self.default,
3541             optional=self.optional,
3542             _specs=self.specs,
3543             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3544         )
3545         if evgen_mode:
3546             obj._value = (bit_len, None)
3547         obj.lenindef = lenindef
3548         obj.ber_encoded = True
3549         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3550
3551     def __repr__(self):
3552         return pp_console_row(next(self.pps()))
3553
3554     def pps(self, decode_path=()):
3555         value = None
3556         blob = None
3557         if self.ready:
3558             bit_len, blob = self._value
3559             value = "%d bits" % bit_len
3560             if len(self.specs) > 0 and blob is not None:
3561                 blob = tuple(self.named)
3562         yield _pp(
3563             obj=self,
3564             asn1_type_name=self.asn1_type_name,
3565             obj_name=self.__class__.__name__,
3566             decode_path=decode_path,
3567             value=value,
3568             blob=blob,
3569             optional=self.optional,
3570             default=self == self.default,
3571             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3572             expl=None if self._expl is None else tag_decode(self._expl),
3573             offset=self.offset,
3574             tlen=self.tlen,
3575             llen=self.llen,
3576             vlen=self.vlen,
3577             expl_offset=self.expl_offset if self.expled else None,
3578             expl_tlen=self.expl_tlen if self.expled else None,
3579             expl_llen=self.expl_llen if self.expled else None,
3580             expl_vlen=self.expl_vlen if self.expled else None,
3581             expl_lenindef=self.expl_lenindef,
3582             lenindef=self.lenindef,
3583             ber_encoded=self.ber_encoded,
3584             bered=self.bered,
3585         )
3586         defined_by, defined = self.defined or (None, None)
3587         if defined_by is not None:
3588             yield defined.pps(
3589                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3590             )
3591         for pp in self.pps_lenindef(decode_path):
3592             yield pp
3593
3594
3595 OctetStringState = namedtuple(
3596     "OctetStringState",
3597     BasicState._fields + (
3598         "value",
3599         "bound_min",
3600         "bound_max",
3601         "tag_constructed",
3602         "defined",
3603     ),
3604     **NAMEDTUPLE_KWARGS
3605 )
3606
3607
3608 class OctetString(Obj):
3609     """``OCTET STRING`` binary string type
3610
3611     >>> s = OctetString(b"hello world")
3612     OCTET STRING 11 bytes 68656c6c6f20776f726c64
3613     >>> s == OctetString(b"hello world")
3614     True
3615     >>> bytes(s)
3616     b'hello world'
3617
3618     >>> OctetString(b"hello", bounds=(4, 4))
3619     Traceback (most recent call last):
3620     pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3621     >>> OctetString(b"hell", bounds=(4, 4))
3622     OCTET STRING 4 bytes 68656c6c
3623
3624     Memoryviews can be used as a values. If memoryview is made on
3625     mmap-ed file, then it does not take storage inside OctetString
3626     itself. In CER encoding mode it will be streamed to the specified
3627     writer, copying 1 KB chunks.
3628     """
3629     __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3630     tag_default = tag_encode(4)
3631     asn1_type_name = "OCTET STRING"
3632     evgen_mode_skip_value = True
3633
3634     def __init__(
3635             self,
3636             value=None,
3637             bounds=None,
3638             impl=None,
3639             expl=None,
3640             default=None,
3641             optional=False,
3642             _decoded=(0, 0, 0),
3643             ctx=None,
3644     ):
3645         """
3646         :param value: set the value. Either binary type, or
3647                       :py:class:`pyderasn.OctetString` object
3648         :param bounds: set ``(MIN, MAX)`` value size constraint.
3649                        (-inf, +inf) by default
3650         :param bytes impl: override default tag with ``IMPLICIT`` one
3651         :param bytes expl: override default tag with ``EXPLICIT`` one
3652         :param default: set default value. Type same as in ``value``
3653         :param bool optional: is object ``OPTIONAL`` in sequence
3654         """
3655         super().__init__(impl, expl, default, optional, _decoded)
3656         self._value = value
3657         self._bound_min, self._bound_max = getattr(
3658             self,
3659             "bounds",
3660             (0, float("+inf")),
3661         ) if bounds is None else bounds
3662         if value is not None:
3663             self._value = self._value_sanitize(value)
3664         if default is not None:
3665             default = self._value_sanitize(default)
3666             self.default = self.__class__(
3667                 value=default,
3668                 impl=self.tag,
3669                 expl=self._expl,
3670             )
3671             if self._value is None:
3672                 self._value = default
3673         self.defined = None
3674         tag_klass, _, tag_num = tag_decode(self.tag)
3675         self.tag_constructed = tag_encode(
3676             klass=tag_klass,
3677             form=TagFormConstructed,
3678             num=tag_num,
3679         )
3680
3681     def _value_sanitize(self, value):
3682         if value.__class__ == bytes or value.__class__ == memoryview:
3683             pass
3684         elif issubclass(value.__class__, OctetString):
3685             value = value._value
3686         else:
3687             raise InvalidValueType((self.__class__, bytes, memoryview))
3688         if not self._bound_min <= len(value) <= self._bound_max:
3689             raise BoundsError(self._bound_min, len(value), self._bound_max)
3690         return value
3691
3692     @property
3693     def ready(self):
3694         return self._value is not None
3695
3696     def __getstate__(self):
3697         return OctetStringState(
3698             __version__,
3699             self.tag,
3700             self._tag_order,
3701             self._expl,
3702             self.default,
3703             self.optional,
3704             self.offset,
3705             self.llen,
3706             self.vlen,
3707             self.expl_lenindef,
3708             self.lenindef,
3709             self.ber_encoded,
3710             self._value,
3711             self._bound_min,
3712             self._bound_max,
3713             self.tag_constructed,
3714             self.defined,
3715         )
3716
3717     def __setstate__(self, state):
3718         super().__setstate__(state)
3719         self._value = state.value
3720         self._bound_min = state.bound_min
3721         self._bound_max = state.bound_max
3722         self.tag_constructed = state.tag_constructed
3723         self.defined = state.defined
3724
3725     def __bytes__(self):
3726         self._assert_ready()
3727         return bytes(self._value)
3728
3729     def __eq__(self, their):
3730         if their.__class__ == bytes:
3731             return self._value == their
3732         if not issubclass(their.__class__, OctetString):
3733             return False
3734         return (
3735             self._value == their._value and
3736             self.tag == their.tag and
3737             self._expl == their._expl
3738         )
3739
3740     def __lt__(self, their):
3741         return self._value < their._value
3742
3743     def __call__(
3744             self,
3745             value=None,
3746             bounds=None,
3747             impl=None,
3748             expl=None,
3749             default=None,
3750             optional=None,
3751     ):
3752         return self.__class__(
3753             value=value,
3754             bounds=(
3755                 (self._bound_min, self._bound_max)
3756                 if bounds is None else bounds
3757             ),
3758             impl=self.tag if impl is None else impl,
3759             expl=self._expl if expl is None else expl,
3760             default=self.default if default is None else default,
3761             optional=self.optional if optional is None else optional,
3762         )
3763
3764     def _encode(self):
3765         self._assert_ready()
3766         return b"".join((
3767             self.tag,
3768             len_encode(len(self._value)),
3769             self._value,
3770         ))
3771
3772     def _encode1st(self, state):
3773         self._assert_ready()
3774         l = len(self._value)
3775         return len(self.tag) + len_size(l) + l, state
3776
3777     def _encode2nd(self, writer, state_iter):
3778         value = self._value
3779         write_full(writer, self.tag + len_encode(len(value)))
3780         write_full(writer, value)
3781
3782     def _encode_cer(self, writer):
3783         octets = self._value
3784         if len(octets) <= 1000:
3785             write_full(writer, self._encode())
3786             return
3787         write_full(writer, self.tag_constructed)
3788         write_full(writer, LENINDEF)
3789         for offset in range(0, (len(octets) // 1000) * 1000, 1000):
3790             write_full(writer, b"".join((
3791                 OctetString.tag_default,
3792                 LEN1K,
3793                 octets[offset:offset + 1000],
3794             )))
3795         tail = octets[offset + 1000:]
3796         if len(tail) > 0:
3797             write_full(writer, b"".join((
3798                 OctetString.tag_default,
3799                 len_encode(len(tail)),
3800                 tail,
3801             )))
3802         write_full(writer, EOC)
3803
3804     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3805         try:
3806             t, tlen, lv = tag_strip(tlv)
3807         except DecodeError as err:
3808             raise err.__class__(
3809                 msg=err.msg,
3810                 klass=self.__class__,
3811                 decode_path=decode_path,
3812                 offset=offset,
3813             )
3814         if t == self.tag:
3815             if tag_only:
3816                 yield None
3817                 return
3818             try:
3819                 l, llen, v = len_decode(lv)
3820             except DecodeError as err:
3821                 raise err.__class__(
3822                     msg=err.msg,
3823                     klass=self.__class__,
3824                     decode_path=decode_path,
3825                     offset=offset,
3826                 )
3827             if l > len(v):
3828                 raise NotEnoughData(
3829                     "encoded length is longer than data",
3830                     klass=self.__class__,
3831                     decode_path=decode_path,
3832                     offset=offset,
3833                 )
3834             v, tail = v[:l], v[l:]
3835             if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3836                 raise DecodeError(
3837                     msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3838                     klass=self.__class__,
3839                     decode_path=decode_path,
3840                     offset=offset,
3841                 )
3842             try:
3843                 obj = self.__class__(
3844                     value=(
3845                         None if (evgen_mode and self.evgen_mode_skip_value)
3846                         else v.tobytes()
3847                     ),
3848                     bounds=(self._bound_min, self._bound_max),
3849                     impl=self.tag,
3850                     expl=self._expl,
3851                     default=self.default,
3852                     optional=self.optional,
3853                     _decoded=(offset, llen, l),
3854                     ctx=ctx,
3855                 )
3856             except DecodeError as err:
3857                 raise DecodeError(
3858                     msg=err.msg,
3859                     klass=self.__class__,
3860                     decode_path=decode_path,
3861                     offset=offset,
3862                 )
3863             except BoundsError as err:
3864                 raise DecodeError(
3865                     msg=str(err),
3866                     klass=self.__class__,
3867                     decode_path=decode_path,
3868                     offset=offset,
3869                 )
3870             yield decode_path, obj, tail
3871             return
3872         if t != self.tag_constructed:
3873             raise TagMismatch(
3874                 klass=self.__class__,
3875                 decode_path=decode_path,
3876                 offset=offset,
3877             )
3878         if not ctx.get("bered", False):
3879             raise DecodeError(
3880                 "unallowed BER constructed encoding",
3881                 klass=self.__class__,
3882                 decode_path=decode_path,
3883                 offset=offset,
3884             )
3885         if tag_only:
3886             yield None
3887             return
3888         lenindef = False
3889         try:
3890             l, llen, v = len_decode(lv)
3891         except LenIndefForm:
3892             llen, l, v = 1, 0, lv[1:]
3893             lenindef = True
3894         except DecodeError as err:
3895             raise err.__class__(
3896                 msg=err.msg,
3897                 klass=self.__class__,
3898                 decode_path=decode_path,
3899                 offset=offset,
3900             )
3901         if l > len(v):
3902             raise NotEnoughData(
3903                 "encoded length is longer than data",
3904                 klass=self.__class__,
3905                 decode_path=decode_path,
3906                 offset=offset,
3907             )
3908         chunks = []
3909         chunks_count = 0
3910         sub_offset = offset + tlen + llen
3911         vlen = 0
3912         payload_len = 0
3913         while True:
3914             if lenindef:
3915                 if v[:EOC_LEN].tobytes() == EOC:
3916                     break
3917             else:
3918                 if vlen == l:
3919                     break
3920                 if vlen > l:
3921                     raise DecodeError(
3922                         "chunk out of bounds",
3923                         klass=self.__class__,
3924                         decode_path=decode_path + (str(len(chunks) - 1),),
3925                         offset=chunks[-1].offset,
3926                     )
3927             try:
3928                 if evgen_mode:
3929                     sub_decode_path = decode_path + (str(chunks_count),)
3930                     for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3931                             v,
3932                             offset=sub_offset,
3933                             decode_path=sub_decode_path,
3934                             leavemm=True,
3935                             ctx=ctx,
3936                             _ctx_immutable=False,
3937                     ):
3938                         yield _decode_path, chunk, v_tail
3939                         if not chunk.ber_encoded:
3940                             payload_len += chunk.vlen
3941                     chunks_count += 1
3942                 else:
3943                     sub_decode_path = decode_path + (str(len(chunks)),)
3944                     _, chunk, v_tail = next(OctetString().decode_evgen(
3945                         v,
3946                         offset=sub_offset,
3947                         decode_path=sub_decode_path,
3948                         leavemm=True,
3949                         ctx=ctx,
3950                         _ctx_immutable=False,
3951                         _evgen_mode=False,
3952                     ))
3953                     chunks.append(chunk)
3954             except TagMismatch:
3955                 raise DecodeError(
3956                     "expected OctetString encoded chunk",
3957                     klass=self.__class__,
3958                     decode_path=sub_decode_path,
3959                     offset=sub_offset,
3960                 )
3961             sub_offset += chunk.tlvlen
3962             vlen += chunk.tlvlen
3963             v = v_tail
3964         if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
3965             raise DecodeError(
3966                 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
3967                 klass=self.__class__,
3968                 decode_path=decode_path,
3969                 offset=offset,
3970             )
3971         try:
3972             obj = self.__class__(
3973                 value=(
3974                     None if evgen_mode else
3975                     b"".join(bytes(chunk) for chunk in chunks)
3976                 ),
3977                 bounds=(self._bound_min, self._bound_max),
3978                 impl=self.tag,
3979                 expl=self._expl,
3980                 default=self.default,
3981                 optional=self.optional,
3982                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3983                 ctx=ctx,
3984             )
3985         except DecodeError as err:
3986             raise DecodeError(
3987                 msg=err.msg,
3988                 klass=self.__class__,
3989                 decode_path=decode_path,
3990                 offset=offset,
3991             )
3992         except BoundsError as err:
3993             raise DecodeError(
3994                 msg=str(err),
3995                 klass=self.__class__,
3996                 decode_path=decode_path,
3997                 offset=offset,
3998             )
3999         obj.lenindef = lenindef
4000         obj.ber_encoded = True
4001         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4002
4003     def __repr__(self):
4004         return pp_console_row(next(self.pps()))
4005
4006     def pps(self, decode_path=()):
4007         yield _pp(
4008             obj=self,
4009             asn1_type_name=self.asn1_type_name,
4010             obj_name=self.__class__.__name__,
4011             decode_path=decode_path,
4012             value=("%d bytes" % len(self._value)) if self.ready else None,
4013             blob=self._value if self.ready else None,
4014             optional=self.optional,
4015             default=self == self.default,
4016             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4017             expl=None if self._expl is None else tag_decode(self._expl),
4018             offset=self.offset,
4019             tlen=self.tlen,
4020             llen=self.llen,
4021             vlen=self.vlen,
4022             expl_offset=self.expl_offset if self.expled else None,
4023             expl_tlen=self.expl_tlen if self.expled else None,
4024             expl_llen=self.expl_llen if self.expled else None,
4025             expl_vlen=self.expl_vlen if self.expled else None,
4026             expl_lenindef=self.expl_lenindef,
4027             lenindef=self.lenindef,
4028             ber_encoded=self.ber_encoded,
4029             bered=self.bered,
4030         )
4031         defined_by, defined = self.defined or (None, None)
4032         if defined_by is not None:
4033             yield defined.pps(
4034                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4035             )
4036         for pp in self.pps_lenindef(decode_path):
4037             yield pp
4038
4039
4040 def agg_octet_string(evgens, decode_path, raw, writer):
4041     """Aggregate constructed string (OctetString and its derivatives)
4042
4043     :param evgens: iterator of generated events
4044     :param decode_path: points to the string we want to decode
4045     :param raw: slicebable (memoryview, bytearray, etc) with
4046                 the data evgens are generated on
4047     :param writer: buffer.write where string is going to be saved
4048     :param writer: where string is going to be saved. Must comply
4049                    with ``io.RawIOBase.write`` behaviour
4050
4051     .. seealso:: :ref:`agg_octet_string`
4052     """
4053     decode_path_len = len(decode_path)
4054     for dp, obj, _ in evgens:
4055         if dp[:decode_path_len] != decode_path:
4056             continue
4057         if not obj.ber_encoded:
4058             write_full(writer, raw[
4059                 obj.offset + obj.tlen + obj.llen:
4060                 obj.offset + obj.tlen + obj.llen + obj.vlen -
4061                 (EOC_LEN if obj.expl_lenindef else 0)
4062             ])
4063         if len(dp) == decode_path_len:
4064             break
4065
4066
4067 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4068
4069
4070 class Null(Obj):
4071     """``NULL`` null object
4072
4073     >>> n = Null()
4074     NULL
4075     >>> n.ready
4076     True
4077     """
4078     __slots__ = ()
4079     tag_default = tag_encode(5)
4080     asn1_type_name = "NULL"
4081
4082     def __init__(
4083             self,
4084             value=None,  # unused, but Sequence passes it
4085             impl=None,
4086             expl=None,
4087             optional=False,
4088             _decoded=(0, 0, 0),
4089     ):
4090         """
4091         :param bytes impl: override default tag with ``IMPLICIT`` one
4092         :param bytes expl: override default tag with ``EXPLICIT`` one
4093         :param bool optional: is object ``OPTIONAL`` in sequence
4094         """
4095         super().__init__(impl, expl, None, optional, _decoded)
4096         self.default = None
4097
4098     @property
4099     def ready(self):
4100         return True
4101
4102     def __getstate__(self):
4103         return NullState(
4104             __version__,
4105             self.tag,
4106             self._tag_order,
4107             self._expl,
4108             self.default,
4109             self.optional,
4110             self.offset,
4111             self.llen,
4112             self.vlen,
4113             self.expl_lenindef,
4114             self.lenindef,
4115             self.ber_encoded,
4116         )
4117
4118     def __eq__(self, their):
4119         if not issubclass(their.__class__, Null):
4120             return False
4121         return (
4122             self.tag == their.tag and
4123             self._expl == their._expl
4124         )
4125
4126     def __call__(
4127             self,
4128             value=None,
4129             impl=None,
4130             expl=None,
4131             optional=None,
4132     ):
4133         return self.__class__(
4134             impl=self.tag if impl is None else impl,
4135             expl=self._expl if expl is None else expl,
4136             optional=self.optional if optional is None else optional,
4137         )
4138
4139     def _encode(self):
4140         return self.tag + LEN0
4141
4142     def _encode1st(self, state):
4143         return len(self.tag) + 1, state
4144
4145     def _encode2nd(self, writer, state_iter):
4146         write_full(writer, self.tag + LEN0)
4147
4148     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4149         try:
4150             t, _, lv = tag_strip(tlv)
4151         except DecodeError as err:
4152             raise err.__class__(
4153                 msg=err.msg,
4154                 klass=self.__class__,
4155                 decode_path=decode_path,
4156                 offset=offset,
4157             )
4158         if t != self.tag:
4159             raise TagMismatch(
4160                 klass=self.__class__,
4161                 decode_path=decode_path,
4162                 offset=offset,
4163             )
4164         if tag_only:  # pragma: no cover
4165             yield None
4166             return
4167         try:
4168             l, _, v = len_decode(lv)
4169         except DecodeError as err:
4170             raise err.__class__(
4171                 msg=err.msg,
4172                 klass=self.__class__,
4173                 decode_path=decode_path,
4174                 offset=offset,
4175             )
4176         if l != 0:
4177             raise InvalidLength(
4178                 "Null must have zero length",
4179                 klass=self.__class__,
4180                 decode_path=decode_path,
4181                 offset=offset,
4182             )
4183         obj = self.__class__(
4184             impl=self.tag,
4185             expl=self._expl,
4186             optional=self.optional,
4187             _decoded=(offset, 1, 0),
4188         )
4189         yield decode_path, obj, v
4190
4191     def __repr__(self):
4192         return pp_console_row(next(self.pps()))
4193
4194     def pps(self, decode_path=()):
4195         yield _pp(
4196             obj=self,
4197             asn1_type_name=self.asn1_type_name,
4198             obj_name=self.__class__.__name__,
4199             decode_path=decode_path,
4200             optional=self.optional,
4201             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4202             expl=None if self._expl is None else tag_decode(self._expl),
4203             offset=self.offset,
4204             tlen=self.tlen,
4205             llen=self.llen,
4206             vlen=self.vlen,
4207             expl_offset=self.expl_offset if self.expled else None,
4208             expl_tlen=self.expl_tlen if self.expled else None,
4209             expl_llen=self.expl_llen if self.expled else None,
4210             expl_vlen=self.expl_vlen if self.expled else None,
4211             expl_lenindef=self.expl_lenindef,
4212             bered=self.bered,
4213         )
4214         for pp in self.pps_lenindef(decode_path):
4215             yield pp
4216
4217
4218 ObjectIdentifierState = namedtuple(
4219     "ObjectIdentifierState",
4220     BasicState._fields + ("value", "defines"),
4221     **NAMEDTUPLE_KWARGS
4222 )
4223
4224
4225 class ObjectIdentifier(Obj):
4226     """``OBJECT IDENTIFIER`` OID type
4227
4228     >>> oid = ObjectIdentifier((1, 2, 3))
4229     OBJECT IDENTIFIER 1.2.3
4230     >>> oid == ObjectIdentifier("1.2.3")
4231     True
4232     >>> tuple(oid)
4233     (1, 2, 3)
4234     >>> str(oid)
4235     '1.2.3'
4236     >>> oid + (4, 5) + ObjectIdentifier("1.7")
4237     OBJECT IDENTIFIER 1.2.3.4.5.1.7
4238
4239     >>> str(ObjectIdentifier((3, 1)))
4240     Traceback (most recent call last):
4241     pyderasn.InvalidOID: unacceptable first arc value
4242     """
4243     __slots__ = ("defines",)
4244     tag_default = tag_encode(6)
4245     asn1_type_name = "OBJECT IDENTIFIER"
4246
4247     def __init__(
4248             self,
4249             value=None,
4250             defines=(),
4251             impl=None,
4252             expl=None,
4253             default=None,
4254             optional=False,
4255             _decoded=(0, 0, 0),
4256     ):
4257         """
4258         :param value: set the value. Either tuples of integers,
4259                       string of "."-concatenated integers, or
4260                       :py:class:`pyderasn.ObjectIdentifier` object
4261         :param defines: sequence of tuples. Each tuple has two elements.
4262                         First one is relative to current one decode
4263                         path, aiming to the field defined by that OID.
4264                         Read about relative path in
4265                         :py:func:`pyderasn.abs_decode_path`. Second
4266                         tuple element is ``{OID: pyderasn.Obj()}``
4267                         dictionary, mapping between current OID value
4268                         and structure applied to defined field.
4269
4270                         .. seealso:: :ref:`definedby`
4271
4272         :param bytes impl: override default tag with ``IMPLICIT`` one
4273         :param bytes expl: override default tag with ``EXPLICIT`` one
4274         :param default: set default value. Type same as in ``value``
4275         :param bool optional: is object ``OPTIONAL`` in sequence
4276         """
4277         super().__init__(impl, expl, default, optional, _decoded)
4278         self._value = value
4279         if value is not None:
4280             self._value = self._value_sanitize(value)
4281         if default is not None:
4282             default = self._value_sanitize(default)
4283             self.default = self.__class__(
4284                 value=default,
4285                 impl=self.tag,
4286                 expl=self._expl,
4287             )
4288             if self._value is None:
4289                 self._value = default
4290         self.defines = defines
4291
4292     def __add__(self, their):
4293         if their.__class__ == tuple:
4294             return self.__class__(self._value + array("L", their))
4295         if isinstance(their, self.__class__):
4296             return self.__class__(self._value + their._value)
4297         raise InvalidValueType((self.__class__, tuple))
4298
4299     def _value_sanitize(self, value):
4300         if issubclass(value.__class__, ObjectIdentifier):
4301             return value._value
4302         if isinstance(value, str):
4303             try:
4304                 value = array("L", (pureint(arc) for arc in value.split(".")))
4305             except ValueError:
4306                 raise InvalidOID("unacceptable arcs values")
4307         if value.__class__ == tuple:
4308             try:
4309                 value = array("L", value)
4310             except OverflowError as err:
4311                 raise InvalidOID(repr(err))
4312         if value.__class__ is array:
4313             if len(value) < 2:
4314                 raise InvalidOID("less than 2 arcs")
4315             first_arc = value[0]
4316             if first_arc in (0, 1):
4317                 if not (0 <= value[1] <= 39):
4318                     raise InvalidOID("second arc is too wide")
4319             elif first_arc == 2:
4320                 pass
4321             else:
4322                 raise InvalidOID("unacceptable first arc value")
4323             if not all(arc >= 0 for arc in value):
4324                 raise InvalidOID("negative arc value")
4325             return value
4326         raise InvalidValueType((self.__class__, str, tuple))
4327
4328     @property
4329     def ready(self):
4330         return self._value is not None
4331
4332     def __getstate__(self):
4333         return ObjectIdentifierState(
4334             __version__,
4335             self.tag,
4336             self._tag_order,
4337             self._expl,
4338             self.default,
4339             self.optional,
4340             self.offset,
4341             self.llen,
4342             self.vlen,
4343             self.expl_lenindef,
4344             self.lenindef,
4345             self.ber_encoded,
4346             self._value,
4347             self.defines,
4348         )
4349
4350     def __setstate__(self, state):
4351         super().__setstate__(state)
4352         self._value = state.value
4353         self.defines = state.defines
4354
4355     def __iter__(self):
4356         self._assert_ready()
4357         return iter(self._value)
4358
4359     def __str__(self):
4360         return ".".join(str(arc) for arc in self._value or ())
4361
4362     def __hash__(self):
4363         self._assert_ready()
4364         return hash(b"".join((
4365             self.tag,
4366             bytes(self._expl or b""),
4367             str(self._value).encode("ascii"),
4368         )))
4369
4370     def __eq__(self, their):
4371         if their.__class__ == tuple:
4372             return self._value == array("L", their)
4373         if not issubclass(their.__class__, ObjectIdentifier):
4374             return False
4375         return (
4376             self.tag == their.tag and
4377             self._expl == their._expl and
4378             self._value == their._value
4379         )
4380
4381     def __lt__(self, their):
4382         return self._value < their._value
4383
4384     def __call__(
4385             self,
4386             value=None,
4387             defines=None,
4388             impl=None,
4389             expl=None,
4390             default=None,
4391             optional=None,
4392     ):
4393         return self.__class__(
4394             value=value,
4395             defines=self.defines if defines is None else defines,
4396             impl=self.tag if impl is None else impl,
4397             expl=self._expl if expl is None else expl,
4398             default=self.default if default is None else default,
4399             optional=self.optional if optional is None else optional,
4400         )
4401
4402     def _encode_octets(self):
4403         self._assert_ready()
4404         value = self._value
4405         first_value = value[1]
4406         first_arc = value[0]
4407         if first_arc == 0:
4408             pass
4409         elif first_arc == 1:
4410             first_value += 40
4411         elif first_arc == 2:
4412             first_value += 80
4413         else:  # pragma: no cover
4414             raise RuntimeError("invalid arc is stored")
4415         octets = [zero_ended_encode(first_value)]
4416         for arc in value[2:]:
4417             octets.append(zero_ended_encode(arc))
4418         return b"".join(octets)
4419
4420     def _encode(self):
4421         v = self._encode_octets()
4422         return b"".join((self.tag, len_encode(len(v)), v))
4423
4424     def _encode1st(self, state):
4425         l = len(self._encode_octets())
4426         return len(self.tag) + len_size(l) + l, state
4427
4428     def _encode2nd(self, writer, state_iter):
4429         write_full(writer, self._encode())
4430
4431     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4432         try:
4433             t, _, lv = tag_strip(tlv)
4434         except DecodeError as err:
4435             raise err.__class__(
4436                 msg=err.msg,
4437                 klass=self.__class__,
4438                 decode_path=decode_path,
4439                 offset=offset,
4440             )
4441         if t != self.tag:
4442             raise TagMismatch(
4443                 klass=self.__class__,
4444                 decode_path=decode_path,
4445                 offset=offset,
4446             )
4447         if tag_only:  # pragma: no cover
4448             yield None
4449             return
4450         try:
4451             l, llen, v = len_decode(lv)
4452         except DecodeError as err:
4453             raise err.__class__(
4454                 msg=err.msg,
4455                 klass=self.__class__,
4456                 decode_path=decode_path,
4457                 offset=offset,
4458             )
4459         if l > len(v):
4460             raise NotEnoughData(
4461                 "encoded length is longer than data",
4462                 klass=self.__class__,
4463                 decode_path=decode_path,
4464                 offset=offset,
4465             )
4466         if l == 0:
4467             raise NotEnoughData(
4468                 "zero length",
4469                 klass=self.__class__,
4470                 decode_path=decode_path,
4471                 offset=offset,
4472             )
4473         v, tail = v[:l], v[l:]
4474         arcs = array("L")
4475         ber_encoded = False
4476         while len(v) > 0:
4477             i = 0
4478             arc = 0
4479             while True:
4480                 octet = v[i]
4481                 if i == 0 and octet == 0x80:
4482                     if ctx.get("bered", False):
4483                         ber_encoded = True
4484                     else:
4485                         raise DecodeError(
4486                             "non normalized arc encoding",
4487                             klass=self.__class__,
4488                             decode_path=decode_path,
4489                             offset=offset,
4490                         )
4491                 arc = (arc << 7) | (octet & 0x7F)
4492                 if octet & 0x80 == 0:
4493                     try:
4494                         arcs.append(arc)
4495                     except OverflowError:
4496                         raise DecodeError(
4497                             "too huge value for local unsigned long",
4498                             klass=self.__class__,
4499                             decode_path=decode_path,
4500                             offset=offset,
4501                         )
4502                     v = v[i + 1:]
4503                     break
4504                 i += 1
4505                 if i == len(v):
4506                     raise DecodeError(
4507                         "unfinished OID",
4508                         klass=self.__class__,
4509                         decode_path=decode_path,
4510                         offset=offset,
4511                     )
4512         first_arc = 0
4513         second_arc = arcs[0]
4514         if 0 <= second_arc <= 39:
4515             first_arc = 0
4516         elif 40 <= second_arc <= 79:
4517             first_arc = 1
4518             second_arc -= 40
4519         else:
4520             first_arc = 2
4521             second_arc -= 80
4522         obj = self.__class__(
4523             value=array("L", (first_arc, second_arc)) + arcs[1:],
4524             impl=self.tag,
4525             expl=self._expl,
4526             default=self.default,
4527             optional=self.optional,
4528             _decoded=(offset, llen, l),
4529         )
4530         if ber_encoded:
4531             obj.ber_encoded = True
4532         yield decode_path, obj, tail
4533
4534     def __repr__(self):
4535         return pp_console_row(next(self.pps()))
4536
4537     def pps(self, decode_path=()):
4538         yield _pp(
4539             obj=self,
4540             asn1_type_name=self.asn1_type_name,
4541             obj_name=self.__class__.__name__,
4542             decode_path=decode_path,
4543             value=str(self) if self.ready else None,
4544             optional=self.optional,
4545             default=self == self.default,
4546             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4547             expl=None if self._expl is None else tag_decode(self._expl),
4548             offset=self.offset,
4549             tlen=self.tlen,
4550             llen=self.llen,
4551             vlen=self.vlen,
4552             expl_offset=self.expl_offset if self.expled else None,
4553             expl_tlen=self.expl_tlen if self.expled else None,
4554             expl_llen=self.expl_llen if self.expled else None,
4555             expl_vlen=self.expl_vlen if self.expled else None,
4556             expl_lenindef=self.expl_lenindef,
4557             ber_encoded=self.ber_encoded,
4558             bered=self.bered,
4559         )
4560         for pp in self.pps_lenindef(decode_path):
4561             yield pp
4562
4563
4564 class Enumerated(Integer):
4565     """``ENUMERATED`` integer type
4566
4567     This type is identical to :py:class:`pyderasn.Integer`, but requires
4568     schema to be specified and does not accept values missing from it.
4569     """
4570     __slots__ = ()
4571     tag_default = tag_encode(10)
4572     asn1_type_name = "ENUMERATED"
4573
4574     def __init__(
4575             self,
4576             value=None,
4577             impl=None,
4578             expl=None,
4579             default=None,
4580             optional=False,
4581             _specs=None,
4582             _decoded=(0, 0, 0),
4583             bounds=None,  # dummy argument, workability for Integer.decode
4584     ):
4585         super().__init__(
4586             value, bounds, impl, expl, default, optional, _specs, _decoded,
4587         )
4588         if len(self.specs) == 0:
4589             raise ValueError("schema must be specified")
4590
4591     def _value_sanitize(self, value):
4592         if isinstance(value, self.__class__):
4593             value = value._value
4594         elif isinstance(value, int):
4595             for _value in self.specs.values():
4596                 if _value == value:
4597                     break
4598             else:
4599                 raise DecodeError(
4600                     "unknown integer value: %s" % value,
4601                     klass=self.__class__,
4602                 )
4603         elif isinstance(value, str):
4604             value = self.specs.get(value)
4605             if value is None:
4606                 raise ObjUnknown("integer value: %s" % value)
4607         else:
4608             raise InvalidValueType((self.__class__, int, str))
4609         return value
4610
4611     def __call__(
4612             self,
4613             value=None,
4614             impl=None,
4615             expl=None,
4616             default=None,
4617             optional=None,
4618             _specs=None,
4619     ):
4620         return self.__class__(
4621             value=value,
4622             impl=self.tag if impl is None else impl,
4623             expl=self._expl if expl is None else expl,
4624             default=self.default if default is None else default,
4625             optional=self.optional if optional is None else optional,
4626             _specs=self.specs,
4627         )
4628
4629
4630 def escape_control_unicode(c):
4631     if unicat(c)[0] == "C":
4632         c = repr(c).lstrip("u").strip("'")
4633     return c
4634
4635
4636 class CommonString(OctetString):
4637     """Common class for all strings
4638
4639     Everything resembles :py:class:`pyderasn.OctetString`, except
4640     ability to deal with unicode text strings.
4641
4642     >>> hexenc("привет Ð¼Ð¸Ñ€".encode("utf-8"))
4643     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4644     >>> UTF8String("привет Ð¼Ð¸Ñ€") == UTF8String(hexdec("d0...80"))
4645     True
4646     >>> s = UTF8String("привет Ð¼Ð¸Ñ€")
4647     UTF8String UTF8String Ð¿Ñ€Ð¸Ð²ÐµÑ‚ Ð¼Ð¸Ñ€
4648     >>> str(s)
4649     'привет Ð¼Ð¸Ñ€'
4650     >>> hexenc(bytes(s))
4651     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4652
4653     >>> PrintableString("привет Ð¼Ð¸Ñ€")
4654     Traceback (most recent call last):
4655     pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4656
4657     >>> BMPString("ада", bounds=(2, 2))
4658     Traceback (most recent call last):
4659     pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4660     >>> s = BMPString("ад", bounds=(2, 2))
4661     >>> s.encoding
4662     'utf-16-be'
4663     >>> hexenc(bytes(s))
4664     '04300434'
4665
4666     .. list-table::
4667        :header-rows: 1
4668
4669        * - Class
4670          - Text Encoding, validation
4671        * - :py:class:`pyderasn.UTF8String`
4672          - utf-8
4673        * - :py:class:`pyderasn.NumericString`
4674          - proper alphabet validation
4675        * - :py:class:`pyderasn.PrintableString`
4676          - proper alphabet validation
4677        * - :py:class:`pyderasn.TeletexString`
4678          - iso-8859-1
4679        * - :py:class:`pyderasn.T61String`
4680          - iso-8859-1
4681        * - :py:class:`pyderasn.VideotexString`
4682          - iso-8859-1
4683        * - :py:class:`pyderasn.IA5String`
4684          - proper alphabet validation
4685        * - :py:class:`pyderasn.GraphicString`
4686          - iso-8859-1
4687        * - :py:class:`pyderasn.VisibleString`, :py:class:`pyderasn.ISO646String`
4688          - proper alphabet validation
4689        * - :py:class:`pyderasn.GeneralString`
4690          - iso-8859-1
4691        * - :py:class:`pyderasn.UniversalString`
4692          - utf-32-be
4693        * - :py:class:`pyderasn.BMPString`
4694          - utf-16-be
4695     """
4696     __slots__ = ()
4697
4698     def _value_sanitize(self, value):
4699         value_raw = None
4700         value_decoded = None
4701         if isinstance(value, self.__class__):
4702             value_raw = value._value
4703         elif value.__class__ == str:
4704             value_decoded = value
4705         elif value.__class__ == bytes:
4706             value_raw = value
4707         else:
4708             raise InvalidValueType((self.__class__, str, bytes))
4709         try:
4710             value_raw = (
4711                 value_decoded.encode(self.encoding)
4712                 if value_raw is None else value_raw
4713             )
4714             value_decoded = (
4715                 value_raw.decode(self.encoding)
4716                 if value_decoded is None else value_decoded
4717             )
4718         except (UnicodeEncodeError, UnicodeDecodeError) as err:
4719             raise DecodeError(str(err))
4720         if not self._bound_min <= len(value_decoded) <= self._bound_max:
4721             raise BoundsError(
4722                 self._bound_min,
4723                 len(value_decoded),
4724                 self._bound_max,
4725             )
4726         return value_raw
4727
4728     def __eq__(self, their):
4729         if their.__class__ == bytes:
4730             return self._value == their
4731         if their.__class__ == str:
4732             return self._value == their.encode(self.encoding)
4733         if not isinstance(their, self.__class__):
4734             return False
4735         return (
4736             self._value == their._value and
4737             self.tag == their.tag and
4738             self._expl == their._expl
4739         )
4740
4741     def __unicode__(self):
4742         if self.ready:
4743             return self._value.decode(self.encoding)
4744         return str(self._value)
4745
4746     def __repr__(self):
4747         return pp_console_row(next(self.pps()))
4748
4749     def pps(self, decode_path=()):
4750         value = None
4751         if self.ready:
4752             value = "".join(escape_control_unicode(c) for c in self.__unicode__())
4753         yield _pp(
4754             obj=self,
4755             asn1_type_name=self.asn1_type_name,
4756             obj_name=self.__class__.__name__,
4757             decode_path=decode_path,
4758             value=value,
4759             optional=self.optional,
4760             default=self == self.default,
4761             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4762             expl=None if self._expl is None else tag_decode(self._expl),
4763             offset=self.offset,
4764             tlen=self.tlen,
4765             llen=self.llen,
4766             vlen=self.vlen,
4767             expl_offset=self.expl_offset if self.expled else None,
4768             expl_tlen=self.expl_tlen if self.expled else None,
4769             expl_llen=self.expl_llen if self.expled else None,
4770             expl_vlen=self.expl_vlen if self.expled else None,
4771             expl_lenindef=self.expl_lenindef,
4772             ber_encoded=self.ber_encoded,
4773             bered=self.bered,
4774         )
4775         for pp in self.pps_lenindef(decode_path):
4776             yield pp
4777
4778
4779 class UTF8String(CommonString):
4780     __slots__ = ()
4781     tag_default = tag_encode(12)
4782     encoding = "utf-8"
4783     asn1_type_name = "UTF8String"
4784
4785
4786 class AllowableCharsMixin:
4787     __slots__ = ()
4788
4789     @property
4790     def allowable_chars(self):
4791         return frozenset(chr(c) for c in self._allowable_chars)
4792
4793     def _value_sanitize(self, value):
4794         value = super()._value_sanitize(value)
4795         if not frozenset(value) <= self._allowable_chars:
4796             raise DecodeError("non satisfying alphabet value")
4797         return value
4798
4799
4800 NUMERIC_ALLOWABLE_CHARS = frozenset(digits.encode("ascii") + b" ")
4801
4802
4803 class NumericString(AllowableCharsMixin, CommonString):
4804     """Numeric string
4805
4806     Its value is properly sanitized: only ASCII digits with spaces can
4807     be stored.
4808
4809     >>> NumericString().allowable_chars
4810     frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4811     """
4812     __slots__ = ("_allowable_chars",)
4813     tag_default = tag_encode(18)
4814     encoding = "ascii"
4815     asn1_type_name = "NumericString"
4816
4817     def __init__(self, *args, **kwargs):
4818         self._allowable_chars = NUMERIC_ALLOWABLE_CHARS
4819         super().__init__(*args, **kwargs)
4820
4821
4822 PrintableStringState = namedtuple(
4823     "PrintableStringState",
4824     OctetStringState._fields + ("allowable_chars",),
4825     **NAMEDTUPLE_KWARGS
4826 )
4827
4828
4829 PRINTABLE_ALLOWABLE_CHARS = frozenset(
4830     (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4831 )
4832
4833
4834 class PrintableString(AllowableCharsMixin, CommonString):
4835     """Printable string
4836
4837     Its value is properly sanitized: see X.680 41.4 table 10.
4838
4839     >>> PrintableString().allowable_chars
4840     frozenset([' ', "'", ..., 'z'])
4841     >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4842     PrintableString PrintableString foo*bar
4843     >>> obj.allow_asterisk, obj.allow_ampersand
4844     (True, False)
4845     """
4846     __slots__ = ("_allowable_chars",)
4847     tag_default = tag_encode(19)
4848     encoding = "ascii"
4849     asn1_type_name = "PrintableString"
4850     _asterisk = frozenset("*".encode("ascii"))
4851     _ampersand = frozenset("&".encode("ascii"))
4852
4853     def __init__(
4854             self,
4855             value=None,
4856             bounds=None,
4857             impl=None,
4858             expl=None,
4859             default=None,
4860             optional=False,
4861             _decoded=(0, 0, 0),
4862             ctx=None,
4863             allow_asterisk=False,
4864             allow_ampersand=False,
4865     ):
4866         """
4867         :param allow_asterisk: allow asterisk character
4868         :param allow_ampersand: allow ampersand character
4869         """
4870         allowable_chars = PRINTABLE_ALLOWABLE_CHARS
4871         if allow_asterisk:
4872             allowable_chars |= self._asterisk
4873         if allow_ampersand:
4874             allowable_chars |= self._ampersand
4875         self._allowable_chars = allowable_chars
4876         super().__init__(
4877             value, bounds, impl, expl, default, optional, _decoded, ctx,
4878         )
4879
4880     @property
4881     def allow_asterisk(self):
4882         """Is asterisk character allowed?
4883         """
4884         return self._asterisk <= self._allowable_chars
4885
4886     @property
4887     def allow_ampersand(self):
4888         """Is ampersand character allowed?
4889         """
4890         return self._ampersand <= self._allowable_chars
4891
4892     def __getstate__(self):
4893         return PrintableStringState(
4894             *super().__getstate__(),
4895             **{"allowable_chars": self._allowable_chars}
4896         )
4897
4898     def __setstate__(self, state):
4899         super().__setstate__(state)
4900         self._allowable_chars = state.allowable_chars
4901
4902     def __call__(
4903             self,
4904             value=None,
4905             bounds=None,
4906             impl=None,
4907             expl=None,
4908             default=None,
4909             optional=None,
4910     ):
4911         return self.__class__(
4912             value=value,
4913             bounds=(
4914                 (self._bound_min, self._bound_max)
4915                 if bounds is None else bounds
4916             ),
4917             impl=self.tag if impl is None else impl,
4918             expl=self._expl if expl is None else expl,
4919             default=self.default if default is None else default,
4920             optional=self.optional if optional is None else optional,
4921             allow_asterisk=self.allow_asterisk,
4922             allow_ampersand=self.allow_ampersand,
4923         )
4924
4925
4926 class TeletexString(CommonString):
4927     __slots__ = ()
4928     tag_default = tag_encode(20)
4929     encoding = "iso-8859-1"
4930     asn1_type_name = "TeletexString"
4931
4932
4933 class T61String(TeletexString):
4934     __slots__ = ()
4935     asn1_type_name = "T61String"
4936
4937
4938 class VideotexString(CommonString):
4939     __slots__ = ()
4940     tag_default = tag_encode(21)
4941     encoding = "iso-8859-1"
4942     asn1_type_name = "VideotexString"
4943
4944
4945 IA5_ALLOWABLE_CHARS = frozenset(b"".join(
4946     chr(c).encode("ascii") for c in range(128)
4947 ))
4948
4949
4950 class IA5String(AllowableCharsMixin, CommonString):
4951     """IA5 string
4952
4953     Its value is properly sanitized: it is a mix of
4954
4955     * http://www.itscj.ipsj.or.jp/iso-ir/006.pdf (G)
4956     * http://www.itscj.ipsj.or.jp/iso-ir/001.pdf (C0)
4957     * DEL character (0x7F)
4958
4959     It is just 7-bit ASCII.
4960
4961     >>> IA5String().allowable_chars
4962     frozenset(["NUL", ... "DEL"])
4963     """
4964     __slots__ = ("_allowable_chars",)
4965     tag_default = tag_encode(22)
4966     encoding = "ascii"
4967     asn1_type_name = "IA5"
4968
4969     def __init__(self, *args, **kwargs):
4970         self._allowable_chars = IA5_ALLOWABLE_CHARS
4971         super().__init__(*args, **kwargs)
4972
4973
4974 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
4975 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
4976 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
4977 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
4978 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
4979 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
4980
4981
4982 VISIBLE_ALLOWABLE_CHARS = frozenset(b"".join(
4983     chr(c).encode("ascii") for c in range(ord(" "), ord("~") + 1)
4984 ))
4985
4986
4987 class VisibleString(AllowableCharsMixin, CommonString):
4988     """Visible string
4989
4990     Its value is properly sanitized. ASCII subset from space to tilde is
4991     allowed: http://www.itscj.ipsj.or.jp/iso-ir/006.pdf
4992
4993     >>> VisibleString().allowable_chars
4994     frozenset([" ", ... "~"])
4995     """
4996     __slots__ = ("_allowable_chars",)
4997     tag_default = tag_encode(26)
4998     encoding = "ascii"
4999     asn1_type_name = "VisibleString"
5000
5001     def __init__(self, *args, **kwargs):
5002         self._allowable_chars = VISIBLE_ALLOWABLE_CHARS
5003         super().__init__(*args, **kwargs)
5004
5005
5006 class ISO646String(VisibleString):
5007     __slots__ = ()
5008     asn1_type_name = "ISO646String"
5009
5010
5011 UTCTimeState = namedtuple(
5012     "UTCTimeState",
5013     OctetStringState._fields + ("ber_raw",),
5014     **NAMEDTUPLE_KWARGS
5015 )
5016
5017
5018 def str_to_time_fractions(value):
5019     v = pureint(value)
5020     year, v = (v // 10**10), (v % 10**10)
5021     month, v = (v // 10**8), (v % 10**8)
5022     day, v = (v // 10**6), (v % 10**6)
5023     hour, v = (v // 10**4), (v % 10**4)
5024     minute, second = (v // 100), (v % 100)
5025     return year, month, day, hour, minute, second
5026
5027
5028 class UTCTime(VisibleString):
5029     """``UTCTime`` datetime type
5030
5031     >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5032     UTCTime UTCTime 2017-09-30T22:07:50
5033     >>> str(t)
5034     '170930220750Z'
5035     >>> bytes(t)
5036     b'170930220750Z'
5037     >>> t.todatetime()
5038     datetime.datetime(2017, 9, 30, 22, 7, 50)
5039     >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5040     datetime.datetime(1957, 9, 30, 22, 7, 50)
5041
5042     If BER encoded value was met, then ``ber_raw`` attribute will hold
5043     its raw representation.
5044
5045     .. warning::
5046
5047        Only **naive** ``datetime`` objects are supported.
5048        Library assumes that all work is done in UTC.
5049
5050     .. warning::
5051
5052        Pay attention that ``UTCTime`` can not hold full year, so all years
5053        having < 50 years are treated as 20xx, 19xx otherwise, according to
5054        X.509 recommendation. Use ``GeneralizedTime`` instead for
5055        removing ambiguity.
5056
5057     .. warning::
5058
5059        No strict validation of UTC offsets are made (only applicable to
5060        **BER**), but very crude:
5061
5062        * minutes are not exceeding 60
5063        * offset value is not exceeding 14 hours
5064     """
5065     __slots__ = ("ber_raw",)
5066     tag_default = tag_encode(23)
5067     encoding = "ascii"
5068     asn1_type_name = "UTCTime"
5069     evgen_mode_skip_value = False
5070
5071     def __init__(
5072             self,
5073             value=None,
5074             impl=None,
5075             expl=None,
5076             default=None,
5077             optional=False,
5078             _decoded=(0, 0, 0),
5079             bounds=None,  # dummy argument, workability for OctetString.decode
5080             ctx=None,
5081     ):
5082         """
5083         :param value: set the value. Either datetime type, or
5084                       :py:class:`pyderasn.UTCTime` object
5085         :param bytes impl: override default tag with ``IMPLICIT`` one
5086         :param bytes expl: override default tag with ``EXPLICIT`` one
5087         :param default: set default value. Type same as in ``value``
5088         :param bool optional: is object ``OPTIONAL`` in sequence
5089         """
5090         super().__init__(None, None, impl, expl, None, optional, _decoded, ctx)
5091         self._value = value
5092         self.ber_raw = None
5093         if value is not None:
5094             self._value, self.ber_raw = self._value_sanitize(value, ctx)
5095             self.ber_encoded = self.ber_raw is not None
5096         if default is not None:
5097             default, _ = self._value_sanitize(default)
5098             self.default = self.__class__(
5099                 value=default,
5100                 impl=self.tag,
5101                 expl=self._expl,
5102             )
5103             if self._value is None:
5104                 self._value = default
5105             optional = True
5106         self.optional = optional
5107
5108     def _strptime_bered(self, value):
5109         year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5110         value = value[10:]
5111         if len(value) == 0:
5112             raise ValueError("no timezone")
5113         year += 2000 if year < 50 else 1900
5114         decoded = datetime(year, month, day, hour, minute)
5115         offset = 0
5116         if value[-1] == "Z":
5117             value = value[:-1]
5118         else:
5119             if len(value) < 5:
5120                 raise ValueError("invalid UTC offset")
5121             if value[-5] == "-":
5122                 sign = -1
5123             elif value[-5] == "+":
5124                 sign = 1
5125             else:
5126                 raise ValueError("invalid UTC offset")
5127             v = pureint(value[-4:])
5128             offset, v = (60 * (v % 100)), v // 100
5129             if offset >= 3600:
5130                 raise ValueError("invalid UTC offset minutes")
5131             offset += 3600 * v
5132             if offset > 14 * 3600:
5133                 raise ValueError("too big UTC offset")
5134             offset *= sign
5135             value = value[:-5]
5136         if len(value) == 0:
5137             return offset, decoded
5138         if len(value) != 2:
5139             raise ValueError("invalid UTC offset seconds")
5140         seconds = pureint(value)
5141         if seconds >= 60:
5142             raise ValueError("invalid seconds value")
5143         return offset, decoded + timedelta(seconds=seconds)
5144
5145     def _strptime(self, value):
5146         # datetime.strptime's format: %y%m%d%H%M%SZ
5147         if len(value) != LEN_YYMMDDHHMMSSZ:
5148             raise ValueError("invalid UTCTime length")
5149         if value[-1] != "Z":
5150             raise ValueError("non UTC timezone")
5151         year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
5152         year += 2000 if year < 50 else 1900
5153         return datetime(year, month, day, hour, minute, second)
5154
5155     def _dt_sanitize(self, value):
5156         if value.year < 1950 or value.year > 2049:
5157             raise ValueError("UTCTime can hold only 1950-2049 years")
5158         return value.replace(microsecond=0)
5159
5160     def _value_sanitize(self, value, ctx=None):
5161         if value.__class__ == bytes:
5162             try:
5163                 value_decoded = value.decode("ascii")
5164             except (UnicodeEncodeError, UnicodeDecodeError) as err:
5165                 raise DecodeError("invalid UTCTime encoding: %r" % err)
5166             err = None
5167             try:
5168                 return self._strptime(value_decoded), None
5169             except (TypeError, ValueError) as _err:
5170                 err = _err
5171                 if (ctx is not None) and ctx.get("bered", False):
5172                     try:
5173                         offset, _value = self._strptime_bered(value_decoded)
5174                         _value = _value - timedelta(seconds=offset)
5175                         return self._dt_sanitize(_value), value
5176                     except (TypeError, ValueError, OverflowError) as _err:
5177                         err = _err
5178             raise DecodeError(
5179                 "invalid %s format: %r" % (self.asn1_type_name, err),
5180                 klass=self.__class__,
5181             )
5182         if isinstance(value, self.__class__):
5183             return value._value, None
5184         if value.__class__ == datetime:
5185             if value.tzinfo is not None:
5186                 raise ValueError("only naive datetime supported")
5187             return self._dt_sanitize(value), None
5188         raise InvalidValueType((self.__class__, datetime))
5189
5190     def _pp_value(self):
5191         if self.ready:
5192             value = self._value.isoformat()
5193             if self.ber_encoded:
5194                 value += " (%s)" % self.ber_raw
5195             return value
5196         return None
5197
5198     def __unicode__(self):
5199         if self.ready:
5200             value = self._value.isoformat()
5201             if self.ber_encoded:
5202                 value += " (%s)" % self.ber_raw
5203             return value
5204         return str(self._pp_value())
5205
5206     def __getstate__(self):
5207         return UTCTimeState(*super().__getstate__(), **{"ber_raw": self.ber_raw})
5208
5209     def __setstate__(self, state):
5210         super().__setstate__(state)
5211         self.ber_raw = state.ber_raw
5212
5213     def __bytes__(self):
5214         self._assert_ready()
5215         return self._encode_time()
5216
5217     def __eq__(self, their):
5218         if their.__class__ == bytes:
5219             return self._encode_time() == their
5220         if their.__class__ == datetime:
5221             return self.todatetime() == their
5222         if not isinstance(their, self.__class__):
5223             return False
5224         return (
5225             self._value == their._value and
5226             self.tag == their.tag and
5227             self._expl == their._expl
5228         )
5229
5230     def _encode_time(self):
5231         return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5232
5233     def _encode(self):
5234         self._assert_ready()
5235         return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5236
5237     def _encode1st(self, state):
5238         return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
5239
5240     def _encode2nd(self, writer, state_iter):
5241         self._assert_ready()
5242         write_full(writer, self._encode())
5243
5244     def _encode_cer(self, writer):
5245         write_full(writer, self._encode())
5246
5247     def todatetime(self):
5248         return self._value
5249
5250     def totzdatetime(self):
5251         if tzutc is None:
5252             raise NotImplementedError(
5253                 "Package python-dateutil is required to use this feature",
5254             )
5255         return self._value.replace(tzinfo=tzutc())
5256
5257     def __repr__(self):
5258         return pp_console_row(next(self.pps()))
5259
5260     def pps(self, decode_path=()):
5261         yield _pp(
5262             obj=self,
5263             asn1_type_name=self.asn1_type_name,
5264             obj_name=self.__class__.__name__,
5265             decode_path=decode_path,
5266             value=self._pp_value(),
5267             optional=self.optional,
5268             default=self == self.default,
5269             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5270             expl=None if self._expl is None else tag_decode(self._expl),
5271             offset=self.offset,
5272             tlen=self.tlen,
5273             llen=self.llen,
5274             vlen=self.vlen,
5275             expl_offset=self.expl_offset if self.expled else None,
5276             expl_tlen=self.expl_tlen if self.expled else None,
5277             expl_llen=self.expl_llen if self.expled else None,
5278             expl_vlen=self.expl_vlen if self.expled else None,
5279             expl_lenindef=self.expl_lenindef,
5280             ber_encoded=self.ber_encoded,
5281             bered=self.bered,
5282         )
5283         for pp in self.pps_lenindef(decode_path):
5284             yield pp
5285
5286
5287 class GeneralizedTime(UTCTime):
5288     """``GeneralizedTime`` datetime type
5289
5290     This type is similar to :py:class:`pyderasn.UTCTime`.
5291
5292     >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5293     GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5294     >>> str(t)
5295     '20170930220750.000123Z'
5296     >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5297     GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5298
5299     .. warning::
5300
5301        Only **naive** datetime objects are supported.
5302        Library assumes that all work is done in UTC.
5303
5304     .. warning::
5305
5306        Only **microsecond** fractions are supported in DER encoding.
5307        :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5308        higher precision values.
5309
5310     .. warning::
5311
5312        **BER** encoded data can loss information (accuracy) during
5313        decoding because of float transformations.
5314
5315     .. warning::
5316
5317        **Zero** year is unsupported.
5318     """
5319     __slots__ = ()
5320     tag_default = tag_encode(24)
5321     asn1_type_name = "GeneralizedTime"
5322
5323     def _dt_sanitize(self, value):
5324         return value
5325
5326     def _strptime_bered(self, value):
5327         if len(value) < 4 + 3 * 2:
5328             raise ValueError("invalid GeneralizedTime")
5329         year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5330         decoded = datetime(year, month, day, hour)
5331         offset, value = 0, value[10:]
5332         if len(value) == 0:
5333             return offset, decoded
5334         if value[-1] == "Z":
5335             value = value[:-1]
5336         else:
5337             for char, sign in (("-", -1), ("+", 1)):
5338                 idx = value.rfind(char)
5339                 if idx == -1:
5340                     continue
5341                 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5342                 v = pureint(offset_raw)
5343                 if len(offset_raw) == 4:
5344                     offset, v = (60 * (v % 100)), v // 100
5345                     if offset >= 3600:
5346                         raise ValueError("invalid UTC offset minutes")
5347                 elif len(offset_raw) == 2:
5348                     pass
5349                 else:
5350                     raise ValueError("invalid UTC offset")
5351                 offset += 3600 * v
5352                 if offset > 14 * 3600:
5353                     raise ValueError("too big UTC offset")
5354                 offset *= sign
5355                 break
5356         if len(value) == 0:
5357             return offset, decoded
5358         if value[0] in DECIMAL_SIGNS:
5359             return offset, (
5360                 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5361             )
5362         if len(value) < 2:
5363             raise ValueError("stripped minutes")
5364         decoded += timedelta(seconds=60 * pureint(value[:2]))
5365         value = value[2:]
5366         if len(value) == 0:
5367             return offset, decoded
5368         if value[0] in DECIMAL_SIGNS:
5369             return offset, (
5370                 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5371             )
5372         if len(value) < 2:
5373             raise ValueError("stripped seconds")
5374         decoded += timedelta(seconds=pureint(value[:2]))
5375         value = value[2:]
5376         if len(value) == 0:
5377             return offset, decoded
5378         if value[0] not in DECIMAL_SIGNS:
5379             raise ValueError("invalid format after seconds")
5380         return offset, (
5381             decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5382         )
5383
5384     def _strptime(self, value):
5385         l = len(value)
5386         if l == LEN_YYYYMMDDHHMMSSZ:
5387             # datetime.strptime's format: %Y%m%d%H%M%SZ
5388             if value[-1] != "Z":
5389                 raise ValueError("non UTC timezone")
5390             return datetime(*str_to_time_fractions(value[:-1]))
5391         if l >= LEN_YYYYMMDDHHMMSSDMZ:
5392             # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5393             if value[-1] != "Z":
5394                 raise ValueError("non UTC timezone")
5395             if value[14] != ".":
5396                 raise ValueError("no fractions separator")
5397             us = value[15:-1]
5398             if us[-1] == "0":
5399                 raise ValueError("trailing zero")
5400             us_len = len(us)
5401             if us_len > 6:
5402                 raise ValueError("only microsecond fractions are supported")
5403             us = pureint(us + ("0" * (6 - us_len)))
5404             year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5405             return datetime(year, month, day, hour, minute, second, us)
5406         raise ValueError("invalid GeneralizedTime length")
5407
5408     def _encode_time(self):
5409         value = self._value
5410         encoded = value.strftime("%Y%m%d%H%M%S")
5411         if value.microsecond > 0:
5412             encoded += (".%06d" % value.microsecond).rstrip("0")
5413         return (encoded + "Z").encode("ascii")
5414
5415     def _encode(self):
5416         self._assert_ready()
5417         value = self._value
5418         if value.microsecond > 0:
5419             encoded = self._encode_time()
5420             return b"".join((self.tag, len_encode(len(encoded)), encoded))
5421         return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5422
5423     def _encode1st(self, state):
5424         self._assert_ready()
5425         vlen = len(self._encode_time())
5426         return len(self.tag) + len_size(vlen) + vlen, state
5427
5428     def _encode2nd(self, writer, state_iter):
5429         write_full(writer, self._encode())
5430
5431
5432 class GraphicString(CommonString):
5433     __slots__ = ()
5434     tag_default = tag_encode(25)
5435     encoding = "iso-8859-1"
5436     asn1_type_name = "GraphicString"
5437
5438
5439 class GeneralString(CommonString):
5440     __slots__ = ()
5441     tag_default = tag_encode(27)
5442     encoding = "iso-8859-1"
5443     asn1_type_name = "GeneralString"
5444
5445
5446 class UniversalString(CommonString):
5447     __slots__ = ()
5448     tag_default = tag_encode(28)
5449     encoding = "utf-32-be"
5450     asn1_type_name = "UniversalString"
5451
5452
5453 class BMPString(CommonString):
5454     __slots__ = ()
5455     tag_default = tag_encode(30)
5456     encoding = "utf-16-be"
5457     asn1_type_name = "BMPString"
5458
5459
5460 ChoiceState = namedtuple(
5461     "ChoiceState",
5462     BasicState._fields + ("specs", "value",),
5463     **NAMEDTUPLE_KWARGS
5464 )
5465
5466
5467 class Choice(Obj):
5468     """``CHOICE`` special type
5469
5470     ::
5471
5472         class GeneralName(Choice):
5473             schema = (
5474                 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5475                 ("dNSName", IA5String(impl=tag_ctxp(2))),
5476             )
5477
5478     >>> gn = GeneralName()
5479     GeneralName CHOICE
5480     >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5481     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5482     >>> gn["dNSName"] = IA5String("bar.baz")
5483     GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5484     >>> gn["rfc822Name"]
5485     None
5486     >>> gn["dNSName"]
5487     [2] IA5String IA5 bar.baz
5488     >>> gn.choice
5489     'dNSName'
5490     >>> gn.value == gn["dNSName"]
5491     True
5492     >>> gn.specs
5493     OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5494
5495     >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5496     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5497     """
5498     __slots__ = ("specs",)
5499     tag_default = None
5500     asn1_type_name = "CHOICE"
5501
5502     def __init__(
5503             self,
5504             value=None,
5505             schema=None,
5506             impl=None,
5507             expl=None,
5508             default=None,
5509             optional=False,
5510             _decoded=(0, 0, 0),
5511     ):
5512         """
5513         :param value: set the value. Either ``(choice, value)`` tuple, or
5514                       :py:class:`pyderasn.Choice` object
5515         :param bytes impl: can not be set, do **not** use it
5516         :param bytes expl: override default tag with ``EXPLICIT`` one
5517         :param default: set default value. Type same as in ``value``
5518         :param bool optional: is object ``OPTIONAL`` in sequence
5519         """
5520         if impl is not None:
5521             raise ValueError("no implicit tag allowed for CHOICE")
5522         super().__init__(None, expl, default, optional, _decoded)
5523         if schema is None:
5524             schema = getattr(self, "schema", ())
5525         if len(schema) == 0:
5526             raise ValueError("schema must be specified")
5527         self.specs = (
5528             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5529         )
5530         self._value = None
5531         if value is not None:
5532             self._value = self._value_sanitize(value)
5533         if default is not None:
5534             default_value = self._value_sanitize(default)
5535             default_obj = self.__class__(impl=self.tag, expl=self._expl)
5536             default_obj.specs = self.specs
5537             default_obj._value = default_value
5538             self.default = default_obj
5539             if value is None:
5540                 self._value = copy(default_obj._value)
5541         if self._expl is not None:
5542             tag_class, _, tag_num = tag_decode(self._expl)
5543             self._tag_order = (tag_class, tag_num)
5544
5545     def _value_sanitize(self, value):
5546         if (value.__class__ == tuple) and len(value) == 2:
5547             choice, obj = value
5548             spec = self.specs.get(choice)
5549             if spec is None:
5550                 raise ObjUnknown(choice)
5551             if not isinstance(obj, spec.__class__):
5552                 raise InvalidValueType((spec,))
5553             return (choice, spec(obj))
5554         if isinstance(value, self.__class__):
5555             return value._value
5556         raise InvalidValueType((self.__class__, tuple))
5557
5558     @property
5559     def ready(self):
5560         return self._value is not None and self._value[1].ready
5561
5562     @property
5563     def bered(self):
5564         return self.expl_lenindef or (
5565             (self._value is not None) and
5566             self._value[1].bered
5567         )
5568
5569     def __getstate__(self):
5570         return ChoiceState(
5571             __version__,
5572             self.tag,
5573             self._tag_order,
5574             self._expl,
5575             self.default,
5576             self.optional,
5577             self.offset,
5578             self.llen,
5579             self.vlen,
5580             self.expl_lenindef,
5581             self.lenindef,
5582             self.ber_encoded,
5583             self.specs,
5584             copy(self._value),
5585         )
5586
5587     def __setstate__(self, state):
5588         super().__setstate__(state)
5589         self.specs = state.specs
5590         self._value = state.value
5591
5592     def __eq__(self, their):
5593         if (their.__class__ == tuple) and len(their) == 2:
5594             return self._value == their
5595         if not isinstance(their, self.__class__):
5596             return False
5597         return (
5598             self.specs == their.specs and
5599             self._value == their._value
5600         )
5601
5602     def __call__(
5603             self,
5604             value=None,
5605             expl=None,
5606             default=None,
5607             optional=None,
5608     ):
5609         return self.__class__(
5610             value=value,
5611             schema=self.specs,
5612             expl=self._expl if expl is None else expl,
5613             default=self.default if default is None else default,
5614             optional=self.optional if optional is None else optional,
5615         )
5616
5617     @property
5618     def choice(self):
5619         """Name of the choice
5620         """
5621         self._assert_ready()
5622         return self._value[0]
5623
5624     @property
5625     def value(self):
5626         """Value of underlying choice
5627         """
5628         self._assert_ready()
5629         return self._value[1]
5630
5631     @property
5632     def tag_order(self):
5633         self._assert_ready()
5634         return self._value[1].tag_order if self._tag_order is None else self._tag_order
5635
5636     @property
5637     def tag_order_cer(self):
5638         return min(v.tag_order_cer for v in self.specs.values())
5639
5640     def __getitem__(self, key):
5641         if key not in self.specs:
5642             raise ObjUnknown(key)
5643         if self._value is None:
5644             return None
5645         choice, value = self._value
5646         if choice != key:
5647             return None
5648         return value
5649
5650     def __setitem__(self, key, value):
5651         spec = self.specs.get(key)
5652         if spec is None:
5653             raise ObjUnknown(key)
5654         if not isinstance(value, spec.__class__):
5655             raise InvalidValueType((spec.__class__,))
5656         self._value = (key, spec(value))
5657
5658     @property
5659     def tlen(self):
5660         return 0
5661
5662     @property
5663     def decoded(self):
5664         return self._value[1].decoded if self.ready else False
5665
5666     def _encode(self):
5667         self._assert_ready()
5668         return self._value[1].encode()
5669
5670     def _encode1st(self, state):
5671         self._assert_ready()
5672         return self._value[1].encode1st(state)
5673
5674     def _encode2nd(self, writer, state_iter):
5675         self._value[1].encode2nd(writer, state_iter)
5676
5677     def _encode_cer(self, writer):
5678         self._assert_ready()
5679         self._value[1].encode_cer(writer)
5680
5681     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5682         for choice, spec in self.specs.items():
5683             sub_decode_path = decode_path + (choice,)
5684             try:
5685                 spec.decode(
5686                     tlv,
5687                     offset=offset,
5688                     leavemm=True,
5689                     decode_path=sub_decode_path,
5690                     ctx=ctx,
5691                     tag_only=True,
5692                     _ctx_immutable=False,
5693                 )
5694             except TagMismatch:
5695                 continue
5696             break
5697         else:
5698             raise TagMismatch(
5699                 klass=self.__class__,
5700                 decode_path=decode_path,
5701                 offset=offset,
5702             )
5703         if tag_only:  # pragma: no cover
5704             yield None
5705             return
5706         if evgen_mode:
5707             for _decode_path, value, tail in spec.decode_evgen(
5708                     tlv,
5709                     offset=offset,
5710                     leavemm=True,
5711                     decode_path=sub_decode_path,
5712                     ctx=ctx,
5713                     _ctx_immutable=False,
5714             ):
5715                 yield _decode_path, value, tail
5716         else:
5717             _, value, tail = next(spec.decode_evgen(
5718                 tlv,
5719                 offset=offset,
5720                 leavemm=True,
5721                 decode_path=sub_decode_path,
5722                 ctx=ctx,
5723                 _ctx_immutable=False,
5724                 _evgen_mode=False,
5725             ))
5726         obj = self.__class__(
5727             schema=self.specs,
5728             expl=self._expl,
5729             default=self.default,
5730             optional=self.optional,
5731             _decoded=(offset, 0, value.fulllen),
5732         )
5733         obj._value = (choice, value)
5734         yield decode_path, obj, tail
5735
5736     def __repr__(self):
5737         value = pp_console_row(next(self.pps()))
5738         if self.ready:
5739             value = "%s[%r]" % (value, self.value)
5740         return value
5741
5742     def pps(self, decode_path=()):
5743         yield _pp(
5744             obj=self,
5745             asn1_type_name=self.asn1_type_name,
5746             obj_name=self.__class__.__name__,
5747             decode_path=decode_path,
5748             value=self.choice if self.ready else None,
5749             optional=self.optional,
5750             default=self == self.default,
5751             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5752             expl=None if self._expl is None else tag_decode(self._expl),
5753             offset=self.offset,
5754             tlen=self.tlen,
5755             llen=self.llen,
5756             vlen=self.vlen,
5757             expl_lenindef=self.expl_lenindef,
5758             bered=self.bered,
5759         )
5760         if self.ready:
5761             yield self.value.pps(decode_path=decode_path + (self.choice,))
5762         for pp in self.pps_lenindef(decode_path):
5763             yield pp
5764
5765
5766 class PrimitiveTypes(Choice):
5767     """Predefined ``CHOICE`` for all generic primitive types
5768
5769     It could be useful for general decoding of some unspecified values:
5770
5771     >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5772     OCTET STRING 3 bytes 666f6f
5773     >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5774     INTEGER 1193046
5775     """
5776     __slots__ = ()
5777     schema = tuple((klass.__name__, klass()) for klass in (
5778         Boolean,
5779         Integer,
5780         BitString,
5781         OctetString,
5782         Null,
5783         ObjectIdentifier,
5784         UTF8String,
5785         NumericString,
5786         PrintableString,
5787         TeletexString,
5788         VideotexString,
5789         IA5String,
5790         UTCTime,
5791         GeneralizedTime,
5792         GraphicString,
5793         VisibleString,
5794         ISO646String,
5795         GeneralString,
5796         UniversalString,
5797         BMPString,
5798     ))
5799
5800
5801 AnyState = namedtuple(
5802     "AnyState",
5803     BasicState._fields + ("value", "defined"),
5804     **NAMEDTUPLE_KWARGS
5805 )
5806
5807
5808 class Any(Obj):
5809     """``ANY`` special type
5810
5811     >>> Any(Integer(-123))
5812     ANY INTEGER -123 (0X:7B)
5813     >>> a = Any(OctetString(b"hello world").encode())
5814     ANY 040b68656c6c6f20776f726c64
5815     >>> hexenc(bytes(a))
5816     b'0x040x0bhello world'
5817     """
5818     __slots__ = ("defined",)
5819     tag_default = tag_encode(0)
5820     asn1_type_name = "ANY"
5821
5822     def __init__(
5823             self,
5824             value=None,
5825             expl=None,
5826             optional=False,
5827             _decoded=(0, 0, 0),
5828     ):
5829         """
5830         :param value: set the value. Either any kind of pyderasn's
5831                       **ready** object, or bytes. Pay attention that
5832                       **no** validation is performed if raw binary value
5833                       is valid TLV, except just tag decoding
5834         :param bytes expl: override default tag with ``EXPLICIT`` one
5835         :param bool optional: is object ``OPTIONAL`` in sequence
5836         """
5837         super().__init__(None, expl, None, optional, _decoded)
5838         if value is None:
5839             self._value = None
5840         else:
5841             value = self._value_sanitize(value)
5842             self._value = value
5843             if self._expl is None:
5844                 if value.__class__ == bytes:
5845                     tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5846                 else:
5847                     tag_class, tag_num = value.tag_order
5848             else:
5849                 tag_class, _, tag_num = tag_decode(self._expl)
5850             self._tag_order = (tag_class, tag_num)
5851         self.defined = None
5852
5853     def _value_sanitize(self, value):
5854         if value.__class__ == bytes:
5855             if len(value) == 0:
5856                 raise ValueError("%s value can not be empty" % self.__class__.__name__)
5857             return value
5858         if isinstance(value, self.__class__):
5859             return value._value
5860         if not isinstance(value, Obj):
5861             raise InvalidValueType((self.__class__, Obj, bytes))
5862         return value
5863
5864     @property
5865     def ready(self):
5866         return self._value is not None
5867
5868     @property
5869     def tag_order(self):
5870         self._assert_ready()
5871         return self._tag_order
5872
5873     @property
5874     def bered(self):
5875         if self.expl_lenindef or self.lenindef:
5876             return True
5877         if self.defined is None:
5878             return False
5879         return self.defined[1].bered
5880
5881     def __getstate__(self):
5882         return AnyState(
5883             __version__,
5884             self.tag,
5885             self._tag_order,
5886             self._expl,
5887             None,
5888             self.optional,
5889             self.offset,
5890             self.llen,
5891             self.vlen,
5892             self.expl_lenindef,
5893             self.lenindef,
5894             self.ber_encoded,
5895             self._value,
5896             self.defined,
5897         )
5898
5899     def __setstate__(self, state):
5900         super().__setstate__(state)
5901         self._value = state.value
5902         self.defined = state.defined
5903
5904     def __eq__(self, their):
5905         if their.__class__ == bytes:
5906             if self._value.__class__ == bytes:
5907                 return self._value == their
5908             return self._value.encode() == their
5909         if issubclass(their.__class__, Any):
5910             if self.ready and their.ready:
5911                 return bytes(self) == bytes(their)
5912             return self.ready == their.ready
5913         return False
5914
5915     def __call__(
5916             self,
5917             value=None,
5918             expl=None,
5919             optional=None,
5920     ):
5921         return self.__class__(
5922             value=value,
5923             expl=self._expl if expl is None else expl,
5924             optional=self.optional if optional is None else optional,
5925         )
5926
5927     def __bytes__(self):
5928         self._assert_ready()
5929         value = self._value
5930         if value.__class__ == bytes:
5931             return value
5932         return self._value.encode()
5933
5934     @property
5935     def tlen(self):
5936         return 0
5937
5938     def _encode(self):
5939         self._assert_ready()
5940         value = self._value
5941         if value.__class__ == bytes:
5942             return value
5943         return value.encode()
5944
5945     def _encode1st(self, state):
5946         self._assert_ready()
5947         value = self._value
5948         if value.__class__ == bytes:
5949             return len(value), state
5950         return value.encode1st(state)
5951
5952     def _encode2nd(self, writer, state_iter):
5953         value = self._value
5954         if value.__class__ == bytes:
5955             write_full(writer, value)
5956         else:
5957             value.encode2nd(writer, state_iter)
5958
5959     def _encode_cer(self, writer):
5960         self._assert_ready()
5961         value = self._value
5962         if value.__class__ == bytes:
5963             write_full(writer, value)
5964         else:
5965             value.encode_cer(writer)
5966
5967     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5968         try:
5969             t, tlen, lv = tag_strip(tlv)
5970         except DecodeError as err:
5971             raise err.__class__(
5972                 msg=err.msg,
5973                 klass=self.__class__,
5974                 decode_path=decode_path,
5975                 offset=offset,
5976             )
5977         try:
5978             l, llen, v = len_decode(lv)
5979         except LenIndefForm as err:
5980             if not ctx.get("bered", False):
5981                 raise err.__class__(
5982                     msg=err.msg,
5983                     klass=self.__class__,
5984                     decode_path=decode_path,
5985                     offset=offset,
5986                 )
5987             llen, vlen, v = 1, 0, lv[1:]
5988             sub_offset = offset + tlen + llen
5989             chunk_i = 0
5990             while v[:EOC_LEN].tobytes() != EOC:
5991                 chunk, v = Any().decode(
5992                     v,
5993                     offset=sub_offset,
5994                     decode_path=decode_path + (str(chunk_i),),
5995                     leavemm=True,
5996                     ctx=ctx,
5997                     _ctx_immutable=False,
5998                 )
5999                 vlen += chunk.tlvlen
6000                 sub_offset += chunk.tlvlen
6001                 chunk_i += 1
6002             tlvlen = tlen + llen + vlen + EOC_LEN
6003             obj = self.__class__(
6004                 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
6005                 expl=self._expl,
6006                 optional=self.optional,
6007                 _decoded=(offset, 0, tlvlen),
6008             )
6009             obj.lenindef = True
6010             obj.tag = t.tobytes()
6011             yield decode_path, obj, v[EOC_LEN:]
6012             return
6013         except DecodeError as err:
6014             raise err.__class__(
6015                 msg=err.msg,
6016                 klass=self.__class__,
6017                 decode_path=decode_path,
6018                 offset=offset,
6019             )
6020         if l > len(v):
6021             raise NotEnoughData(
6022                 "encoded length is longer than data",
6023                 klass=self.__class__,
6024                 decode_path=decode_path,
6025                 offset=offset,
6026             )
6027         tlvlen = tlen + llen + l
6028         v, tail = tlv[:tlvlen], v[l:]
6029         obj = self.__class__(
6030             value=None if evgen_mode else v.tobytes(),
6031             expl=self._expl,
6032             optional=self.optional,
6033             _decoded=(offset, 0, tlvlen),
6034         )
6035         obj.tag = t.tobytes()
6036         yield decode_path, obj, tail
6037
6038     def __repr__(self):
6039         return pp_console_row(next(self.pps()))
6040
6041     def pps(self, decode_path=()):
6042         value = self._value
6043         if value is None:
6044             pass
6045         elif value.__class__ == bytes:
6046             value = None
6047         else:
6048             value = repr(value)
6049         yield _pp(
6050             obj=self,
6051             asn1_type_name=self.asn1_type_name,
6052             obj_name=self.__class__.__name__,
6053             decode_path=decode_path,
6054             value=value,
6055             blob=self._value if self._value.__class__ == bytes else None,
6056             optional=self.optional,
6057             default=self == self.default,
6058             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6059             expl=None if self._expl is None else tag_decode(self._expl),
6060             offset=self.offset,
6061             tlen=self.tlen,
6062             llen=self.llen,
6063             vlen=self.vlen,
6064             expl_offset=self.expl_offset if self.expled else None,
6065             expl_tlen=self.expl_tlen if self.expled else None,
6066             expl_llen=self.expl_llen if self.expled else None,
6067             expl_vlen=self.expl_vlen if self.expled else None,
6068             expl_lenindef=self.expl_lenindef,
6069             lenindef=self.lenindef,
6070             bered=self.bered,
6071         )
6072         defined_by, defined = self.defined or (None, None)
6073         if defined_by is not None:
6074             yield defined.pps(
6075                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6076             )
6077         for pp in self.pps_lenindef(decode_path):
6078             yield pp
6079
6080
6081 ########################################################################
6082 # ASN.1 constructed types
6083 ########################################################################
6084
6085 def abs_decode_path(decode_path, rel_path):
6086     """Create an absolute decode path from current and relative ones
6087
6088     :param decode_path: current decode path, starting point. Tuple of strings
6089     :param rel_path: relative path to ``decode_path``. Tuple of strings.
6090                      If first tuple's element is "/", then treat it as
6091                      an absolute path, ignoring ``decode_path`` as
6092                      starting point. Also this tuple can contain ".."
6093                      elements, stripping the leading element from
6094                      ``decode_path``
6095
6096     >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
6097     ("foo", "bar", "baz", "whatever")
6098     >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
6099     ("foo", "whatever")
6100     >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
6101     ("baz", "whatever")
6102     """
6103     if rel_path[0] == "/":
6104         return rel_path[1:]
6105     if rel_path[0] == "..":
6106         return abs_decode_path(decode_path[:-1], rel_path[1:])
6107     return decode_path + rel_path
6108
6109
6110 SequenceState = namedtuple(
6111     "SequenceState",
6112     BasicState._fields + ("specs", "value",),
6113     **NAMEDTUPLE_KWARGS
6114 )
6115
6116
6117 class SequenceEncode1stMixin:
6118     __slots__ = ()
6119
6120     def _encode1st(self, state):
6121         state.append(0)
6122         idx = len(state) - 1
6123         vlen = 0
6124         for v in self._values_for_encoding():
6125             l, _ = v.encode1st(state)
6126             vlen += l
6127         state[idx] = vlen
6128         return len(self.tag) + len_size(vlen) + vlen, state
6129
6130
6131 class Sequence(SequenceEncode1stMixin, Obj):
6132     """``SEQUENCE`` structure type
6133
6134     You have to make specification of sequence::
6135
6136         class Extension(Sequence):
6137             schema = (
6138                 ("extnID", ObjectIdentifier()),
6139                 ("critical", Boolean(default=False)),
6140                 ("extnValue", OctetString()),
6141             )
6142
6143     Then, you can work with it as with dictionary.
6144
6145     >>> ext = Extension()
6146     >>> Extension().specs
6147     OrderedDict([
6148         ('extnID', OBJECT IDENTIFIER),
6149         ('critical', BOOLEAN False OPTIONAL DEFAULT),
6150         ('extnValue', OCTET STRING),
6151     ])
6152     >>> ext["extnID"] = "1.2.3"
6153     Traceback (most recent call last):
6154     pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6155     >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6156
6157     You can determine if sequence is ready to be encoded:
6158
6159     >>> ext.ready
6160     False
6161     >>> ext.encode()
6162     Traceback (most recent call last):
6163     pyderasn.ObjNotReady: object is not ready: extnValue
6164     >>> ext["extnValue"] = OctetString(b"foobar")
6165     >>> ext.ready
6166     True
6167
6168     Value you want to assign, must have the same **type** as in
6169     corresponding specification, but it can have different tags,
6170     optional/default attributes -- they will be taken from specification
6171     automatically::
6172
6173         class TBSCertificate(Sequence):
6174             schema = (
6175                 ("version", Version(expl=tag_ctxc(0), default="v1")),
6176             [...]
6177
6178     >>> tbs = TBSCertificate()
6179     >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6180
6181     Assign ``None`` to remove value from sequence.
6182
6183     You can set values in Sequence during its initialization:
6184
6185     >>> AlgorithmIdentifier((
6186         ("algorithm", ObjectIdentifier("1.2.3")),
6187         ("parameters", Any(Null()))
6188     ))
6189     AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6190
6191     You can determine if value exists/set in the sequence and take its value:
6192
6193     >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6194     (True, True, False)
6195     >>> ext["extnID"]
6196     OBJECT IDENTIFIER 1.2.3
6197
6198     But pay attention that if value has default, then it won't be (not
6199     in) in the sequence (because ``DEFAULT`` must not be encoded in
6200     DER), but you can read its value:
6201
6202     >>> "critical" in ext, ext["critical"]
6203     (False, BOOLEAN False)
6204     >>> ext["critical"] = Boolean(True)
6205     >>> "critical" in ext, ext["critical"]
6206     (True, BOOLEAN True)
6207
6208     All defaulted values are always optional.
6209
6210     .. _allow_default_values_ctx:
6211
6212     DER prohibits default value encoding and will raise an error if
6213     default value is unexpectedly met during decode.
6214     If :ref:`bered <bered_ctx>` context option is set, then no error
6215     will be raised, but ``bered`` attribute set. You can disable strict
6216     defaulted values existence validation by setting
6217     ``"allow_default_values": True`` :ref:`context <ctx>` option.
6218
6219     All values with DEFAULT specified are decoded atomically in
6220     :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6221     SEQUENCE, then it will be yielded as a single element, not
6222     disassembled. That is required for DEFAULT existence check.
6223
6224     Two sequences are equal if they have equal specification (schema),
6225     implicit/explicit tagging and the same values.
6226     """
6227     __slots__ = ("specs",)
6228     tag_default = tag_encode(form=TagFormConstructed, num=16)
6229     asn1_type_name = "SEQUENCE"
6230
6231     def __init__(
6232             self,
6233             value=None,
6234             schema=None,
6235             impl=None,
6236             expl=None,
6237             default=None,
6238             optional=False,
6239             _decoded=(0, 0, 0),
6240     ):
6241         super().__init__(impl, expl, default, optional, _decoded)
6242         if schema is None:
6243             schema = getattr(self, "schema", ())
6244         self.specs = (
6245             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6246         )
6247         self._value = {}
6248         if value is not None:
6249             if issubclass(value.__class__, Sequence):
6250                 self._value = value._value
6251             elif hasattr(value, "__iter__"):
6252                 for seq_key, seq_value in value:
6253                     self[seq_key] = seq_value
6254             else:
6255                 raise InvalidValueType((Sequence,))
6256         if default is not None:
6257             if not issubclass(default.__class__, Sequence):
6258                 raise InvalidValueType((Sequence,))
6259             default_value = default._value
6260             default_obj = self.__class__(impl=self.tag, expl=self._expl)
6261             default_obj.specs = self.specs
6262             default_obj._value = default_value
6263             self.default = default_obj
6264             if value is None:
6265                 self._value = copy(default_obj._value)
6266
6267     @property
6268     def ready(self):
6269         for name, spec in self.specs.items():
6270             value = self._value.get(name)
6271             if value is None:
6272                 if spec.optional:
6273                     continue
6274                 return False
6275             if not value.ready:
6276                 return False
6277         return True
6278
6279     @property
6280     def bered(self):
6281         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6282             return True
6283         return any(value.bered for value in self._value.values())
6284
6285     def __getstate__(self):
6286         return SequenceState(
6287             __version__,
6288             self.tag,
6289             self._tag_order,
6290             self._expl,
6291             self.default,
6292             self.optional,
6293             self.offset,
6294             self.llen,
6295             self.vlen,
6296             self.expl_lenindef,
6297             self.lenindef,
6298             self.ber_encoded,
6299             self.specs,
6300             {k: copy(v) for k, v in self._value.items()},
6301         )
6302
6303     def __setstate__(self, state):
6304         super().__setstate__(state)
6305         self.specs = state.specs
6306         self._value = state.value
6307
6308     def __eq__(self, their):
6309         if not isinstance(their, self.__class__):
6310             return False
6311         return (
6312             self.specs == their.specs and
6313             self.tag == their.tag and
6314             self._expl == their._expl and
6315             self._value == their._value
6316         )
6317
6318     def __call__(
6319             self,
6320             value=None,
6321             impl=None,
6322             expl=None,
6323             default=None,
6324             optional=None,
6325     ):
6326         return self.__class__(
6327             value=value,
6328             schema=self.specs,
6329             impl=self.tag if impl is None else impl,
6330             expl=self._expl if expl is None else expl,
6331             default=self.default if default is None else default,
6332             optional=self.optional if optional is None else optional,
6333         )
6334
6335     def __contains__(self, key):
6336         return key in self._value
6337
6338     def __setitem__(self, key, value):
6339         spec = self.specs.get(key)
6340         if spec is None:
6341             raise ObjUnknown(key)
6342         if value is None:
6343             self._value.pop(key, None)
6344             return
6345         if not isinstance(value, spec.__class__):
6346             raise InvalidValueType((spec.__class__,))
6347         value = spec(value=value)
6348         if spec.default is not None and value == spec.default:
6349             self._value.pop(key, None)
6350             return
6351         self._value[key] = value
6352
6353     def __getitem__(self, key):
6354         value = self._value.get(key)
6355         if value is not None:
6356             return value
6357         spec = self.specs.get(key)
6358         if spec is None:
6359             raise ObjUnknown(key)
6360         if spec.default is not None:
6361             return spec.default
6362         return None
6363
6364     def _values_for_encoding(self):
6365         for name, spec in self.specs.items():
6366             value = self._value.get(name)
6367             if value is None:
6368                 if spec.optional:
6369                     continue
6370                 raise ObjNotReady(name)
6371             yield value
6372
6373     def _encode(self):
6374         v = b"".join(v.encode() for v in self._values_for_encoding())
6375         return b"".join((self.tag, len_encode(len(v)), v))
6376
6377     def _encode2nd(self, writer, state_iter):
6378         write_full(writer, self.tag + len_encode(next(state_iter)))
6379         for v in self._values_for_encoding():
6380             v.encode2nd(writer, state_iter)
6381
6382     def _encode_cer(self, writer):
6383         write_full(writer, self.tag + LENINDEF)
6384         for v in self._values_for_encoding():
6385             v.encode_cer(writer)
6386         write_full(writer, EOC)
6387
6388     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6389         try:
6390             t, tlen, lv = tag_strip(tlv)
6391         except DecodeError as err:
6392             raise err.__class__(
6393                 msg=err.msg,
6394                 klass=self.__class__,
6395                 decode_path=decode_path,
6396                 offset=offset,
6397             )
6398         if t != self.tag:
6399             raise TagMismatch(
6400                 klass=self.__class__,
6401                 decode_path=decode_path,
6402                 offset=offset,
6403             )
6404         if tag_only:  # pragma: no cover
6405             yield None
6406             return
6407         lenindef = False
6408         ctx_bered = ctx.get("bered", False)
6409         try:
6410             l, llen, v = len_decode(lv)
6411         except LenIndefForm as err:
6412             if not ctx_bered:
6413                 raise err.__class__(
6414                     msg=err.msg,
6415                     klass=self.__class__,
6416                     decode_path=decode_path,
6417                     offset=offset,
6418                 )
6419             l, llen, v = 0, 1, lv[1:]
6420             lenindef = True
6421         except DecodeError as err:
6422             raise err.__class__(
6423                 msg=err.msg,
6424                 klass=self.__class__,
6425                 decode_path=decode_path,
6426                 offset=offset,
6427             )
6428         if l > len(v):
6429             raise NotEnoughData(
6430                 "encoded length is longer than data",
6431                 klass=self.__class__,
6432                 decode_path=decode_path,
6433                 offset=offset,
6434             )
6435         if not lenindef:
6436             v, tail = v[:l], v[l:]
6437         vlen = 0
6438         sub_offset = offset + tlen + llen
6439         values = {}
6440         ber_encoded = False
6441         ctx_allow_default_values = ctx.get("allow_default_values", False)
6442         for name, spec in self.specs.items():
6443             if spec.optional and (
6444                     (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6445                     len(v) == 0
6446             ):
6447                 continue
6448             spec_defaulted = spec.default is not None
6449             sub_decode_path = decode_path + (name,)
6450             try:
6451                 if evgen_mode and not spec_defaulted:
6452                     for _decode_path, value, v_tail in spec.decode_evgen(
6453                             v,
6454                             sub_offset,
6455                             leavemm=True,
6456                             decode_path=sub_decode_path,
6457                             ctx=ctx,
6458                             _ctx_immutable=False,
6459                     ):
6460                         yield _decode_path, value, v_tail
6461                 else:
6462                     _, value, v_tail = next(spec.decode_evgen(
6463                         v,
6464                         sub_offset,
6465                         leavemm=True,
6466                         decode_path=sub_decode_path,
6467                         ctx=ctx,
6468                         _ctx_immutable=False,
6469                         _evgen_mode=False,
6470                     ))
6471             except TagMismatch as err:
6472                 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6473                     continue
6474                 raise
6475
6476             defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6477             if not evgen_mode and defined is not None:
6478                 defined_by, defined_spec = defined
6479                 if issubclass(value.__class__, SequenceOf):
6480                     for i, _value in enumerate(value):
6481                         sub_sub_decode_path = sub_decode_path + (
6482                             str(i),
6483                             DecodePathDefBy(defined_by),
6484                         )
6485                         defined_value, defined_tail = defined_spec.decode(
6486                             memoryview(bytes(_value)),
6487                             sub_offset + (
6488                                 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6489                                 if value.expled else (value.tlen + value.llen)
6490                             ),
6491                             leavemm=True,
6492                             decode_path=sub_sub_decode_path,
6493                             ctx=ctx,
6494                             _ctx_immutable=False,
6495                         )
6496                         if len(defined_tail) > 0:
6497                             raise DecodeError(
6498                                 "remaining data",
6499                                 klass=self.__class__,
6500                                 decode_path=sub_sub_decode_path,
6501                                 offset=offset,
6502                             )
6503                         _value.defined = (defined_by, defined_value)
6504                 else:
6505                     defined_value, defined_tail = defined_spec.decode(
6506                         memoryview(bytes(value)),
6507                         sub_offset + (
6508                             (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6509                             if value.expled else (value.tlen + value.llen)
6510                         ),
6511                         leavemm=True,
6512                         decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6513                         ctx=ctx,
6514                         _ctx_immutable=False,
6515                     )
6516                     if len(defined_tail) > 0:
6517                         raise DecodeError(
6518                             "remaining data",
6519                             klass=self.__class__,
6520                             decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6521                             offset=offset,
6522                         )
6523                     value.defined = (defined_by, defined_value)
6524
6525             value_len = value.fulllen
6526             vlen += value_len
6527             sub_offset += value_len
6528             v = v_tail
6529             if spec_defaulted:
6530                 if evgen_mode:
6531                     yield sub_decode_path, value, v_tail
6532                 if value == spec.default:
6533                     if ctx_bered or ctx_allow_default_values:
6534                         ber_encoded = True
6535                     else:
6536                         raise DecodeError(
6537                             "DEFAULT value met",
6538                             klass=self.__class__,
6539                             decode_path=sub_decode_path,
6540                             offset=sub_offset,
6541                         )
6542             if not evgen_mode:
6543                 values[name] = value
6544                 spec_defines = getattr(spec, "defines", ())
6545                 if len(spec_defines) == 0:
6546                     defines_by_path = ctx.get("defines_by_path", ())
6547                     if len(defines_by_path) > 0:
6548                         spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6549                 if spec_defines is not None and len(spec_defines) > 0:
6550                     for rel_path, schema in spec_defines:
6551                         defined = schema.get(value, None)
6552                         if defined is not None:
6553                             ctx.setdefault("_defines", []).append((
6554                                 abs_decode_path(sub_decode_path[:-1], rel_path),
6555                                 (value, defined),
6556                             ))
6557         if lenindef:
6558             if v[:EOC_LEN].tobytes() != EOC:
6559                 raise DecodeError(
6560                     "no EOC",
6561                     klass=self.__class__,
6562                     decode_path=decode_path,
6563                     offset=offset,
6564                 )
6565             tail = v[EOC_LEN:]
6566             vlen += EOC_LEN
6567         elif len(v) > 0:
6568             raise DecodeError(
6569                 "remaining data",
6570                 klass=self.__class__,
6571                 decode_path=decode_path,
6572                 offset=offset,
6573             )
6574         obj = self.__class__(
6575             schema=self.specs,
6576             impl=self.tag,
6577             expl=self._expl,
6578             default=self.default,
6579             optional=self.optional,
6580             _decoded=(offset, llen, vlen),
6581         )
6582         obj._value = values
6583         obj.lenindef = lenindef
6584         obj.ber_encoded = ber_encoded
6585         yield decode_path, obj, tail
6586
6587     def __repr__(self):
6588         value = pp_console_row(next(self.pps()))
6589         cols = []
6590         for name in self.specs:
6591             _value = self._value.get(name)
6592             if _value is None:
6593                 continue
6594             cols.append("%s: %s" % (name, repr(_value)))
6595         return "%s[%s]" % (value, "; ".join(cols))
6596
6597     def pps(self, decode_path=()):
6598         yield _pp(
6599             obj=self,
6600             asn1_type_name=self.asn1_type_name,
6601             obj_name=self.__class__.__name__,
6602             decode_path=decode_path,
6603             optional=self.optional,
6604             default=self == self.default,
6605             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6606             expl=None if self._expl is None else tag_decode(self._expl),
6607             offset=self.offset,
6608             tlen=self.tlen,
6609             llen=self.llen,
6610             vlen=self.vlen,
6611             expl_offset=self.expl_offset if self.expled else None,
6612             expl_tlen=self.expl_tlen if self.expled else None,
6613             expl_llen=self.expl_llen if self.expled else None,
6614             expl_vlen=self.expl_vlen if self.expled else None,
6615             expl_lenindef=self.expl_lenindef,
6616             lenindef=self.lenindef,
6617             ber_encoded=self.ber_encoded,
6618             bered=self.bered,
6619         )
6620         for name in self.specs:
6621             value = self._value.get(name)
6622             if value is None:
6623                 continue
6624             yield value.pps(decode_path=decode_path + (name,))
6625         for pp in self.pps_lenindef(decode_path):
6626             yield pp
6627
6628
6629 class Set(Sequence, SequenceEncode1stMixin):
6630     """``SET`` structure type
6631
6632     Its usage is identical to :py:class:`pyderasn.Sequence`.
6633
6634     .. _allow_unordered_set_ctx:
6635
6636     DER prohibits unordered values encoding and will raise an error
6637     during decode. If :ref:`bered <bered_ctx>` context option is set,
6638     then no error will occur. Also you can disable strict values
6639     ordering check by setting ``"allow_unordered_set": True``
6640     :ref:`context <ctx>` option.
6641     """
6642     __slots__ = ()
6643     tag_default = tag_encode(form=TagFormConstructed, num=17)
6644     asn1_type_name = "SET"
6645
6646     def _values_for_encoding(self):
6647         return sorted(super()._values_for_encoding(), key=attrgetter("tag_order"))
6648
6649     def _encode_cer(self, writer):
6650         write_full(writer, self.tag + LENINDEF)
6651         for v in sorted(
6652                 super()._values_for_encoding(),
6653                 key=attrgetter("tag_order_cer"),
6654         ):
6655             v.encode_cer(writer)
6656         write_full(writer, EOC)
6657
6658     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6659         try:
6660             t, tlen, lv = tag_strip(tlv)
6661         except DecodeError as err:
6662             raise err.__class__(
6663                 msg=err.msg,
6664                 klass=self.__class__,
6665                 decode_path=decode_path,
6666                 offset=offset,
6667             )
6668         if t != self.tag:
6669             raise TagMismatch(
6670                 klass=self.__class__,
6671                 decode_path=decode_path,
6672                 offset=offset,
6673             )
6674         if tag_only:
6675             yield None
6676             return
6677         lenindef = False
6678         ctx_bered = ctx.get("bered", False)
6679         try:
6680             l, llen, v = len_decode(lv)
6681         except LenIndefForm as err:
6682             if not ctx_bered:
6683                 raise err.__class__(
6684                     msg=err.msg,
6685                     klass=self.__class__,
6686                     decode_path=decode_path,
6687                     offset=offset,
6688                 )
6689             l, llen, v = 0, 1, lv[1:]
6690             lenindef = True
6691         except DecodeError as err:
6692             raise err.__class__(
6693                 msg=err.msg,
6694                 klass=self.__class__,
6695                 decode_path=decode_path,
6696                 offset=offset,
6697             )
6698         if l > len(v):
6699             raise NotEnoughData(
6700                 "encoded length is longer than data",
6701                 klass=self.__class__,
6702                 offset=offset,
6703             )
6704         if not lenindef:
6705             v, tail = v[:l], v[l:]
6706         vlen = 0
6707         sub_offset = offset + tlen + llen
6708         values = {}
6709         ber_encoded = False
6710         ctx_allow_default_values = ctx.get("allow_default_values", False)
6711         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6712         tag_order_prev = (0, 0)
6713         _specs_items = copy(self.specs)
6714
6715         while len(v) > 0:
6716             if lenindef and v[:EOC_LEN].tobytes() == EOC:
6717                 break
6718             for name, spec in _specs_items.items():
6719                 sub_decode_path = decode_path + (name,)
6720                 try:
6721                     spec.decode(
6722                         v,
6723                         sub_offset,
6724                         leavemm=True,
6725                         decode_path=sub_decode_path,
6726                         ctx=ctx,
6727                         tag_only=True,
6728                         _ctx_immutable=False,
6729                     )
6730                 except TagMismatch:
6731                     continue
6732                 break
6733             else:
6734                 raise TagMismatch(
6735                     klass=self.__class__,
6736                     decode_path=decode_path,
6737                     offset=offset,
6738                 )
6739             spec_defaulted = spec.default is not None
6740             if evgen_mode and not spec_defaulted:
6741                 for _decode_path, value, v_tail in spec.decode_evgen(
6742                         v,
6743                         sub_offset,
6744                         leavemm=True,
6745                         decode_path=sub_decode_path,
6746                         ctx=ctx,
6747                         _ctx_immutable=False,
6748                 ):
6749                     yield _decode_path, value, v_tail
6750             else:
6751                 _, value, v_tail = next(spec.decode_evgen(
6752                     v,
6753                     sub_offset,
6754                     leavemm=True,
6755                     decode_path=sub_decode_path,
6756                     ctx=ctx,
6757                     _ctx_immutable=False,
6758                     _evgen_mode=False,
6759                 ))
6760             value_tag_order = value.tag_order
6761             value_len = value.fulllen
6762             if tag_order_prev >= value_tag_order:
6763                 if ctx_bered or ctx_allow_unordered_set:
6764                     ber_encoded = True
6765                 else:
6766                     raise DecodeError(
6767                         "unordered " + self.asn1_type_name,
6768                         klass=self.__class__,
6769                         decode_path=sub_decode_path,
6770                         offset=sub_offset,
6771                     )
6772             if spec_defaulted:
6773                 if evgen_mode:
6774                     yield sub_decode_path, value, v_tail
6775                 if value != spec.default:
6776                     pass
6777                 elif ctx_bered or ctx_allow_default_values:
6778                     ber_encoded = True
6779                 else:
6780                     raise DecodeError(
6781                         "DEFAULT value met",
6782                         klass=self.__class__,
6783                         decode_path=sub_decode_path,
6784                         offset=sub_offset,
6785                     )
6786             values[name] = value
6787             del _specs_items[name]
6788             tag_order_prev = value_tag_order
6789             sub_offset += value_len
6790             vlen += value_len
6791             v = v_tail
6792
6793         obj = self.__class__(
6794             schema=self.specs,
6795             impl=self.tag,
6796             expl=self._expl,
6797             default=self.default,
6798             optional=self.optional,
6799             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6800         )
6801         if lenindef:
6802             if v[:EOC_LEN].tobytes() != EOC:
6803                 raise DecodeError(
6804                     "no EOC",
6805                     klass=self.__class__,
6806                     decode_path=decode_path,
6807                     offset=offset,
6808                 )
6809             tail = v[EOC_LEN:]
6810             obj.lenindef = True
6811         for name, spec in self.specs.items():
6812             if name not in values and not spec.optional:
6813                 raise DecodeError(
6814                     "%s value is not ready" % name,
6815                     klass=self.__class__,
6816                     decode_path=decode_path,
6817                     offset=offset,
6818                 )
6819         if not evgen_mode:
6820             obj._value = values
6821         obj.ber_encoded = ber_encoded
6822         yield decode_path, obj, tail
6823
6824
6825 SequenceOfState = namedtuple(
6826     "SequenceOfState",
6827     BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6828     **NAMEDTUPLE_KWARGS
6829 )
6830
6831
6832 class SequenceOf(SequenceEncode1stMixin, Obj):
6833     """``SEQUENCE OF`` sequence type
6834
6835     For that kind of type you must specify the object it will carry on
6836     (bounds are for example here, not required)::
6837
6838         class Ints(SequenceOf):
6839             schema = Integer()
6840             bounds = (0, 2)
6841
6842     >>> ints = Ints()
6843     >>> ints.append(Integer(123))
6844     >>> ints.append(Integer(234))
6845     >>> ints
6846     Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6847     >>> [int(i) for i in ints]
6848     [123, 234]
6849     >>> ints.append(Integer(345))
6850     Traceback (most recent call last):
6851     pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6852     >>> ints[1]
6853     INTEGER 234
6854     >>> ints[1] = Integer(345)
6855     >>> ints
6856     Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6857
6858     You can initialize sequence with preinitialized values:
6859
6860     >>> ints = Ints([Integer(123), Integer(234)])
6861
6862     Also you can use iterator as a value:
6863
6864     >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6865
6866     And it won't be iterated until encoding process. Pay attention that
6867     bounds and required schema checks are done only during the encoding
6868     process in that case! After encode was called, then value is zeroed
6869     back to empty list and you have to set it again. That mode is useful
6870     mainly with CER encoding mode, where all objects from the iterable
6871     will be streamed to the buffer, without copying all of them to
6872     memory first.
6873     """
6874     __slots__ = ("spec", "_bound_min", "_bound_max")
6875     tag_default = tag_encode(form=TagFormConstructed, num=16)
6876     asn1_type_name = "SEQUENCE OF"
6877
6878     def __init__(
6879             self,
6880             value=None,
6881             schema=None,
6882             bounds=None,
6883             impl=None,
6884             expl=None,
6885             default=None,
6886             optional=False,
6887             _decoded=(0, 0, 0),
6888     ):
6889         super().__init__(impl, expl, default, optional, _decoded)
6890         if schema is None:
6891             schema = getattr(self, "schema", None)
6892         if schema is None:
6893             raise ValueError("schema must be specified")
6894         self.spec = schema
6895         self._bound_min, self._bound_max = getattr(
6896             self,
6897             "bounds",
6898             (0, float("+inf")),
6899         ) if bounds is None else bounds
6900         self._value = []
6901         if value is not None:
6902             self._value = self._value_sanitize(value)
6903         if default is not None:
6904             default_value = self._value_sanitize(default)
6905             default_obj = self.__class__(
6906                 schema=schema,
6907                 impl=self.tag,
6908                 expl=self._expl,
6909             )
6910             default_obj._value = default_value
6911             self.default = default_obj
6912             if value is None:
6913                 self._value = copy(default_obj._value)
6914
6915     def _value_sanitize(self, value):
6916         iterator = False
6917         if issubclass(value.__class__, SequenceOf):
6918             value = value._value
6919         elif hasattr(value, NEXT_ATTR_NAME):
6920             iterator = True
6921         elif hasattr(value, "__iter__"):
6922             value = list(value)
6923         else:
6924             raise InvalidValueType((self.__class__, iter, "iterator"))
6925         if not iterator:
6926             if not self._bound_min <= len(value) <= self._bound_max:
6927                 raise BoundsError(self._bound_min, len(value), self._bound_max)
6928             class_expected = self.spec.__class__
6929             for v in value:
6930                 if not isinstance(v, class_expected):
6931                     raise InvalidValueType((class_expected,))
6932         return value
6933
6934     @property
6935     def ready(self):
6936         if hasattr(self._value, NEXT_ATTR_NAME):
6937             return True
6938         if self._bound_min > 0 and len(self._value) == 0:
6939             return False
6940         return all(v.ready for v in self._value)
6941
6942     @property
6943     def bered(self):
6944         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6945             return True
6946         return any(v.bered for v in self._value)
6947
6948     def __getstate__(self):
6949         if hasattr(self._value, NEXT_ATTR_NAME):
6950             raise ValueError("can not pickle SequenceOf with iterator")
6951         return SequenceOfState(
6952             __version__,
6953             self.tag,
6954             self._tag_order,
6955             self._expl,
6956             self.default,
6957             self.optional,
6958             self.offset,
6959             self.llen,
6960             self.vlen,
6961             self.expl_lenindef,
6962             self.lenindef,
6963             self.ber_encoded,
6964             self.spec,
6965             [copy(v) for v in self._value],
6966             self._bound_min,
6967             self._bound_max,
6968         )
6969
6970     def __setstate__(self, state):
6971         super().__setstate__(state)
6972         self.spec = state.spec
6973         self._value = state.value
6974         self._bound_min = state.bound_min
6975         self._bound_max = state.bound_max
6976
6977     def __eq__(self, their):
6978         if isinstance(their, self.__class__):
6979             return (
6980                 self.spec == their.spec and
6981                 self.tag == their.tag and
6982                 self._expl == their._expl and
6983                 self._value == their._value
6984             )
6985         if hasattr(their, "__iter__"):
6986             return self._value == list(their)
6987         return False
6988
6989     def __call__(
6990             self,
6991             value=None,
6992             bounds=None,
6993             impl=None,
6994             expl=None,
6995             default=None,
6996             optional=None,
6997     ):
6998         return self.__class__(
6999             value=value,
7000             schema=self.spec,
7001             bounds=(
7002                 (self._bound_min, self._bound_max)
7003                 if bounds is None else bounds
7004             ),
7005             impl=self.tag if impl is None else impl,
7006             expl=self._expl if expl is None else expl,
7007             default=self.default if default is None else default,
7008             optional=self.optional if optional is None else optional,
7009         )
7010
7011     def __contains__(self, key):
7012         return key in self._value
7013
7014     def append(self, value):
7015         if not isinstance(value, self.spec.__class__):
7016             raise InvalidValueType((self.spec.__class__,))
7017         if len(self._value) + 1 > self._bound_max:
7018             raise BoundsError(
7019                 self._bound_min,
7020                 len(self._value) + 1,
7021                 self._bound_max,
7022             )
7023         self._value.append(value)
7024
7025     def __iter__(self):
7026         return iter(self._value)
7027
7028     def __len__(self):
7029         return len(self._value)
7030
7031     def __setitem__(self, key, value):
7032         if not isinstance(value, self.spec.__class__):
7033             raise InvalidValueType((self.spec.__class__,))
7034         self._value[key] = self.spec(value=value)
7035
7036     def __getitem__(self, key):
7037         return self._value[key]
7038
7039     def _values_for_encoding(self):
7040         return iter(self._value)
7041
7042     def _encode(self):
7043         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7044         if iterator:
7045             values = []
7046             values_append = values.append
7047             class_expected = self.spec.__class__
7048             values_for_encoding = self._values_for_encoding()
7049             self._value = []
7050             for v in values_for_encoding:
7051                 if not isinstance(v, class_expected):
7052                     raise InvalidValueType((class_expected,))
7053                 values_append(v.encode())
7054             if not self._bound_min <= len(values) <= self._bound_max:
7055                 raise BoundsError(self._bound_min, len(values), self._bound_max)
7056             value = b"".join(values)
7057         else:
7058             value = b"".join(v.encode() for v in self._values_for_encoding())
7059         return b"".join((self.tag, len_encode(len(value)), value))
7060
7061     def _encode1st(self, state):
7062         state = super()._encode1st(state)
7063         if hasattr(self._value, NEXT_ATTR_NAME):
7064             self._value = []
7065         return state
7066
7067     def _encode2nd(self, writer, state_iter):
7068         write_full(writer, self.tag + len_encode(next(state_iter)))
7069         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7070         if iterator:
7071             values_count = 0
7072             class_expected = self.spec.__class__
7073             values_for_encoding = self._values_for_encoding()
7074             self._value = []
7075             for v in values_for_encoding:
7076                 if not isinstance(v, class_expected):
7077                     raise InvalidValueType((class_expected,))
7078                 v.encode2nd(writer, state_iter)
7079                 values_count += 1
7080             if not self._bound_min <= values_count <= self._bound_max:
7081                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7082         else:
7083             for v in self._values_for_encoding():
7084                 v.encode2nd(writer, state_iter)
7085
7086     def _encode_cer(self, writer):
7087         write_full(writer, self.tag + LENINDEF)
7088         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7089         if iterator:
7090             class_expected = self.spec.__class__
7091             values_count = 0
7092             values_for_encoding = self._values_for_encoding()
7093             self._value = []
7094             for v in values_for_encoding:
7095                 if not isinstance(v, class_expected):
7096                     raise InvalidValueType((class_expected,))
7097                 v.encode_cer(writer)
7098                 values_count += 1
7099             if not self._bound_min <= values_count <= self._bound_max:
7100                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7101         else:
7102             for v in self._values_for_encoding():
7103                 v.encode_cer(writer)
7104         write_full(writer, EOC)
7105
7106     def _decode(
7107             self,
7108             tlv,
7109             offset,
7110             decode_path,
7111             ctx,
7112             tag_only,
7113             evgen_mode,
7114             ordering_check=False,
7115     ):
7116         try:
7117             t, tlen, lv = tag_strip(tlv)
7118         except DecodeError as err:
7119             raise err.__class__(
7120                 msg=err.msg,
7121                 klass=self.__class__,
7122                 decode_path=decode_path,
7123                 offset=offset,
7124             )
7125         if t != self.tag:
7126             raise TagMismatch(
7127                 klass=self.__class__,
7128                 decode_path=decode_path,
7129                 offset=offset,
7130             )
7131         if tag_only:
7132             yield None
7133             return
7134         lenindef = False
7135         ctx_bered = ctx.get("bered", False)
7136         try:
7137             l, llen, v = len_decode(lv)
7138         except LenIndefForm as err:
7139             if not ctx_bered:
7140                 raise err.__class__(
7141                     msg=err.msg,
7142                     klass=self.__class__,
7143                     decode_path=decode_path,
7144                     offset=offset,
7145                 )
7146             l, llen, v = 0, 1, lv[1:]
7147             lenindef = True
7148         except DecodeError as err:
7149             raise err.__class__(
7150                 msg=err.msg,
7151                 klass=self.__class__,
7152                 decode_path=decode_path,
7153                 offset=offset,
7154             )
7155         if l > len(v):
7156             raise NotEnoughData(
7157                 "encoded length is longer than data",
7158                 klass=self.__class__,
7159                 decode_path=decode_path,
7160                 offset=offset,
7161             )
7162         if not lenindef:
7163             v, tail = v[:l], v[l:]
7164         vlen = 0
7165         sub_offset = offset + tlen + llen
7166         _value = []
7167         _value_count = 0
7168         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7169         value_prev = memoryview(v[:0])
7170         ber_encoded = False
7171         spec = self.spec
7172         while len(v) > 0:
7173             if lenindef and v[:EOC_LEN].tobytes() == EOC:
7174                 break
7175             sub_decode_path = decode_path + (str(_value_count),)
7176             if evgen_mode:
7177                 for _decode_path, value, v_tail in spec.decode_evgen(
7178                         v,
7179                         sub_offset,
7180                         leavemm=True,
7181                         decode_path=sub_decode_path,
7182                         ctx=ctx,
7183                         _ctx_immutable=False,
7184                 ):
7185                     yield _decode_path, value, v_tail
7186             else:
7187                 _, value, v_tail = next(spec.decode_evgen(
7188                     v,
7189                     sub_offset,
7190                     leavemm=True,
7191                     decode_path=sub_decode_path,
7192                     ctx=ctx,
7193                     _ctx_immutable=False,
7194                     _evgen_mode=False,
7195                 ))
7196             value_len = value.fulllen
7197             if ordering_check:
7198                 if value_prev.tobytes() > v[:value_len].tobytes():
7199                     if ctx_bered or ctx_allow_unordered_set:
7200                         ber_encoded = True
7201                     else:
7202                         raise DecodeError(
7203                             "unordered " + self.asn1_type_name,
7204                             klass=self.__class__,
7205                             decode_path=sub_decode_path,
7206                             offset=sub_offset,
7207                         )
7208                 value_prev = v[:value_len]
7209             _value_count += 1
7210             if not evgen_mode:
7211                 _value.append(value)
7212             sub_offset += value_len
7213             vlen += value_len
7214             v = v_tail
7215         if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7216             raise DecodeError(
7217                 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7218                 klass=self.__class__,
7219                 decode_path=decode_path,
7220                 offset=offset,
7221             )
7222         try:
7223             obj = self.__class__(
7224                 value=None if evgen_mode else _value,
7225                 schema=spec,
7226                 bounds=(self._bound_min, self._bound_max),
7227                 impl=self.tag,
7228                 expl=self._expl,
7229                 default=self.default,
7230                 optional=self.optional,
7231                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7232             )
7233         except BoundsError as err:
7234             raise DecodeError(
7235                 msg=str(err),
7236                 klass=self.__class__,
7237                 decode_path=decode_path,
7238                 offset=offset,
7239             )
7240         if lenindef:
7241             if v[:EOC_LEN].tobytes() != EOC:
7242                 raise DecodeError(
7243                     "no EOC",
7244                     klass=self.__class__,
7245                     decode_path=decode_path,
7246                     offset=offset,
7247                 )
7248             obj.lenindef = True
7249             tail = v[EOC_LEN:]
7250         obj.ber_encoded = ber_encoded
7251         yield decode_path, obj, tail
7252
7253     def __repr__(self):
7254         return "%s[%s]" % (
7255             pp_console_row(next(self.pps())),
7256             ", ".join(repr(v) for v in self._value),
7257         )
7258
7259     def pps(self, decode_path=()):
7260         yield _pp(
7261             obj=self,
7262             asn1_type_name=self.asn1_type_name,
7263             obj_name=self.__class__.__name__,
7264             decode_path=decode_path,
7265             optional=self.optional,
7266             default=self == self.default,
7267             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7268             expl=None if self._expl is None else tag_decode(self._expl),
7269             offset=self.offset,
7270             tlen=self.tlen,
7271             llen=self.llen,
7272             vlen=self.vlen,
7273             expl_offset=self.expl_offset if self.expled else None,
7274             expl_tlen=self.expl_tlen if self.expled else None,
7275             expl_llen=self.expl_llen if self.expled else None,
7276             expl_vlen=self.expl_vlen if self.expled else None,
7277             expl_lenindef=self.expl_lenindef,
7278             lenindef=self.lenindef,
7279             ber_encoded=self.ber_encoded,
7280             bered=self.bered,
7281         )
7282         for i, value in enumerate(self._value):
7283             yield value.pps(decode_path=decode_path + (str(i),))
7284         for pp in self.pps_lenindef(decode_path):
7285             yield pp
7286
7287
7288 class SetOf(SequenceOf):
7289     """``SET OF`` sequence type
7290
7291     Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7292     """
7293     __slots__ = ()
7294     tag_default = tag_encode(form=TagFormConstructed, num=17)
7295     asn1_type_name = "SET OF"
7296
7297     def _value_sanitize(self, value):
7298         value = super()._value_sanitize(value)
7299         if hasattr(value, NEXT_ATTR_NAME):
7300             raise ValueError(
7301                 "SetOf does not support iterator values, as no sense in them"
7302             )
7303         return value
7304
7305     def _encode(self):
7306         v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7307         return b"".join((self.tag, len_encode(len(v)), v))
7308
7309     def _encode2nd(self, writer, state_iter):
7310         write_full(writer, self.tag + len_encode(next(state_iter)))
7311         values = []
7312         for v in self._values_for_encoding():
7313             buf = BytesIO()
7314             v.encode2nd(buf.write, state_iter)
7315             values.append(buf.getvalue())
7316         values.sort()
7317         for v in values:
7318             write_full(writer, v)
7319
7320     def _encode_cer(self, writer):
7321         write_full(writer, self.tag + LENINDEF)
7322         for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7323             write_full(writer, v)
7324         write_full(writer, EOC)
7325
7326     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7327         return super()._decode(
7328             tlv,
7329             offset,
7330             decode_path,
7331             ctx,
7332             tag_only,
7333             evgen_mode,
7334             ordering_check=True,
7335         )
7336
7337
7338 def obj_by_path(pypath):  # pragma: no cover
7339     """Import object specified as string Python path
7340
7341     Modules must be separated from classes/functions with ``:``.
7342
7343     >>> obj_by_path("foo.bar:Baz")
7344     <class 'foo.bar.Baz'>
7345     >>> obj_by_path("foo.bar:Baz.boo")
7346     <classmethod 'foo.bar.Baz.boo'>
7347     """
7348     mod, objs = pypath.rsplit(":", 1)
7349     from importlib import import_module
7350     obj = import_module(mod)
7351     for obj_name in objs.split("."):
7352         obj = getattr(obj, obj_name)
7353     return obj
7354
7355
7356 def generic_decoder():  # pragma: no cover
7357     # All of this below is a big hack with self references
7358     choice = PrimitiveTypes()
7359     choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7360     choice.specs["SetOf"] = SetOf(schema=choice)
7361     for i in range(31):
7362         choice.specs["SequenceOf%d" % i] = SequenceOf(
7363             schema=choice,
7364             expl=tag_ctxc(i),
7365         )
7366     choice.specs["Any"] = Any()
7367
7368     # Class name equals to type name, to omit it from output
7369     class SEQUENCEOF(SequenceOf):
7370         __slots__ = ()
7371         schema = choice
7372
7373     def pprint_any(
7374             obj,
7375             oid_maps=(),
7376             with_colours=False,
7377             with_decode_path=False,
7378             decode_path_only=(),
7379             decode_path=(),
7380     ):
7381         def _pprint_pps(pps):
7382             for pp in pps:
7383                 if hasattr(pp, "_fields"):
7384                     if (
7385                             decode_path_only != () and
7386                             pp.decode_path[:len(decode_path_only)] != decode_path_only
7387                     ):
7388                         continue
7389                     if pp.asn1_type_name == Choice.asn1_type_name:
7390                         continue
7391                     pp_kwargs = pp._asdict()
7392                     pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7393                     pp = _pp(**pp_kwargs)
7394                     yield pp_console_row(
7395                         pp,
7396                         oid_maps=oid_maps,
7397                         with_offsets=True,
7398                         with_blob=False,
7399                         with_colours=with_colours,
7400                         with_decode_path=with_decode_path,
7401                         decode_path_len_decrease=len(decode_path_only),
7402                     )
7403                     for row in pp_console_blob(
7404                             pp,
7405                             decode_path_len_decrease=len(decode_path_only),
7406                     ):
7407                         yield row
7408                 else:
7409                     for row in _pprint_pps(pp):
7410                         yield row
7411         return "\n".join(_pprint_pps(obj.pps(decode_path)))
7412     return SEQUENCEOF(), pprint_any
7413
7414
7415 def ascii_visualize(ba):
7416     """Output only ASCII printable characters, like in hexdump -C
7417
7418     Example output for given binary string (right part)::
7419
7420         92 2b 39 20 65 91 e6 8e  95 93 1a 58 df 02 78 ea  |.+9 e......X..x.|
7421                                                            ^^^^^^^^^^^^^^^^
7422     """
7423     return "".join((chr(b) if 0x20 <= b <= 0x7E else ".") for b in ba)
7424
7425
7426 def hexdump(raw):
7427     """Generate ``hexdump -C`` like output
7428
7429     Rendered example::
7430
7431         00000000  30 80 30 80 a0 80 02 01  02 00 00 02 14 54 a5 18  |0.0..........T..|
7432         00000010  69 ef 8b 3f 15 fd ea ad  bd 47 e0 94 81 6b 06 6a  |i..?.....G...k.j|
7433
7434     Result of that function is a generator of lines, where each line is
7435     a list of columns::
7436
7437         [
7438             [...],
7439             ["00000010 ", " 69", " ef", " 8b", " 3f", " 15", " fd", " ea", " ad ",
7440                           " bd", " 47", " e0", " 94", " 81", " 6b", " 06", " 6a ",
7441                           " |i..?.....G...k.j|"]
7442             [...],
7443         ]
7444     """
7445     hexed = hexenc(raw).upper()
7446     addr, cols = 0, ["%08x " % 0]
7447     for i in range(0, len(hexed), 2):
7448         if i != 0 and i // 2 % 8 == 0:
7449             cols[-1] += " "
7450         if i != 0 and i // 2 % 16 == 0:
7451             cols.append(" |%s|" % ascii_visualize(bytes(raw[addr:addr + 16])))
7452             yield cols
7453             addr += 16
7454             cols = ["%08x " % addr]
7455         cols.append(" " + hexed[i:i + 2])
7456     if len(cols) > 0:
7457         cols.append(" |%s|" % ascii_visualize(bytes(raw[addr:])))
7458         yield cols
7459
7460
7461 def browse(raw, obj, oid_maps=()):
7462     """Interactive browser
7463
7464     :param bytes raw: binary data you decoded
7465     :param obj: decoded :py:class:`pyderasn.Obj`
7466     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
7467                      Its human readable form is printed when OID is met
7468
7469     .. note:: `urwid <http://urwid.org/>`__ dependency required
7470
7471     This browser is an interactive terminal application for browsing
7472     structures of your decoded ASN.1 objects. You can quit it with **q**
7473     key. It consists of three windows:
7474
7475     :tree:
7476      View of ASN.1 elements hierarchy. You can navigate it using **Up**,
7477      **Down**, **PageUp**, **PageDown**, **Home**, **End** keys.
7478      **Left** key goes to constructed element above. **Plus**/**Minus**
7479      keys collapse/uncollapse constructed elements. **Space** toggles it
7480     :info:
7481      window with various information about element. You can scroll it
7482      with **h**/**l** (down, up) (**H**/**L** for triple speed) keys
7483     :hexdump:
7484      window with raw data hexdump and highlighted current element's
7485      contents. It automatically focuses on element's data. You can
7486      scroll it with **j**/**k** (down, up) (**J**/**K** for triple
7487      speed) keys. If element has explicit tag, then it also will be
7488      highlighted with different colour
7489
7490     Window's header contains current decode path and progress bars with
7491     position in *info* and *hexdump* windows.
7492
7493     If you press **d**, then current element will be saved in the
7494     current directory under its decode path name (adding ".0", ".1", etc
7495     suffix if such file already exists). **D** will save it with explicit tag.
7496
7497     You can also invoke it with ``--browse`` command line argument.
7498     """
7499     from copy import deepcopy
7500     from os.path import exists as path_exists
7501     import urwid
7502
7503     class TW(urwid.TreeWidget):
7504         def __init__(self, state, *args, **kwargs):
7505             self.state = state
7506             self.scrolled = {"info": False, "hexdump": False}
7507             super().__init__(*args, **kwargs)
7508
7509         def _get_pp(self):
7510             pp = self.get_node().get_value()
7511             constructed = len(pp) > 1
7512             return (pp if hasattr(pp, "_fields") else pp[0]), constructed
7513
7514         def _state_update(self):
7515             pp, _ = self._get_pp()
7516             self.state["decode_path"].set_text(
7517                 ":".join(str(p) for p in pp.decode_path)
7518             )
7519             lines = deepcopy(self.state["hexed"])
7520
7521             def attr_set(i, attr):
7522                 line = lines[i // 16]
7523                 idx = 1 + (i - 16 * (i // 16))
7524                 line[idx] = (attr, line[idx])
7525
7526             if pp.expl_offset is not None:
7527                 for i in range(
7528                         pp.expl_offset,
7529                         pp.expl_offset + pp.expl_tlen + pp.expl_llen,
7530                 ):
7531                     attr_set(i, "select-expl")
7532             for i in range(pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen):
7533                 attr_set(i, "select-value")
7534             self.state["hexdump"]._set_body([urwid.Text(line) for line in lines])
7535             self.state["hexdump"].set_focus(pp.offset // 16)
7536             self.state["hexdump"].set_focus_valign("middle")
7537             self.state["hexdump_bar"].set_completion(
7538                 (100 * pp.offset // 16) //
7539                 len(self.state["hexdump"]._body.positions())
7540             )
7541
7542             lines = [
7543                 [("header", "Name: "), pp.obj_name],
7544                 [("header", "Type: "), pp.asn1_type_name],
7545                 [("header", "Offset: "), "%d (0x%x)" % (pp.offset, pp.offset)],
7546                 [("header", "[TLV]len: "), "%d/%d/%d" % (
7547                     pp.tlen, pp.llen, pp.vlen,
7548                 )],
7549                 [("header", "TLVlen: "), "%d" % sum((
7550                     pp.tlen, pp.llen, pp.vlen,
7551                 ))],
7552                 [("header", "Slice: "), "[%d:%d]" % (
7553                     pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen,
7554                 )],
7555             ]
7556             if pp.lenindef:
7557                 lines.append([("warning", "LENINDEF")])
7558             if pp.ber_encoded:
7559                 lines.append([("warning", "BER encoded")])
7560             if pp.bered:
7561                 lines.append([("warning", "BERed")])
7562             if pp.expl is not None:
7563                 lines.append([("header", "EXPLICIT")])
7564                 klass, _, num = pp.expl
7565                 lines.append(["  Tag: %s%d" % (TagClassReprs[klass], num)])
7566                 if pp.expl_offset is not None:
7567                     lines.append(["  Offset: %d" % pp.expl_offset])
7568                     lines.append(["  [TLV]len: %d/%d/%d" % (
7569                         pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7570                     )])
7571                     lines.append(["  TLVlen: %d" % sum((
7572                         pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7573                     ))])
7574                     lines.append(["  Slice: [%d:%d]" % (
7575                         pp.expl_offset,
7576                         pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen,
7577                     )])
7578             if pp.impl is not None:
7579                 klass, _, num = pp.impl
7580                 lines.append([
7581                     ("header", "IMPLICIT: "), "%s%d" % (TagClassReprs[klass], num),
7582                 ])
7583             if pp.optional:
7584                 lines.append(["OPTIONAL"])
7585             if pp.default:
7586                 lines.append(["DEFAULT"])
7587             if len(pp.decode_path) > 0:
7588                 ent = pp.decode_path[-1]
7589                 if isinstance(ent, DecodePathDefBy):
7590                     lines.append([""])
7591                     value = str(ent.defined_by)
7592                     oid_name = find_oid_name(
7593                         ent.defined_by.asn1_type_name, oid_maps, value,
7594                     )
7595                     lines.append([("header", "DEFINED BY: "), "%s" % (
7596                         value if oid_name is None
7597                         else "%s (%s)" % (oid_name, value)
7598                     )])
7599             lines.append([""])
7600             if pp.value is not None:
7601                 lines.append([("header", "Value: "), pp.value])
7602                 if (
7603                         len(oid_maps) > 0 and
7604                         pp.asn1_type_name == ObjectIdentifier.asn1_type_name
7605                 ):
7606                     for oid_map in oid_maps:
7607                         oid_name = oid_map.get(pp.value)
7608                         if oid_name is not None:
7609                             lines.append([("header", "Human: "), oid_name])
7610                             break
7611                 if pp.asn1_type_name == Integer.asn1_type_name:
7612                     lines.append([
7613                         ("header", "Decimal: "), "%d" % int(pp.obj),
7614                     ])
7615                     lines.append([
7616                         ("header", "Hexadecimal: "), colonize_hex(pp.obj.tohex()),
7617                     ])
7618             if pp.blob.__class__ == bytes:
7619                 blob = hexenc(pp.blob).upper()
7620                 for i in range(0, len(blob), 32):
7621                     lines.append([colonize_hex(blob[i:i + 32])])
7622             elif pp.blob.__class__ == tuple:
7623                 lines.append([", ".join(pp.blob)])
7624             self.state["info"]._set_body([urwid.Text(line) for line in lines])
7625             self.state["info_bar"].set_completion(0)
7626
7627         def selectable(self):
7628             if self.state["widget_current"] != self:
7629                 self.state["widget_current"] = self
7630                 self.scrolled["info"] = False
7631                 self.scrolled["hexdump"] = False
7632                 self._state_update()
7633             return super().selectable()
7634
7635         def _get_display_text_without_offset(self):
7636             pp, constructed = self._get_pp()
7637             style = "constructed" if constructed else ""
7638             if len(pp.decode_path) == 0:
7639                 return (style, pp.obj_name)
7640             if pp.asn1_type_name == "EOC":
7641                 return ("eoc", "EOC")
7642             ent = pp.decode_path[-1]
7643             if isinstance(ent, DecodePathDefBy):
7644                 value = str(ent.defined_by)
7645                 oid_name = find_oid_name(
7646                     ent.defined_by.asn1_type_name, oid_maps, value,
7647                 )
7648                 return ("defby", "DEFBY:" + (
7649                     value if oid_name is None else oid_name
7650                 ))
7651             return (style, ent)
7652
7653         def get_display_text(self):
7654             pp, _ = self._get_pp()
7655             style, ent = self._get_display_text_without_offset()
7656             return [(style, ent), " [%d]" % pp.offset]
7657
7658         def _scroll(self, what, step):
7659             self.state[what]._invalidate()
7660             pos = self.state[what].focus_position
7661             if not self.scrolled[what]:
7662                 self.scrolled[what] = True
7663                 pos -= 2
7664             pos = max(0, pos + step)
7665             pos = min(pos, len(self.state[what]._body.positions()) - 1)
7666             self.state[what].set_focus(pos)
7667             self.state[what].set_focus_valign("top")
7668             self.state[what + "_bar"].set_completion(
7669                 (100 * pos) // len(self.state[what]._body.positions())
7670             )
7671
7672         def keypress(self, size, key):
7673             if key == "q":
7674                 raise urwid.ExitMainLoop()
7675
7676             if key == " ":
7677                 self.expanded = not self.expanded
7678                 self.update_expanded_icon()
7679                 return None
7680
7681             hexdump_steps = {"j": 1, "k": -1, "J": 5, "K": -5}
7682             if key in hexdump_steps:
7683                 self._scroll("hexdump", hexdump_steps[key])
7684                 return None
7685
7686             info_steps = {"h": 1, "l": -1, "H": 5, "L": -5}
7687             if key in info_steps:
7688                 self._scroll("info", info_steps[key])
7689                 return None
7690
7691             if key in ("d", "D"):
7692                 pp, _ = self._get_pp()
7693                 dp = ":".join(str(p) for p in pp.decode_path)
7694                 dp = dp.replace(" ", "_")
7695                 if dp == "":
7696                     dp = "root"
7697                 if key == "d" or pp.expl_offset is None:
7698                     data = self.state["raw"][pp.offset:(
7699                         pp.offset + pp.tlen + pp.llen + pp.vlen
7700                     )]
7701                 else:
7702                     data = self.state["raw"][pp.expl_offset:(
7703                         pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen
7704                     )]
7705                 ctr = 0
7706
7707                 def duplicate_path(dp, ctr):
7708                     if ctr == 0:
7709                         return dp
7710                     return "%s.%d" % (dp, ctr)
7711
7712                 while True:
7713                     if not path_exists(duplicate_path(dp, ctr)):
7714                         break
7715                     ctr += 1
7716                 dp = duplicate_path(dp, ctr)
7717                 with open(dp, "wb") as fd:
7718                     fd.write(data)
7719                 self.state["decode_path"].set_text(
7720                     ("warning", "Saved to: " + dp)
7721                 )
7722                 return None
7723             return super().keypress(size, key)
7724
7725     class PN(urwid.ParentNode):
7726         def __init__(self, state, value, *args, **kwargs):
7727             self.state = state
7728             if not hasattr(value, "_fields"):
7729                 value = list(value)
7730             super().__init__(value, *args, **kwargs)
7731
7732         def load_widget(self):
7733             return TW(self.state, self)
7734
7735         def load_child_keys(self):
7736             value = self.get_value()
7737             if hasattr(value, "_fields"):
7738                 return []
7739             return range(len(value[1:]))
7740
7741         def load_child_node(self, key):
7742             return PN(
7743                 self.state,
7744                 self.get_value()[key + 1],
7745                 parent=self,
7746                 key=key,
7747                 depth=self.get_depth() + 1,
7748             )
7749
7750     class LabeledPG(urwid.ProgressBar):
7751         def __init__(self, label, *args, **kwargs):
7752             self.label = label
7753             super().__init__(*args, **kwargs)
7754
7755         def get_text(self):
7756             return "%s: %s" % (self.label, super().get_text())
7757
7758     WinHexdump = urwid.ListBox([urwid.Text("")])
7759     WinInfo = urwid.ListBox([urwid.Text("")])
7760     WinDecodePath = urwid.Text("", "center")
7761     WinInfoBar = LabeledPG("info", "pg-normal", "pg-complete")
7762     WinHexdumpBar = LabeledPG("hexdump", "pg-normal", "pg-complete")
7763     WinTree = urwid.TreeListBox(urwid.TreeWalker(PN(
7764         {
7765             "raw": raw,
7766             "hexed": list(hexdump(raw)),
7767             "widget_current": None,
7768             "info": WinInfo,
7769             "info_bar": WinInfoBar,
7770             "hexdump": WinHexdump,
7771             "hexdump_bar": WinHexdumpBar,
7772             "decode_path": WinDecodePath,
7773         },
7774         list(obj.pps()),
7775     )))
7776     help_text = " ".join((
7777         "q:quit",
7778         "space:(un)collapse",
7779         "(pg)up/down/home/end:nav",
7780         "jkJK:hexdump hlHL:info",
7781         "dD:dump",
7782     ))
7783     urwid.MainLoop(
7784         urwid.Frame(
7785             urwid.Columns([
7786                 ("weight", 1, WinTree),
7787                 ("weight", 2, urwid.Pile([
7788                     urwid.LineBox(WinInfo),
7789                     urwid.LineBox(WinHexdump),
7790                 ])),
7791             ]),
7792             header=urwid.Columns([
7793                 ("weight", 2, urwid.AttrWrap(WinDecodePath, "header")),
7794                 ("weight", 1, WinInfoBar),
7795                 ("weight", 1, WinHexdumpBar),
7796             ]),
7797             footer=urwid.AttrWrap(urwid.Text(help_text), "help")
7798         ),
7799         [
7800             ("header", "bold", ""),
7801             ("constructed", "bold", ""),
7802             ("help", "light magenta", ""),
7803             ("warning", "light red", ""),
7804             ("defby", "light red", ""),
7805             ("eoc", "dark red", ""),
7806             ("select-value", "light green", ""),
7807             ("select-expl", "light red", ""),
7808             ("pg-normal", "", "light blue"),
7809             ("pg-complete", "black", "yellow"),
7810         ],
7811     ).run()
7812
7813
7814 def main():  # pragma: no cover
7815     import argparse
7816     parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7817     parser.add_argument(
7818         "--skip",
7819         type=int,
7820         default=0,
7821         help="Skip that number of bytes from the beginning",
7822     )
7823     parser.add_argument(
7824         "--oids",
7825         help="Python paths to dictionary with OIDs, comma separated",
7826     )
7827     parser.add_argument(
7828         "--schema",
7829         help="Python path to schema definition to use",
7830     )
7831     parser.add_argument(
7832         "--defines-by-path",
7833         help="Python path to decoder's defines_by_path",
7834     )
7835     parser.add_argument(
7836         "--nobered",
7837         action="store_true",
7838         help="Disallow BER encoding",
7839     )
7840     parser.add_argument(
7841         "--print-decode-path",
7842         action="store_true",
7843         help="Print decode paths",
7844     )
7845     parser.add_argument(
7846         "--decode-path-only",
7847         help="Print only specified decode path",
7848     )
7849     parser.add_argument(
7850         "--allow-expl-oob",
7851         action="store_true",
7852         help="Allow explicit tag out-of-bound",
7853     )
7854     parser.add_argument(
7855         "--evgen",
7856         action="store_true",
7857         help="Turn on event generation mode",
7858     )
7859     parser.add_argument(
7860         "--browse",
7861         action="store_true",
7862         help="Start ASN.1 browser",
7863     )
7864     parser.add_argument(
7865         "RAWFile",
7866         type=argparse.FileType("rb"),
7867         help="Path to BER/CER/DER file you want to decode",
7868     )
7869     args = parser.parse_args()
7870     try:
7871         raw = file_mmaped(args.RAWFile)[args.skip:]
7872     except:
7873         args.RAWFile.seek(args.skip)
7874         raw = memoryview(args.RAWFile.read())
7875         args.RAWFile.close()
7876     oid_maps = (
7877         [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7878         if args.oids else ()
7879     )
7880     from functools import partial
7881     if args.schema:
7882         schema = obj_by_path(args.schema)
7883         pprinter = partial(pprint, big_blobs=True)
7884     else:
7885         schema, pprinter = generic_decoder()
7886     ctx = {
7887         "bered": not args.nobered,
7888         "allow_expl_oob": args.allow_expl_oob,
7889     }
7890     if args.defines_by_path is not None:
7891         ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7892     if args.browse:
7893         obj, _ = schema().decode(raw, ctx=ctx)
7894         browse(raw, obj, oid_maps)
7895         from sys import exit as sys_exit
7896         sys_exit(0)
7897     from os import environ
7898     pprinter = partial(
7899         pprinter,
7900         oid_maps=oid_maps,
7901         with_colours=environ.get("NO_COLOR") is None,
7902         with_decode_path=args.print_decode_path,
7903         decode_path_only=(
7904             () if args.decode_path_only is None else
7905             tuple(args.decode_path_only.split(":"))
7906         ),
7907     )
7908     if args.evgen:
7909         for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7910             print(pprinter(obj, decode_path=decode_path))
7911     else:
7912         obj, tail = schema().decode(raw, ctx=ctx)
7913         print(pprinter(obj))
7914     if tail != b"":
7915         print("\nTrailing data: %s" % hexenc(tail))
7916
7917
7918 if __name__ == "__main__":
7919     from pyderasn import *
7920     main()