]> Cypherpunks.ru repositories - pyderasn.git/blob - pyderasn.py
Mention --browse
[pyderasn.git] / pyderasn.py
1 #!/usr/bin/env python
2 # coding: utf-8
3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU Lesser General Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
21
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal BER/CER/DER ones.
24
25     >>> i = Integer(123)
26     >>> raw = i.encode()
27     >>> Integer().decod(raw) == i
28     True
29
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
62
63 Common for most types
64 ---------------------
65
66 Tags
67 ____
68
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
75 tags simultaneously.
76
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
80 number.
81
82 .. note::
83
84    EXPLICIT tags always have **constructed** tag. PyDERASN does not
85    explicitly check correctness of schema input here.
86
87 .. note::
88
89    Implicit tags have **primitive** (``tag_ctxp``) encoding for
90    primitive values.
91
92 ::
93
94     >>> Integer(impl=tag_ctxp(1))
95     [1] INTEGER
96     >>> Integer(expl=tag_ctxc(2))
97     [2] EXPLICIT INTEGER
98
99 Implicit tag is not explicitly shown.
100
101 Two objects of the same type, but with different implicit/explicit tags
102 are **not** equal.
103
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
106 function::
107
108     >>> tag_decode(tag_ctxc(123))
109     (128, 32, 123)
110     >>> klass, form, num = tag_decode(tag_ctxc(123))
111     >>> klass == TagClassContext
112     True
113     >>> form == TagFormConstructed
114     True
115
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
118
119 Default/optional
120 ________________
121
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
125
126     >>> Integer(optional=True, default=123)
127     INTEGER 123 OPTIONAL DEFAULT
128
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
132 ``version`` field::
133
134     class Version(Integer):
135         schema = (
136             ("v1", 0),
137             ("v2", 1),
138             ("v3", 2),
139         )
140     class TBSCertificate(Sequence):
141         schema = (
142             ("version", Version(expl=tag_ctxc(0), default="v1")),
143         [...]
144
145 When default argument is used and value is not specified, then it equals
146 to default one.
147
148 .. _bounds:
149
150 Size constraints
151 ________________
152
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
156
157     class X(...):
158         bounds = (MIN, MAX)
159
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
161
162 For simplicity you can also set bounds the following way::
163
164     bounded_x = X(bounds=(MIN, MAX))
165
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
167 raised.
168
169 Common methods
170 ______________
171
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
175
176 All objects are friendly to ``copy.copy()`` and copied objects can be
177 safely mutated.
178
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
181
182 .. _decoding:
183
184 Decoding
185 --------
186
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
193
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
196
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
199
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
205
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
208
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
211 * ``expl_tlen``,
212 * ``expl_llen``
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
215   ``offset`` otherwise
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
217   ``tlvlen`` otherwise
218
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
220
221 .. _ctx:
222
223 Context
224 _______
225
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
229
230 Currently available context options:
231
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
238
239 .. _pprinting:
240
241 Pretty printing
242 ---------------
243
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
248
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
252
253     >>> from pyderasn import pprint
254     >>> encoded = Integer(-12345).encode()
255     >>> obj, tail = Integer().decode(encoded)
256     >>> print(pprint(obj))
257         0   [1,1,   2] INTEGER -12345
258
259 .. _pprint_example:
260
261 Example certificate::
262
263     >>> print(pprint(crt))
264         0   [1,3,1604] Certificate SEQUENCE
265         4   [1,3,1453]  . tbsCertificate: TBSCertificate SEQUENCE
266        10-2 [1,1,   1]  . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267        13   [1,1,   3]  . . serialNumber: CertificateSerialNumber INTEGER 61595
268        18   [1,1,  13]  . . signature: AlgorithmIdentifier SEQUENCE
269        20   [1,1,   9]  . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270        31   [0,0,   2]  . . . parameters: [UNIV 5] ANY OPTIONAL
271                         . . . . 05:00
272        33   [0,0, 278]  . . issuer: Name CHOICE rdnSequence
273        33   [1,3, 274]  . . . rdnSequence: RDNSequence SEQUENCE OF
274        37   [1,1,  11]  . . . . 0: RelativeDistinguishedName SET OF
275        39   [1,1,   9]  . . . . . 0: AttributeTypeAndValue SEQUENCE
276        41   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277        46   [0,0,   4]  . . . . . . value: [UNIV 19] AttributeValue ANY
278                         . . . . . . . 13:02:45:53
279     [...]
280      1461   [1,1,  13]  . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281      1463   [1,1,   9]  . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282      1474   [0,0,   2]  . . parameters: [UNIV 5] ANY OPTIONAL
283                         . . . 05:00
284      1476   [1,2, 129]  . signatureValue: BIT STRING 1024 bits
285                         . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286                         . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
287      [...]
288
289     Trailing data: 0a
290
291 Let's parse that output, human::
292
293        10-2 [1,1,   1]    . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294        ^  ^  ^ ^    ^     ^   ^        ^            ^       ^       ^  ^
295        0  1  2 3    4     5   6        7            8       9       10 11
296
297 ::
298
299        20   [1,1,   9]    . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
300        ^     ^ ^    ^     ^     ^          ^                 ^
301        0     2 3    4     5     6          9                 10
302
303 ::
304
305        33   [0,0, 278]    . . issuer: Name CHOICE rdnSequence
306        ^     ^ ^    ^     ^   ^       ^    ^      ^
307        0     2 3    4     5   6       8    9      10
308
309 ::
310
311        52-2∞ B [1,1,1054]∞  . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
312              ^           ^                                 ^   ^            ^
313             12          13                                14   9            10
314
315 :0:
316  Offset of the object, where its DER/BER encoding begins.
317  Pay attention that it does **not** include explicit tag.
318 :1:
319  If explicit tag exists, then this is its length (tag + encoded length).
320 :2:
321  Length of object's tag. For example CHOICE does not have its own tag,
322  so it is zero.
323 :3:
324  Length of encoded length.
325 :4:
326  Length of encoded value.
327 :5:
328  Visual indentation to show the depth of object in the hierarchy.
329 :6:
330  Object's name inside SEQUENCE/CHOICE.
331 :7:
332  If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333  here. "IMPLICIT" is omitted.
334 :8:
335  Object's class name, if set. Omitted if it is just an ordinary simple
336  value (like with ``algorithm`` in example above).
337 :9:
338  Object's ASN.1 type.
339 :10:
340  Object's value, if set. Can consist of multiple words (like OCTET/BIT
341  STRINGs above). We see ``v3`` value in Version, because it is named.
342  ``rdnSequence`` is the choice of CHOICE type.
343 :11:
344  Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345  default one, specified in the schema.
346 :12:
347  Shows does object contains any kind of BER encoded data (possibly
348  Sequence holding BER-encoded underlying value).
349 :13:
350  Only applicable to BER encoded data. Indefinite length encoding mark.
351 :14:
352  Only applicable to BER encoded data. If object has BER-specific
353  encoding, then ``BER`` will be shown. It does not depend on indefinite
354  length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355  (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
356  could be BERed.
357
358 .. _definedby:
359
360 DEFINED BY
361 ----------
362
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
367
368 .. _defines:
369
370 defines kwarg
371 _____________
372
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
376 container::
377
378     class ContentInfo(Sequence):
379         schema = (
380             ("contentType", ContentType(defines=((("content",), {
381                 id_digestedData: DigestedData(),
382                 id_signedData: SignedData(),
383             }),))),
384             ("content", Any(expl=tag_ctxc(0))),
385         )
386
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
391 done.
392
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
398
399         (
400             (("parameters",), {
401                 id_ecPublicKey: ECParameters(),
402                 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
403             }),
404             (("..", "subjectPublicKey"), {
405                 id_rsaEncryption: RSAPublicKey(),
406                 id_GostR3410_2001: OctetString(),
407             }),
408         ),
409
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
412 itself.
413
414 Following types can be automatically decoded (DEFINED BY):
415
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420   ``Any``/``BitString``/``OctetString``-s
421
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
426
427 .. _defines_by_path_ctx:
428
429 defines_by_path context option
430 ______________________________
431
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
435
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
438
439     (decode_path, defines)
440
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
445
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
449 of ``PKIResponse``::
450
451     content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
452         (
453             ("contentType",),
454             ((("content",), {id_signedData: SignedData()}),),
455         ),
456         (
457             (
458                 "content",
459                 DecodePathDefBy(id_signedData),
460                 "encapContentInfo",
461                 "eContentType",
462             ),
463             ((("eContent",), {
464                 id_cct_PKIData: PKIData(),
465                 id_cct_PKIResponse: PKIResponse(),
466             })),
467         ),
468         (
469             (
470                 "content",
471                 DecodePathDefBy(id_signedData),
472                 "encapContentInfo",
473                 "eContent",
474                 DecodePathDefBy(id_cct_PKIResponse),
475                 "controlSequence",
476                 any,
477                 "attrType",
478             ),
479             ((("attrValues",), {
480                 id_cmc_recipientNonce: RecipientNonce(),
481                 id_cmc_senderNonce: SenderNonce(),
482                 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483                 id_cmc_transactionId: TransactionId(),
484             })),
485         ),
486     )})
487
488 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 First function is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meet --
491 useful for SEQUENCE/SET OF-s.
492
493 .. _bered_ctx:
494
495 BER encoding
496 ------------
497
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
502
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504   attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505   STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506   ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508   attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509   ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
510   contain it.
511 * If object has an indefinite length encoded explicit tag, then
512   ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514   indefinite length, object's indefinite length, BER-encoding) or any
515   underlying component has that kind of encoding, then ``bered``
516   attribute is set to True. For example SignedData CMS can have
517   ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518   ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
519
520 EOC (end-of-contents) token's length is taken in advance in object's
521 value length.
522
523 .. _allow_expl_oob_ctx:
524
525 Allow explicit tag out-of-bound
526 -------------------------------
527
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
533
534 .. warning::
535
536    This option should be used only for skipping some decode errors, just
537    to see the decoded structure somehow.
538
539 .. _streaming:
540
541 Streaming and dealing with huge structures
542 ------------------------------------------
543
544 .. _evgen_mode:
545
546 evgen mode
547 __________
548
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
553 structure.
554
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
558
559     class TBSCertListFast(Sequence):
560         schema = (
561             [...]
562             ("revokedCertificates", OctetString(
563                 impl=SequenceOf.tag_default,
564                 optional=True,
565             )),
566             [...]
567         )
568
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
571
572 But how can you get all certificate's serial number from it, after you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
576
577     $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578          10     [1,1,   1]   . . version: Version INTEGER v2 (01) OPTIONAL
579          15     [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580          26     [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL
581          13     [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE
582          34     [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583          39     [0,0,   9]   . . . . . . value: [UNIV 19] AttributeValue ANY
584          32     [1,1,  14]   . . . . . 0: AttributeTypeAndValue SEQUENCE
585          30     [1,1,  16]   . . . . 0: RelativeDistinguishedName SET OF
586     [...]
587         188     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589         191     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
590         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591         186     [1,1,  18]   . . . 0: RevokedCertificate SEQUENCE
592         208     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594         211     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
595         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596         206     [1,1,  18]   . . . 1: RevokedCertificate SEQUENCE
597     [...]
598     9144992     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
599     9144992     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600     9144985     [1,1,  20]   . . . 415755: RevokedCertificate SEQUENCE
601       181     [1,4,9144821]   . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602         5     [1,4,9144997]   . tbsCertList: TBSCertList SEQUENCE
603     9145009     [1,1,   9]   . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604     9145020     [0,0,   2]   . . parameters: [UNIV 5] ANY OPTIONAL
605     9145007     [1,1,  13]   . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606     9145022     [1,3, 513]   . signatureValue: BIT STRING 4096 bits
607         0     [1,4,9145534]  CertificateList SEQUENCE
608
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as a successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
618 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
619
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
626
627     for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
628         if (
629             len(decode_path) == 4 and
630             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
631             decode_path[3] == "userCertificate"
632         ):
633             print("serial number:", int(obj))
634
635 Virtually it does not take any memory except at least needed for single
636 object storage. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
639 ``.tlvlen``.
640
641 .. _evgen_mode_upto_ctx:
642
643 evgen_mode_upto
644 _______________
645
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 do not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
655
656     for decode_path, obj, _ in CertificateList().decode_evgen(
657         crl_raw,
658         ctx={"evgen_mode_upto": (
659             (("tbsCertList", "revokedCertificates", any), True),
660         )},
661     ):
662         if (
663             len(decode_path) == 3 and
664             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
665         ):
666             print("serial number:", int(obj["userCertificate"]))
667
668 .. note::
669
670    SEQUENCE/SET values with DEFAULT specified are automatically decoded
671    without evgen mode.
672
673 .. _mmap:
674
675 mmap-ed file
676 ____________
677
678 POSIX compliant systems have ``mmap`` syscall, giving ability to work
679 the memory mapped file. You can deal with the file like it was an
680 ordinary binary string, allowing you not to load it to the memory first.
681 Also you can use them as an input for OCTET STRING, taking no Python
682 memory for their storage.
683
684 There is convenient :py:func:`pyderasn.file_mmaped` function that
685 creates read-only memoryview on the file contents::
686
687     with open("huge", "rb") as fd:
688         raw = file_mmaped(fd)
689         obj = Something.decode(raw)
690
691 .. warning::
692
693    mmap-ed files in Python2.7 does not implement buffer protocol, so
694    memoryview won't work on them.
695
696 .. warning::
697
698    mmap maps the **whole** file. So it plays no role if you seek-ed it
699    before. Take the slice of the resulting memoryview with required
700    offset instead.
701
702 .. note::
703
704    If you use ZFS as underlying storage, then pay attention that
705    currently most platforms does not deal good with ZFS ARC and ordinary
706    page cache used for mmaps. It can take twice the necessary size in
707    the memory: both in page cache and ZFS ARC.
708
709 CER encoding
710 ____________
711
712 We can parse any kind of data now, but how can we produce files
713 streamingly, without storing their encoded representation in memory?
714 SEQUENCE by default encodes in memory all its values, joins them in huge
715 binary string, just to know the exact size of SEQUENCE's value for
716 encoding it in TLV. DER requires you to know all exact sizes of the
717 objects.
718
719 You can use CER encoding mode, that slightly differs from the DER, but
720 does not require exact sizes knowledge, allowing streaming encoding
721 directly to some writer/buffer. Just use
722 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
723 encoded data will flow::
724
725     opener = io.open if PY2 else open
726     with opener("result", "wb") as fd:
727         obj.encode_cer(fd.write)
728
729 ::
730
731     buf = io.BytesIO()
732     obj.encode_cer(buf.write)
733
734 If you do not want to create in-memory buffer every time, then you can
735 use :py:func:`pyderasn.encode_cer` function::
736
737     data = encode_cer(obj)
738
739 Remember that CER is **not valid** DER in most cases, so you **have to**
740 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
741 decoding. Also currently there is **no** validation that provided CER is
742 valid one -- you are sure that it has only valid BER encoding.
743
744 .. warning::
745
746    SET OF values can not be streamingly encoded, because they are
747    required to be sorted byte-by-byte. Big SET OF values still will take
748    much memory. Use neither SET nor SET OF values, as modern ASN.1
749    also recommends too.
750
751 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
752 OCTET STRINGs! They will be streamingly copied from underlying file to
753 the buffer using 1 KB chunks.
754
755 Some structures require that some of the elements have to be forcefully
756 DER encoded. For example ``SignedData`` CMS requires you to encode
757 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
758 encode everything else in BER. You can tell any of the structures to be
759 forcefully encoded in DER during CER encoding, by specifying
760 ``der_forced=True`` attribute::
761
762     class Certificate(Sequence):
763         schema = (...)
764         der_forced = True
765
766     class SignedAttributes(SetOf):
767         schema = Attribute()
768         bounds = (1, 32)
769         der_forced = True
770
771 .. _agg_octet_string:
772
773 agg_octet_string
774 ________________
775
776 In most cases, huge quantity of binary data is stored as OCTET STRING.
777 CER encoding splits it on 1 KB chunks. BER allows splitting on various
778 levels of chunks inclusion::
779
780     SOME STRING[CONSTRUCTED]
781         OCTET STRING[CONSTRUCTED]
782             OCTET STRING[PRIMITIVE]
783                 DATA CHUNK
784             OCTET STRING[PRIMITIVE]
785                 DATA CHUNK
786             OCTET STRING[PRIMITIVE]
787                 DATA CHUNK
788         OCTET STRING[PRIMITIVE]
789             DATA CHUNK
790         OCTET STRING[CONSTRUCTED]
791             OCTET STRING[PRIMITIVE]
792                 DATA CHUNK
793             OCTET STRING[PRIMITIVE]
794                 DATA CHUNK
795         OCTET STRING[CONSTRUCTED]
796             OCTET STRING[CONSTRUCTED]
797                 OCTET STRING[PRIMITIVE]
798                     DATA CHUNK
799
800 You can not just take the offset and some ``.vlen`` of the STRING and
801 treat it as the payload. If you decode it without
802 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
803 and ``bytes()`` will give the whole payload contents.
804
805 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
806 small memory footprint. There is convenient
807 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
808 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
809 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
810 SHA512 digest of its ``eContent``::
811
812     fd = open("data.p7m", "rb")
813     raw = file_mmaped(fd)
814     ctx = {"bered": True}
815     for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
816         if decode_path == ("content",):
817             content = obj
818             break
819     else:
820         raise ValueError("no content found")
821     hasher_state = sha512()
822     def hasher(data):
823         hasher_state.update(data)
824         return len(data)
825     evgens = SignedData().decode_evgen(
826         raw[content.offset:],
827         offset=content.offset,
828         ctx=ctx,
829     )
830     agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
831     fd.close()
832     digest = hasher_state.digest()
833
834 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
835 copy the payload (without BER/CER encoding interleaved overhead) in it.
836 Virtually it won't take memory more than for keeping small structures
837 and 1 KB binary chunks.
838
839 .. _seqof-iterators:
840
841 SEQUENCE OF iterators
842 _____________________
843
844 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
845 classes. The only difference with providing the full list of objects, is
846 that type and bounds checking is done during encoding process. Also
847 sequence's value will be emptied after encoding, forcing you to set its
848 value again.
849
850 This is very useful when you have to create some huge objects, like
851 CRLs, with thousands and millions of entities inside. You can write the
852 generator taking necessary data from the database and giving the
853 ``RevokedCertificate`` objects. Only binary representation of that
854 objects will take memory during DER encoding.
855
856 2-pass DER encoding
857 -------------------
858
859 There is ability to do 2-pass encoding to DER, writing results directly
860 to specified writer (buffer, file, whatever). It could be 1.5+ times
861 slower than ordinary encoding, but it takes little memory for 1st pass
862 state storing. For example, 1st pass state for CACert.org's CRL with
863 ~416K of certificate entries takes nearly 3.5 MB of memory.
864 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
865 nearly 0.5 KB of memory.
866
867 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
868 iterators <seqof-iterators>` and write directly to opened file, then
869 there is very small memory footprint.
870
871 1st pass traverses through all the objects of the structure and returns
872 the size of DER encoded structure, together with 1st pass state object.
873 That state contains precalculated lengths for various objects inside the
874 structure.
875
876 ::
877
878     fulllen, state = obj.encode1st()
879
880 2nd pass takes the writer and 1st pass state. It traverses through all
881 the objects again, but writes their encoded representation to the writer.
882
883 ::
884
885     opener = io.open if PY2 else open
886     with opener("result", "wb") as fd:
887         obj.encode2nd(fd.write, iter(state))
888
889 .. warning::
890
891    You **MUST NOT** use 1st pass state if anything is changed in the
892    objects. It is intended to be used immediately after 1st pass is
893    done!
894
895 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
896 have to reinitialize the values after the 1st pass. And you **have to**
897 be sure that the iterator gives exactly the same values as previously.
898 Yes, you have to run your iterator twice -- because this is two pass
899 encoding mode.
900
901 If you want to encode to the memory, then you can use convenient
902 :py:func:`pyderasn.encode2pass` helper.
903
904 .. _browser:
905
906 ASN.1 browser
907 -------------
908 .. autofunction:: pyderasn.browse
909
910 Base Obj
911 --------
912 .. autoclass:: pyderasn.Obj
913    :members:
914
915 Primitive types
916 ---------------
917
918 Boolean
919 _______
920 .. autoclass:: pyderasn.Boolean
921    :members: __init__
922
923 Integer
924 _______
925 .. autoclass:: pyderasn.Integer
926    :members: __init__, named, tohex
927
928 BitString
929 _________
930 .. autoclass:: pyderasn.BitString
931    :members: __init__, bit_len, named
932
933 OctetString
934 ___________
935 .. autoclass:: pyderasn.OctetString
936    :members: __init__
937
938 Null
939 ____
940 .. autoclass:: pyderasn.Null
941    :members: __init__
942
943 ObjectIdentifier
944 ________________
945 .. autoclass:: pyderasn.ObjectIdentifier
946    :members: __init__
947
948 Enumerated
949 __________
950 .. autoclass:: pyderasn.Enumerated
951
952 CommonString
953 ____________
954 .. autoclass:: pyderasn.CommonString
955
956 NumericString
957 _____________
958 .. autoclass:: pyderasn.NumericString
959
960 PrintableString
961 _______________
962 .. autoclass:: pyderasn.PrintableString
963    :members: __init__, allow_asterisk, allow_ampersand
964
965 UTCTime
966 _______
967 .. autoclass:: pyderasn.UTCTime
968    :members: __init__, todatetime
969
970 GeneralizedTime
971 _______________
972 .. autoclass:: pyderasn.GeneralizedTime
973    :members: __init__, todatetime
974
975 Special types
976 -------------
977
978 Choice
979 ______
980 .. autoclass:: pyderasn.Choice
981    :members: __init__, choice, value
982
983 PrimitiveTypes
984 ______________
985 .. autoclass:: PrimitiveTypes
986
987 Any
988 ___
989 .. autoclass:: pyderasn.Any
990    :members: __init__
991
992 Constructed types
993 -----------------
994
995 Sequence
996 ________
997 .. autoclass:: pyderasn.Sequence
998    :members: __init__
999
1000 Set
1001 ___
1002 .. autoclass:: pyderasn.Set
1003    :members: __init__
1004
1005 SequenceOf
1006 __________
1007 .. autoclass:: pyderasn.SequenceOf
1008    :members: __init__
1009
1010 SetOf
1011 _____
1012 .. autoclass:: pyderasn.SetOf
1013    :members: __init__
1014
1015 Various
1016 -------
1017
1018 .. autofunction:: pyderasn.abs_decode_path
1019 .. autofunction:: pyderasn.agg_octet_string
1020 .. autofunction:: pyderasn.ascii_visualize
1021 .. autofunction:: pyderasn.colonize_hex
1022 .. autofunction:: pyderasn.encode2pass
1023 .. autofunction:: pyderasn.encode_cer
1024 .. autofunction:: pyderasn.file_mmaped
1025 .. autofunction:: pyderasn.hexenc
1026 .. autofunction:: pyderasn.hexdec
1027 .. autofunction:: pyderasn.hexdump
1028 .. autofunction:: pyderasn.tag_encode
1029 .. autofunction:: pyderasn.tag_decode
1030 .. autofunction:: pyderasn.tag_ctxp
1031 .. autofunction:: pyderasn.tag_ctxc
1032 .. autoclass:: pyderasn.DecodeError
1033    :members: __init__
1034 .. autoclass:: pyderasn.NotEnoughData
1035 .. autoclass:: pyderasn.ExceedingData
1036 .. autoclass:: pyderasn.LenIndefForm
1037 .. autoclass:: pyderasn.TagMismatch
1038 .. autoclass:: pyderasn.InvalidLength
1039 .. autoclass:: pyderasn.InvalidOID
1040 .. autoclass:: pyderasn.ObjUnknown
1041 .. autoclass:: pyderasn.ObjNotReady
1042 .. autoclass:: pyderasn.InvalidValueType
1043 .. autoclass:: pyderasn.BoundsError
1044
1045 .. _cmdline:
1046
1047 Command-line usage
1048 ------------------
1049
1050 You can decode DER/BER files using command line abilities::
1051
1052     $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1053
1054 If there is no schema for your file, then you can try parsing it without,
1055 but of course IMPLICIT tags will often make it impossible. But result is
1056 good enough for the certificate above::
1057
1058     $ python -m pyderasn path/to/file
1059         0   [1,3,1604]  . >: SEQUENCE OF
1060         4   [1,3,1453]  . . >: SEQUENCE OF
1061         8   [0,0,   5]  . . . . >: [0] ANY
1062                         . . . . . A0:03:02:01:02
1063        13   [1,1,   3]  . . . . >: INTEGER 61595
1064        18   [1,1,  13]  . . . . >: SEQUENCE OF
1065        20   [1,1,   9]  . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1066        31   [1,1,   0]  . . . . . . >: NULL
1067        33   [1,3, 274]  . . . . >: SEQUENCE OF
1068        37   [1,1,  11]  . . . . . . >: SET OF
1069        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1070        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1071        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1072     [...]
1073      1409   [1,1,  50]  . . . . . . >: SEQUENCE OF
1074      1411   [1,1,   8]  . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1075      1421   [1,1,  38]  . . . . . . . . >: OCTET STRING 38 bytes
1076                         . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1077                         . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1078                         . . . . . . . . . 61:2E:63:6F:6D:2F
1079      1461   [1,1,  13]  . . >: SEQUENCE OF
1080      1463   [1,1,   9]  . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1081      1474   [1,1,   0]  . . . . >: NULL
1082      1476   [1,2, 129]  . . >: BIT STRING 1024 bits
1083                         . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1084                         . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1085     [...]
1086
1087 Human readable OIDs
1088 ___________________
1089
1090 If you have got dictionaries with ObjectIdentifiers, like example one
1091 from ``tests/test_crts.py``::
1092
1093     stroid2name = {
1094         "1.2.840.113549.1.1.1": "id-rsaEncryption",
1095         "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1096         [...]
1097         "2.5.4.10": "id-at-organizationName",
1098         "2.5.4.11": "id-at-organizationalUnitName",
1099     }
1100
1101 then you can pass it to pretty printer to see human readable OIDs::
1102
1103     $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1104     [...]
1105        37   [1,1,  11]  . . . . . . >: SET OF
1106        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1107        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1108        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1109        50   [1,1,  18]  . . . . . . >: SET OF
1110        52   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1111        54   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1112        59   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1113        70   [1,1,  18]  . . . . . . >: SET OF
1114        72   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1115        74   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1116        79   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1117     [...]
1118
1119 Decode paths
1120 ____________
1121
1122 Each decoded element has so-called decode path: sequence of structure
1123 names it is passing during the decode process. Each element has its own
1124 unique path inside the whole ASN.1 tree. You can print it out with
1125 ``--print-decode-path`` option::
1126
1127     $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1128        0    [1,3,1604]  Certificate SEQUENCE []
1129        4    [1,3,1453]   . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1130       10-2  [1,1,   1]   . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1131       13    [1,1,   3]   . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1132       18    [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1133       20    [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1134       31    [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1135                          . . . . 05:00
1136       33    [0,0, 278]   . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1137       33    [1,3, 274]   . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1138       37    [1,1,  11]   . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1139       39    [1,1,   9]   . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1140       41    [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1141       46    [0,0,   4]   . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1142                          . . . . . . . 13:02:45:53
1143       46    [1,1,   2]   . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1144     [...]
1145
1146 Now you can print only the specified tree, for example signature algorithm::
1147
1148     $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1149       18    [1,1,  13]  AlgorithmIdentifier SEQUENCE
1150       20    [1,1,   9]   . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1151       31    [0,0,   2]   . parameters: [UNIV 5] ANY OPTIONAL
1152                          . . 05:00
1153 """
1154
1155 from array import array
1156 from codecs import getdecoder
1157 from codecs import getencoder
1158 from collections import namedtuple
1159 from collections import OrderedDict
1160 from copy import copy
1161 from datetime import datetime
1162 from datetime import timedelta
1163 from io import BytesIO
1164 from math import ceil
1165 from mmap import mmap
1166 from mmap import PROT_READ
1167 from operator import attrgetter
1168 from string import ascii_letters
1169 from string import digits
1170 from sys import maxsize as sys_maxsize
1171 from sys import version_info
1172 from unicodedata import category as unicat
1173
1174 from six import add_metaclass
1175 from six import binary_type
1176 from six import byte2int
1177 from six import indexbytes
1178 from six import int2byte
1179 from six import integer_types
1180 from six import iterbytes
1181 from six import iteritems
1182 from six import itervalues
1183 from six import PY2
1184 from six import string_types
1185 from six import text_type
1186 from six import unichr as six_unichr
1187 from six.moves import xrange as six_xrange
1188
1189
1190 try:
1191     from termcolor import colored
1192 except ImportError:  # pragma: no cover
1193     def colored(what, *args, **kwargs):
1194         return what
1195
1196 __version__ = "7.4"
1197
1198 __all__ = (
1199     "agg_octet_string",
1200     "Any",
1201     "BitString",
1202     "BMPString",
1203     "Boolean",
1204     "BoundsError",
1205     "Choice",
1206     "colonize_hex",
1207     "DecodeError",
1208     "DecodePathDefBy",
1209     "encode2pass",
1210     "encode_cer",
1211     "Enumerated",
1212     "ExceedingData",
1213     "file_mmaped",
1214     "GeneralizedTime",
1215     "GeneralString",
1216     "GraphicString",
1217     "hexdec",
1218     "hexenc",
1219     "IA5String",
1220     "Integer",
1221     "InvalidLength",
1222     "InvalidOID",
1223     "InvalidValueType",
1224     "ISO646String",
1225     "LenIndefForm",
1226     "NotEnoughData",
1227     "Null",
1228     "NumericString",
1229     "obj_by_path",
1230     "ObjectIdentifier",
1231     "ObjNotReady",
1232     "ObjUnknown",
1233     "OctetString",
1234     "PrimitiveTypes",
1235     "PrintableString",
1236     "Sequence",
1237     "SequenceOf",
1238     "Set",
1239     "SetOf",
1240     "T61String",
1241     "tag_ctxc",
1242     "tag_ctxp",
1243     "tag_decode",
1244     "TagClassApplication",
1245     "TagClassContext",
1246     "TagClassPrivate",
1247     "TagClassUniversal",
1248     "TagFormConstructed",
1249     "TagFormPrimitive",
1250     "TagMismatch",
1251     "TeletexString",
1252     "UniversalString",
1253     "UTCTime",
1254     "UTF8String",
1255     "VideotexString",
1256     "VisibleString",
1257 )
1258
1259 TagClassUniversal = 0
1260 TagClassApplication = 1 << 6
1261 TagClassContext = 1 << 7
1262 TagClassPrivate = 1 << 6 | 1 << 7
1263 TagFormPrimitive = 0
1264 TagFormConstructed = 1 << 5
1265 TagClassReprs = {
1266     TagClassContext: "",
1267     TagClassApplication: "APPLICATION ",
1268     TagClassPrivate: "PRIVATE ",
1269     TagClassUniversal: "UNIV ",
1270 }
1271 EOC = b"\x00\x00"
1272 EOC_LEN = len(EOC)
1273 LENINDEF = b"\x80"  # length indefinite mark
1274 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1275 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1276 SET01 = frozenset("01")
1277 DECIMALS = frozenset(digits)
1278 DECIMAL_SIGNS = ".,"
1279 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1280
1281
1282 def file_mmaped(fd):
1283     """Make mmap-ed memoryview for reading from file
1284
1285     :param fd: file object
1286     :returns: memoryview over read-only mmap-ing of the whole file
1287     """
1288     return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
1289
1290 def pureint(value):
1291     if not set(value) <= DECIMALS:
1292         raise ValueError("non-pure integer")
1293     return int(value)
1294
1295 def fractions2float(fractions_raw):
1296     pureint(fractions_raw)
1297     return float("0." + fractions_raw)
1298
1299
1300 def get_def_by_path(defines_by_path, sub_decode_path):
1301     """Get define by decode path
1302     """
1303     for path, define in defines_by_path:
1304         if len(path) != len(sub_decode_path):
1305             continue
1306         for p1, p2 in zip(path, sub_decode_path):
1307             if (not p1 is any) and (p1 != p2):
1308                 break
1309         else:
1310             return define
1311
1312
1313 ########################################################################
1314 # Errors
1315 ########################################################################
1316
1317 class ASN1Error(ValueError):
1318     pass
1319
1320
1321 class DecodeError(ASN1Error):
1322     def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1323         """
1324         :param str msg: reason of decode failing
1325         :param klass: optional exact DecodeError inherited class (like
1326                       :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1327                       :py:exc:`InvalidLength`)
1328         :param decode_path: tuple of strings. It contains human
1329                             readable names of the fields through which
1330                             decoding process has passed
1331         :param int offset: binary offset where failure happened
1332         """
1333         super(DecodeError, self).__init__()
1334         self.msg = msg
1335         self.klass = klass
1336         self.decode_path = decode_path
1337         self.offset = offset
1338
1339     def __str__(self):
1340         return " ".join(
1341             c for c in (
1342                 "" if self.klass is None else self.klass.__name__,
1343                 (
1344                     ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1345                     if len(self.decode_path) > 0 else ""
1346                 ),
1347                 ("(at %d)" % self.offset) if self.offset > 0 else "",
1348                 self.msg,
1349             ) if c != ""
1350         )
1351
1352     def __repr__(self):
1353         return "%s(%s)" % (self.__class__.__name__, self)
1354
1355
1356 class NotEnoughData(DecodeError):
1357     pass
1358
1359
1360 class ExceedingData(ASN1Error):
1361     def __init__(self, nbytes):
1362         super(ExceedingData, self).__init__()
1363         self.nbytes = nbytes
1364
1365     def __str__(self):
1366         return "%d trailing bytes" % self.nbytes
1367
1368     def __repr__(self):
1369         return "%s(%s)" % (self.__class__.__name__, self)
1370
1371
1372 class LenIndefForm(DecodeError):
1373     pass
1374
1375
1376 class TagMismatch(DecodeError):
1377     pass
1378
1379
1380 class InvalidLength(DecodeError):
1381     pass
1382
1383
1384 class InvalidOID(DecodeError):
1385     pass
1386
1387
1388 class ObjUnknown(ASN1Error):
1389     def __init__(self, name):
1390         super(ObjUnknown, self).__init__()
1391         self.name = name
1392
1393     def __str__(self):
1394         return "object is unknown: %s" % self.name
1395
1396     def __repr__(self):
1397         return "%s(%s)" % (self.__class__.__name__, self)
1398
1399
1400 class ObjNotReady(ASN1Error):
1401     def __init__(self, name):
1402         super(ObjNotReady, self).__init__()
1403         self.name = name
1404
1405     def __str__(self):
1406         return "object is not ready: %s" % self.name
1407
1408     def __repr__(self):
1409         return "%s(%s)" % (self.__class__.__name__, self)
1410
1411
1412 class InvalidValueType(ASN1Error):
1413     def __init__(self, expected_types):
1414         super(InvalidValueType, self).__init__()
1415         self.expected_types = expected_types
1416
1417     def __str__(self):
1418         return "invalid value type, expected: %s" % ", ".join(
1419             [repr(t) for t in self.expected_types]
1420         )
1421
1422     def __repr__(self):
1423         return "%s(%s)" % (self.__class__.__name__, self)
1424
1425
1426 class BoundsError(ASN1Error):
1427     def __init__(self, bound_min, value, bound_max):
1428         super(BoundsError, self).__init__()
1429         self.bound_min = bound_min
1430         self.value = value
1431         self.bound_max = bound_max
1432
1433     def __str__(self):
1434         return "unsatisfied bounds: %s <= %s <= %s" % (
1435             self.bound_min,
1436             self.value,
1437             self.bound_max,
1438         )
1439
1440     def __repr__(self):
1441         return "%s(%s)" % (self.__class__.__name__, self)
1442
1443
1444 ########################################################################
1445 # Basic coders
1446 ########################################################################
1447
1448 _hexdecoder = getdecoder("hex")
1449 _hexencoder = getencoder("hex")
1450
1451
1452 def hexdec(data):
1453     """Binary data to hexadecimal string convert
1454     """
1455     return _hexdecoder(data)[0]
1456
1457
1458 def hexenc(data):
1459     """Hexadecimal string to binary data convert
1460     """
1461     return _hexencoder(data)[0].decode("ascii")
1462
1463
1464 def int_bytes_len(num, byte_len=8):
1465     if num == 0:
1466         return 1
1467     return int(ceil(float(num.bit_length()) / byte_len))
1468
1469
1470 def zero_ended_encode(num):
1471     octets = bytearray(int_bytes_len(num, 7))
1472     i = len(octets) - 1
1473     octets[i] = num & 0x7F
1474     num >>= 7
1475     i -= 1
1476     while num > 0:
1477         octets[i] = 0x80 | (num & 0x7F)
1478         num >>= 7
1479         i -= 1
1480     return bytes(octets)
1481
1482
1483 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1484     """Encode tag to binary form
1485
1486     :param int num: tag's number
1487     :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1488                       :py:data:`pyderasn.TagClassContext`,
1489                       :py:data:`pyderasn.TagClassApplication`,
1490                       :py:data:`pyderasn.TagClassPrivate`)
1491     :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1492                      :py:data:`pyderasn.TagFormConstructed`)
1493     """
1494     if num < 31:
1495         # [XX|X|.....]
1496         return int2byte(klass | form | num)
1497     # [XX|X|11111][1.......][1.......] ... [0.......]
1498     return int2byte(klass | form | 31) + zero_ended_encode(num)
1499
1500
1501 def tag_decode(tag):
1502     """Decode tag from binary form
1503
1504     .. warning::
1505
1506        No validation is performed, assuming that it has already passed.
1507
1508     It returns tuple with three integers, as
1509     :py:func:`pyderasn.tag_encode` accepts.
1510     """
1511     first_octet = byte2int(tag)
1512     klass = first_octet & 0xC0
1513     form = first_octet & 0x20
1514     if first_octet & 0x1F < 0x1F:
1515         return (klass, form, first_octet & 0x1F)
1516     num = 0
1517     for octet in iterbytes(tag[1:]):
1518         num <<= 7
1519         num |= octet & 0x7F
1520     return (klass, form, num)
1521
1522
1523 def tag_ctxp(num):
1524     """Create CONTEXT PRIMITIVE tag
1525     """
1526     return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1527
1528
1529 def tag_ctxc(num):
1530     """Create CONTEXT CONSTRUCTED tag
1531     """
1532     return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1533
1534
1535 def tag_strip(data):
1536     """Take off tag from the data
1537
1538     :returns: (encoded tag, tag length, remaining data)
1539     """
1540     if len(data) == 0:
1541         raise NotEnoughData("no data at all")
1542     if byte2int(data) & 0x1F < 31:
1543         return data[:1], 1, data[1:]
1544     i = 0
1545     while True:
1546         i += 1
1547         if i == len(data):
1548             raise DecodeError("unfinished tag")
1549         if indexbytes(data, i) & 0x80 == 0:
1550             break
1551     i += 1
1552     return data[:i], i, data[i:]
1553
1554
1555 def len_encode(l):
1556     if l < 0x80:
1557         return int2byte(l)
1558     octets = bytearray(int_bytes_len(l) + 1)
1559     octets[0] = 0x80 | (len(octets) - 1)
1560     for i in six_xrange(len(octets) - 1, 0, -1):
1561         octets[i] = l & 0xFF
1562         l >>= 8
1563     return bytes(octets)
1564
1565
1566 def len_decode(data):
1567     """Decode length
1568
1569     :returns: (decoded length, length's length, remaining data)
1570     :raises LenIndefForm: if indefinite form encoding is met
1571     """
1572     if len(data) == 0:
1573         raise NotEnoughData("no data at all")
1574     first_octet = byte2int(data)
1575     if first_octet & 0x80 == 0:
1576         return first_octet, 1, data[1:]
1577     octets_num = first_octet & 0x7F
1578     if octets_num + 1 > len(data):
1579         raise NotEnoughData("encoded length is longer than data")
1580     if octets_num == 0:
1581         raise LenIndefForm()
1582     if byte2int(data[1:]) == 0:
1583         raise DecodeError("leading zeros")
1584     l = 0
1585     for v in iterbytes(data[1:1 + octets_num]):
1586         l = (l << 8) | v
1587     if l <= 127:
1588         raise DecodeError("long form instead of short one")
1589     return l, 1 + octets_num, data[1 + octets_num:]
1590
1591
1592 LEN0 = len_encode(0)
1593 LEN1 = len_encode(1)
1594 LEN1K = len_encode(1000)
1595
1596
1597 def len_size(l):
1598     """How many bytes length field will take
1599     """
1600     if l < 128:
1601         return 1
1602     if l < 256:  # 1 << 8
1603         return 2
1604     if l < 65536:  # 1 << 16
1605         return 3
1606     if l < 16777216:  # 1 << 24
1607         return 4
1608     if l < 4294967296:  # 1 << 32
1609         return 5
1610     if l < 1099511627776:  # 1 << 40
1611         return 6
1612     if l < 281474976710656:  # 1 << 48
1613         return 7
1614     if l < 72057594037927936:  # 1 << 56
1615         return 8
1616     raise OverflowError("too big length")
1617
1618
1619 def write_full(writer, data):
1620     """Fully write provided data
1621
1622     :param writer: must comply with ``io.RawIOBase.write`` behaviour
1623
1624     BytesIO does not guarantee that the whole data will be written at
1625     once. That function write everything provided, raising an error if
1626     ``writer`` returns None.
1627     """
1628     data = memoryview(data)
1629     written = 0
1630     while written != len(data):
1631         n = writer(data[written:])
1632         if n is None:
1633             raise ValueError("can not write to buf")
1634         written += n
1635
1636
1637 # If it is 64-bit system, then use compact 64-bit array of unsigned
1638 # longs. Use an ordinary list with universal integers otherwise, that
1639 # is slower.
1640 if sys_maxsize > 2 ** 32:
1641     def state_2pass_new():
1642         return array("L")
1643 else:
1644     def state_2pass_new():
1645         return []
1646
1647
1648 ########################################################################
1649 # Base class
1650 ########################################################################
1651
1652 class AutoAddSlots(type):
1653     def __new__(cls, name, bases, _dict):
1654         _dict["__slots__"] = _dict.get("__slots__", ())
1655         return type.__new__(cls, name, bases, _dict)
1656
1657
1658 BasicState = namedtuple("BasicState", (
1659     "version",
1660     "tag",
1661     "tag_order",
1662     "expl",
1663     "default",
1664     "optional",
1665     "offset",
1666     "llen",
1667     "vlen",
1668     "expl_lenindef",
1669     "lenindef",
1670     "ber_encoded",
1671 ), **NAMEDTUPLE_KWARGS)
1672
1673
1674 @add_metaclass(AutoAddSlots)
1675 class Obj(object):
1676     """Common ASN.1 object class
1677
1678     All ASN.1 types are inherited from it. It has metaclass that
1679     automatically adds ``__slots__`` to all inherited classes.
1680     """
1681     __slots__ = (
1682         "tag",
1683         "_tag_order",
1684         "_value",
1685         "_expl",
1686         "default",
1687         "optional",
1688         "offset",
1689         "llen",
1690         "vlen",
1691         "expl_lenindef",
1692         "lenindef",
1693         "ber_encoded",
1694     )
1695
1696     def __init__(
1697             self,
1698             impl=None,
1699             expl=None,
1700             default=None,
1701             optional=False,
1702             _decoded=(0, 0, 0),
1703     ):
1704         self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1705         self._expl = getattr(self, "expl", None) if expl is None else expl
1706         if self.tag != self.tag_default and self._expl is not None:
1707             raise ValueError("implicit and explicit tags can not be set simultaneously")
1708         if self.tag is None:
1709             self._tag_order = None
1710         else:
1711             tag_class, _, tag_num = tag_decode(
1712                 self.tag if self._expl is None else self._expl
1713             )
1714             self._tag_order = (tag_class, tag_num)
1715         if default is not None:
1716             optional = True
1717         self.optional = optional
1718         self.offset, self.llen, self.vlen = _decoded
1719         self.default = None
1720         self.expl_lenindef = False
1721         self.lenindef = False
1722         self.ber_encoded = False
1723
1724     @property
1725     def ready(self):  # pragma: no cover
1726         """Is object ready to be encoded?
1727         """
1728         raise NotImplementedError()
1729
1730     def _assert_ready(self):
1731         if not self.ready:
1732             raise ObjNotReady(self.__class__.__name__)
1733
1734     @property
1735     def bered(self):
1736         """Is either object or any elements inside is BER encoded?
1737         """
1738         return self.expl_lenindef or self.lenindef or self.ber_encoded
1739
1740     @property
1741     def decoded(self):
1742         """Is object decoded?
1743         """
1744         return (self.llen + self.vlen) > 0
1745
1746     def __getstate__(self):  # pragma: no cover
1747         """Used for making safe to be mutable pickleable copies
1748         """
1749         raise NotImplementedError()
1750
1751     def __setstate__(self, state):
1752         if state.version != __version__:
1753             raise ValueError("data is pickled by different PyDERASN version")
1754         self.tag = state.tag
1755         self._tag_order = state.tag_order
1756         self._expl = state.expl
1757         self.default = state.default
1758         self.optional = state.optional
1759         self.offset = state.offset
1760         self.llen = state.llen
1761         self.vlen = state.vlen
1762         self.expl_lenindef = state.expl_lenindef
1763         self.lenindef = state.lenindef
1764         self.ber_encoded = state.ber_encoded
1765
1766     @property
1767     def tag_order(self):
1768         """Tag's (class, number) used for DER/CER sorting
1769         """
1770         return self._tag_order
1771
1772     @property
1773     def tag_order_cer(self):
1774         return self.tag_order
1775
1776     @property
1777     def tlen(self):
1778         """.. seealso:: :ref:`decoding`
1779         """
1780         return len(self.tag)
1781
1782     @property
1783     def tlvlen(self):
1784         """.. seealso:: :ref:`decoding`
1785         """
1786         return self.tlen + self.llen + self.vlen
1787
1788     def __str__(self):  # pragma: no cover
1789         return self.__bytes__() if PY2 else self.__unicode__()
1790
1791     def __ne__(self, their):
1792         return not(self == their)
1793
1794     def __gt__(self, their):  # pragma: no cover
1795         return not(self < their)
1796
1797     def __le__(self, their):  # pragma: no cover
1798         return (self == their) or (self < their)
1799
1800     def __ge__(self, their):  # pragma: no cover
1801         return (self == their) or (self > their)
1802
1803     def _encode(self):  # pragma: no cover
1804         raise NotImplementedError()
1805
1806     def _encode_cer(self, writer):
1807         write_full(writer, self._encode())
1808
1809     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):  # pragma: no cover
1810         yield NotImplemented
1811
1812     def _encode1st(self, state):
1813         raise NotImplementedError()
1814
1815     def _encode2nd(self, writer, state_iter):
1816         raise NotImplementedError()
1817
1818     def encode(self):
1819         """DER encode the structure
1820
1821         :returns: DER representation
1822         """
1823         raw = self._encode()
1824         if self._expl is None:
1825             return raw
1826         return b"".join((self._expl, len_encode(len(raw)), raw))
1827
1828     def encode1st(self, state=None):
1829         """Do the 1st pass of 2-pass encoding
1830
1831         :rtype: (int, array("L"))
1832         :returns: full length of encoded data and precalculated various
1833                   objects lengths
1834         """
1835         if state is None:
1836             state = state_2pass_new()
1837         if self._expl is None:
1838             return self._encode1st(state)
1839         state.append(0)
1840         idx = len(state) - 1
1841         vlen, _ = self._encode1st(state)
1842         state[idx] = vlen
1843         fulllen = len(self._expl) + len_size(vlen) + vlen
1844         return fulllen, state
1845
1846     def encode2nd(self, writer, state_iter):
1847         """Do the 2nd pass of 2-pass encoding
1848
1849         :param writer: must comply with ``io.RawIOBase.write`` behaviour
1850         :param state_iter: iterator over the 1st pass state (``iter(state)``)
1851         """
1852         if self._expl is None:
1853             self._encode2nd(writer, state_iter)
1854         else:
1855             write_full(writer, self._expl + len_encode(next(state_iter)))
1856             self._encode2nd(writer, state_iter)
1857
1858     def encode_cer(self, writer):
1859         """CER encode the structure to specified writer
1860
1861         :param writer: must comply with ``io.RawIOBase.write``
1862                        behaviour. It takes slice to be written and
1863                        returns number of bytes processed. If it returns
1864                        None, then exception will be raised
1865         """
1866         if self._expl is not None:
1867             write_full(writer, self._expl + LENINDEF)
1868         if getattr(self, "der_forced", False):
1869             write_full(writer, self._encode())
1870         else:
1871             self._encode_cer(writer)
1872         if self._expl is not None:
1873             write_full(writer, EOC)
1874
1875     def hexencode(self):
1876         """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1877         """
1878         return hexenc(self.encode())
1879
1880     def decode(
1881             self,
1882             data,
1883             offset=0,
1884             leavemm=False,
1885             decode_path=(),
1886             ctx=None,
1887             tag_only=False,
1888             _ctx_immutable=True,
1889     ):
1890         """Decode the data
1891
1892         :param data: either binary or memoryview
1893         :param int offset: initial data's offset
1894         :param bool leavemm: do we need to leave memoryview of remaining
1895                     data as is, or convert it to bytes otherwise
1896         :param decode_path: current decode path (tuples of strings,
1897                             possibly with DecodePathDefBy) with will be
1898                             the root for all underlying objects
1899         :param ctx: optional :ref:`context <ctx>` governing decoding process
1900         :param bool tag_only: decode only the tag, without length and
1901                               contents (used only in Choice and Set
1902                               structures, trying to determine if tag satisfies
1903                               the schema)
1904         :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1905                                     before using it?
1906         :returns: (Obj, remaining data)
1907
1908         .. seealso:: :ref:`decoding`
1909         """
1910         result = next(self.decode_evgen(
1911             data,
1912             offset,
1913             leavemm,
1914             decode_path,
1915             ctx,
1916             tag_only,
1917             _ctx_immutable,
1918             _evgen_mode=False,
1919         ))
1920         if result is None:
1921             return None
1922         _, obj, tail = result
1923         return obj, tail
1924
1925     def decode_evgen(
1926             self,
1927             data,
1928             offset=0,
1929             leavemm=False,
1930             decode_path=(),
1931             ctx=None,
1932             tag_only=False,
1933             _ctx_immutable=True,
1934             _evgen_mode=True,
1935     ):
1936         """Decode with evgen mode on
1937
1938         That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1939         it returns the generator producing ``(decode_path, obj, tail)``
1940         values.
1941         .. seealso:: :ref:`evgen mode <evgen_mode>`.
1942         """
1943         if ctx is None:
1944             ctx = {}
1945         elif _ctx_immutable:
1946             ctx = copy(ctx)
1947         tlv = memoryview(data)
1948         if (
1949                 _evgen_mode and
1950                 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1951         ):
1952             _evgen_mode = False
1953         if self._expl is None:
1954             for result in self._decode(
1955                     tlv,
1956                     offset=offset,
1957                     decode_path=decode_path,
1958                     ctx=ctx,
1959                     tag_only=tag_only,
1960                     evgen_mode=_evgen_mode,
1961             ):
1962                 if tag_only:
1963                     yield None
1964                     return
1965                 _decode_path, obj, tail = result
1966                 if not _decode_path is decode_path:
1967                     yield result
1968         else:
1969             try:
1970                 t, tlen, lv = tag_strip(tlv)
1971             except DecodeError as err:
1972                 raise err.__class__(
1973                     msg=err.msg,
1974                     klass=self.__class__,
1975                     decode_path=decode_path,
1976                     offset=offset,
1977                 )
1978             if t != self._expl:
1979                 raise TagMismatch(
1980                     klass=self.__class__,
1981                     decode_path=decode_path,
1982                     offset=offset,
1983                 )
1984             try:
1985                 l, llen, v = len_decode(lv)
1986             except LenIndefForm as err:
1987                 if not ctx.get("bered", False):
1988                     raise err.__class__(
1989                         msg=err.msg,
1990                         klass=self.__class__,
1991                         decode_path=decode_path,
1992                         offset=offset,
1993                     )
1994                 llen, v = 1, lv[1:]
1995                 offset += tlen + llen
1996                 for result in self._decode(
1997                         v,
1998                         offset=offset,
1999                         decode_path=decode_path,
2000                         ctx=ctx,
2001                         tag_only=tag_only,
2002                         evgen_mode=_evgen_mode,
2003                 ):
2004                     if tag_only:  # pragma: no cover
2005                         yield None
2006                         return
2007                     _decode_path, obj, tail = result
2008                     if not _decode_path is decode_path:
2009                         yield result
2010                 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2011                 if eoc_expected.tobytes() != EOC:
2012                     raise DecodeError(
2013                         "no EOC",
2014                         klass=self.__class__,
2015                         decode_path=decode_path,
2016                         offset=offset,
2017                     )
2018                 obj.vlen += EOC_LEN
2019                 obj.expl_lenindef = True
2020             except DecodeError as err:
2021                 raise err.__class__(
2022                     msg=err.msg,
2023                     klass=self.__class__,
2024                     decode_path=decode_path,
2025                     offset=offset,
2026                 )
2027             else:
2028                 if l > len(v):
2029                     raise NotEnoughData(
2030                         "encoded length is longer than data",
2031                         klass=self.__class__,
2032                         decode_path=decode_path,
2033                         offset=offset,
2034                     )
2035                 for result in self._decode(
2036                         v,
2037                         offset=offset + tlen + llen,
2038                         decode_path=decode_path,
2039                         ctx=ctx,
2040                         tag_only=tag_only,
2041                         evgen_mode=_evgen_mode,
2042                 ):
2043                     if tag_only:  # pragma: no cover
2044                         yield None
2045                         return
2046                     _decode_path, obj, tail = result
2047                     if not _decode_path is decode_path:
2048                         yield result
2049                 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2050                     raise DecodeError(
2051                         "explicit tag out-of-bound, longer than data",
2052                         klass=self.__class__,
2053                         decode_path=decode_path,
2054                         offset=offset,
2055                     )
2056         yield decode_path, obj, (tail if leavemm else tail.tobytes())
2057
2058     def decod(self, data, offset=0, decode_path=(), ctx=None):
2059         """Decode the data, check that tail is empty
2060
2061         :raises ExceedingData: if tail is not empty
2062
2063         This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2064         (decode without tail) that also checks that there is no
2065         trailing data left.
2066         """
2067         obj, tail = self.decode(
2068             data,
2069             offset=offset,
2070             decode_path=decode_path,
2071             ctx=ctx,
2072             leavemm=True,
2073         )
2074         if len(tail) > 0:
2075             raise ExceedingData(len(tail))
2076         return obj
2077
2078     def hexdecode(self, data, *args, **kwargs):
2079         """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2080         """
2081         return self.decode(hexdec(data), *args, **kwargs)
2082
2083     def hexdecod(self, data, *args, **kwargs):
2084         """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2085         """
2086         return self.decod(hexdec(data), *args, **kwargs)
2087
2088     @property
2089     def expled(self):
2090         """.. seealso:: :ref:`decoding`
2091         """
2092         return self._expl is not None
2093
2094     @property
2095     def expl_tag(self):
2096         """.. seealso:: :ref:`decoding`
2097         """
2098         return self._expl
2099
2100     @property
2101     def expl_tlen(self):
2102         """.. seealso:: :ref:`decoding`
2103         """
2104         return len(self._expl)
2105
2106     @property
2107     def expl_llen(self):
2108         """.. seealso:: :ref:`decoding`
2109         """
2110         if self.expl_lenindef:
2111             return 1
2112         return len(len_encode(self.tlvlen))
2113
2114     @property
2115     def expl_offset(self):
2116         """.. seealso:: :ref:`decoding`
2117         """
2118         return self.offset - self.expl_tlen - self.expl_llen
2119
2120     @property
2121     def expl_vlen(self):
2122         """.. seealso:: :ref:`decoding`
2123         """
2124         return self.tlvlen
2125
2126     @property
2127     def expl_tlvlen(self):
2128         """.. seealso:: :ref:`decoding`
2129         """
2130         return self.expl_tlen + self.expl_llen + self.expl_vlen
2131
2132     @property
2133     def fulloffset(self):
2134         """.. seealso:: :ref:`decoding`
2135         """
2136         return self.expl_offset if self.expled else self.offset
2137
2138     @property
2139     def fulllen(self):
2140         """.. seealso:: :ref:`decoding`
2141         """
2142         return self.expl_tlvlen if self.expled else self.tlvlen
2143
2144     def pps_lenindef(self, decode_path):
2145         if self.lenindef and not (
2146                 getattr(self, "defined", None) is not None and
2147                 self.defined[1].lenindef
2148         ):
2149             yield _pp(
2150                 asn1_type_name="EOC",
2151                 obj_name="",
2152                 decode_path=decode_path,
2153                 offset=(
2154                     self.offset + self.tlvlen -
2155                     (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2156                 ),
2157                 tlen=1,
2158                 llen=1,
2159                 vlen=0,
2160                 ber_encoded=True,
2161                 bered=True,
2162             )
2163         if self.expl_lenindef:
2164             yield _pp(
2165                 asn1_type_name="EOC",
2166                 obj_name="EXPLICIT",
2167                 decode_path=decode_path,
2168                 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2169                 tlen=1,
2170                 llen=1,
2171                 vlen=0,
2172                 ber_encoded=True,
2173                 bered=True,
2174             )
2175
2176
2177 def encode_cer(obj):
2178     """Encode to CER in memory buffer
2179
2180     :returns bytes: memory buffer contents
2181     """
2182     buf = BytesIO()
2183     obj.encode_cer(buf.write)
2184     return buf.getvalue()
2185
2186
2187 def encode2pass(obj):
2188     """Encode (2-pass mode) to DER in memory buffer
2189
2190     :returns bytes: memory buffer contents
2191     """
2192     buf = BytesIO()
2193     _, state = obj.encode1st()
2194     obj.encode2nd(buf.write, iter(state))
2195     return buf.getvalue()
2196
2197
2198 class DecodePathDefBy(object):
2199     """DEFINED BY representation inside decode path
2200     """
2201     __slots__ = ("defined_by",)
2202
2203     def __init__(self, defined_by):
2204         self.defined_by = defined_by
2205
2206     def __ne__(self, their):
2207         return not(self == their)
2208
2209     def __eq__(self, their):
2210         if not isinstance(their, self.__class__):
2211             return False
2212         return self.defined_by == their.defined_by
2213
2214     def __str__(self):
2215         return "DEFINED BY " + str(self.defined_by)
2216
2217     def __repr__(self):
2218         return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2219
2220
2221 ########################################################################
2222 # Pretty printing
2223 ########################################################################
2224
2225 PP = namedtuple("PP", (
2226     "obj",
2227     "asn1_type_name",
2228     "obj_name",
2229     "decode_path",
2230     "value",
2231     "blob",
2232     "optional",
2233     "default",
2234     "impl",
2235     "expl",
2236     "offset",
2237     "tlen",
2238     "llen",
2239     "vlen",
2240     "expl_offset",
2241     "expl_tlen",
2242     "expl_llen",
2243     "expl_vlen",
2244     "expl_lenindef",
2245     "lenindef",
2246     "ber_encoded",
2247     "bered",
2248 ), **NAMEDTUPLE_KWARGS)
2249
2250
2251 def _pp(
2252         obj=None,
2253         asn1_type_name="unknown",
2254         obj_name="unknown",
2255         decode_path=(),
2256         value=None,
2257         blob=None,
2258         optional=False,
2259         default=False,
2260         impl=None,
2261         expl=None,
2262         offset=0,
2263         tlen=0,
2264         llen=0,
2265         vlen=0,
2266         expl_offset=None,
2267         expl_tlen=None,
2268         expl_llen=None,
2269         expl_vlen=None,
2270         expl_lenindef=False,
2271         lenindef=False,
2272         ber_encoded=False,
2273         bered=False,
2274 ):
2275     return PP(
2276         obj,
2277         asn1_type_name,
2278         obj_name,
2279         decode_path,
2280         value,
2281         blob,
2282         optional,
2283         default,
2284         impl,
2285         expl,
2286         offset,
2287         tlen,
2288         llen,
2289         vlen,
2290         expl_offset,
2291         expl_tlen,
2292         expl_llen,
2293         expl_vlen,
2294         expl_lenindef,
2295         lenindef,
2296         ber_encoded,
2297         bered,
2298     )
2299
2300
2301 def _colourize(what, colour, with_colours, attrs=("bold",)):
2302     return colored(what, colour, attrs=attrs) if with_colours else what
2303
2304
2305 def colonize_hex(hexed):
2306     """Separate hexadecimal string with colons
2307     """
2308     return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2309
2310
2311 def find_oid_name(asn1_type_name, oid_maps, value):
2312     if len(oid_maps) > 0 and asn1_type_name == ObjectIdentifier.asn1_type_name:
2313         for oid_map in oid_maps:
2314             oid_name = oid_map.get(value)
2315             if oid_name is not None:
2316                 return oid_name
2317     return None
2318
2319
2320 def pp_console_row(
2321         pp,
2322         oid_maps=(),
2323         with_offsets=False,
2324         with_blob=True,
2325         with_colours=False,
2326         with_decode_path=False,
2327         decode_path_len_decrease=0,
2328 ):
2329     cols = []
2330     if with_offsets:
2331         col = "%5d%s%s" % (
2332             pp.offset,
2333             (
2334                 "  " if pp.expl_offset is None else
2335                 ("-%d" % (pp.offset - pp.expl_offset))
2336             ),
2337             LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2338         )
2339         col = _colourize(col, "red", with_colours, ())
2340         col += _colourize("B", "red", with_colours) if pp.bered else " "
2341         cols.append(col)
2342         col = "[%d,%d,%4d]%s" % (
2343             pp.tlen, pp.llen, pp.vlen,
2344             LENINDEF_PP_CHAR if pp.lenindef else " "
2345         )
2346         col = _colourize(col, "green", with_colours, ())
2347         cols.append(col)
2348     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2349     if decode_path_len > 0:
2350         cols.append(" ." * decode_path_len)
2351         ent = pp.decode_path[-1]
2352         if isinstance(ent, DecodePathDefBy):
2353             cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2354             value = str(ent.defined_by)
2355             oid_name = find_oid_name(ent.defined_by.asn1_type_name, oid_maps, value)
2356             if oid_name is None:
2357                 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2358             else:
2359                 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2360         else:
2361             cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2362     if pp.expl is not None:
2363         klass, _, num = pp.expl
2364         col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2365         cols.append(_colourize(col, "blue", with_colours))
2366     if pp.impl is not None:
2367         klass, _, num = pp.impl
2368         col = "[%s%d]" % (TagClassReprs[klass], num)
2369         cols.append(_colourize(col, "blue", with_colours))
2370     if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2371         cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2372     if pp.ber_encoded:
2373         cols.append(_colourize("BER", "red", with_colours))
2374     cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2375     if pp.value is not None:
2376         value = pp.value
2377         cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2378         oid_name = find_oid_name(pp.asn1_type_name, oid_maps, pp.value)
2379         if oid_name is not None:
2380             cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2381         if pp.asn1_type_name == Integer.asn1_type_name:
2382             cols.append(_colourize(
2383                 "(%s)" % colonize_hex(pp.obj.tohex()), "green", with_colours,
2384             ))
2385     if with_blob:
2386         if pp.blob.__class__ == binary_type:
2387             cols.append(hexenc(pp.blob))
2388         elif pp.blob.__class__ == tuple:
2389             cols.append(", ".join(pp.blob))
2390     if pp.optional:
2391         cols.append(_colourize("OPTIONAL", "red", with_colours))
2392     if pp.default:
2393         cols.append(_colourize("DEFAULT", "red", with_colours))
2394     if with_decode_path:
2395         cols.append(_colourize(
2396             "[%s]" % ":".join(str(p) for p in pp.decode_path),
2397             "grey",
2398             with_colours,
2399         ))
2400     return " ".join(cols)
2401
2402
2403 def pp_console_blob(pp, decode_path_len_decrease=0):
2404     cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2405     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2406     if decode_path_len > 0:
2407         cols.append(" ." * (decode_path_len + 1))
2408     if pp.blob.__class__ == binary_type:
2409         blob = hexenc(pp.blob).upper()
2410         for i in six_xrange(0, len(blob), 32):
2411             chunk = blob[i:i + 32]
2412             yield " ".join(cols + [colonize_hex(chunk)])
2413     elif pp.blob.__class__ == tuple:
2414         yield " ".join(cols + [", ".join(pp.blob)])
2415
2416
2417 def pprint(
2418         obj,
2419         oid_maps=(),
2420         big_blobs=False,
2421         with_colours=False,
2422         with_decode_path=False,
2423         decode_path_only=(),
2424         decode_path=(),
2425 ):
2426     """Pretty print object
2427
2428     :param Obj obj: object you want to pretty print
2429     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
2430                      Its human readable form is printed when OID is met
2431     :param big_blobs: if large binary objects are met (like OctetString
2432                       values), do we need to print them too, on separate
2433                       lines
2434     :param with_colours: colourize output, if ``termcolor`` library
2435                          is available
2436     :param with_decode_path: print decode path
2437     :param decode_path_only: print only that specified decode path
2438     """
2439     def _pprint_pps(pps):
2440         for pp in pps:
2441             if hasattr(pp, "_fields"):
2442                 if (
2443                         decode_path_only != () and
2444                         tuple(
2445                             str(p) for p in pp.decode_path[:len(decode_path_only)]
2446                         ) != decode_path_only
2447                 ):
2448                     continue
2449                 if big_blobs:
2450                     yield pp_console_row(
2451                         pp,
2452                         oid_maps=oid_maps,
2453                         with_offsets=True,
2454                         with_blob=False,
2455                         with_colours=with_colours,
2456                         with_decode_path=with_decode_path,
2457                         decode_path_len_decrease=len(decode_path_only),
2458                     )
2459                     for row in pp_console_blob(
2460                             pp,
2461                             decode_path_len_decrease=len(decode_path_only),
2462                     ):
2463                         yield row
2464                 else:
2465                     yield pp_console_row(
2466                         pp,
2467                         oid_maps=oid_maps,
2468                         with_offsets=True,
2469                         with_blob=True,
2470                         with_colours=with_colours,
2471                         with_decode_path=with_decode_path,
2472                         decode_path_len_decrease=len(decode_path_only),
2473                     )
2474             else:
2475                 for row in _pprint_pps(pp):
2476                     yield row
2477     return "\n".join(_pprint_pps(obj.pps(decode_path)))
2478
2479
2480 ########################################################################
2481 # ASN.1 primitive types
2482 ########################################################################
2483
2484 BooleanState = namedtuple(
2485     "BooleanState",
2486     BasicState._fields + ("value",),
2487     **NAMEDTUPLE_KWARGS
2488 )
2489
2490
2491 class Boolean(Obj):
2492     """``BOOLEAN`` boolean type
2493
2494     >>> b = Boolean(True)
2495     BOOLEAN True
2496     >>> b == Boolean(True)
2497     True
2498     >>> bool(b)
2499     True
2500     """
2501     __slots__ = ()
2502     tag_default = tag_encode(1)
2503     asn1_type_name = "BOOLEAN"
2504
2505     def __init__(
2506             self,
2507             value=None,
2508             impl=None,
2509             expl=None,
2510             default=None,
2511             optional=False,
2512             _decoded=(0, 0, 0),
2513     ):
2514         """
2515         :param value: set the value. Either boolean type, or
2516                       :py:class:`pyderasn.Boolean` object
2517         :param bytes impl: override default tag with ``IMPLICIT`` one
2518         :param bytes expl: override default tag with ``EXPLICIT`` one
2519         :param default: set default value. Type same as in ``value``
2520         :param bool optional: is object ``OPTIONAL`` in sequence
2521         """
2522         super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2523         self._value = None if value is None else self._value_sanitize(value)
2524         if default is not None:
2525             default = self._value_sanitize(default)
2526             self.default = self.__class__(
2527                 value=default,
2528                 impl=self.tag,
2529                 expl=self._expl,
2530             )
2531             if value is None:
2532                 self._value = default
2533
2534     def _value_sanitize(self, value):
2535         if value.__class__ == bool:
2536             return value
2537         if issubclass(value.__class__, Boolean):
2538             return value._value
2539         raise InvalidValueType((self.__class__, bool))
2540
2541     @property
2542     def ready(self):
2543         return self._value is not None
2544
2545     def __getstate__(self):
2546         return BooleanState(
2547             __version__,
2548             self.tag,
2549             self._tag_order,
2550             self._expl,
2551             self.default,
2552             self.optional,
2553             self.offset,
2554             self.llen,
2555             self.vlen,
2556             self.expl_lenindef,
2557             self.lenindef,
2558             self.ber_encoded,
2559             self._value,
2560         )
2561
2562     def __setstate__(self, state):
2563         super(Boolean, self).__setstate__(state)
2564         self._value = state.value
2565
2566     def __nonzero__(self):
2567         self._assert_ready()
2568         return self._value
2569
2570     def __bool__(self):
2571         self._assert_ready()
2572         return self._value
2573
2574     def __eq__(self, their):
2575         if their.__class__ == bool:
2576             return self._value == their
2577         if not issubclass(their.__class__, Boolean):
2578             return False
2579         return (
2580             self._value == their._value and
2581             self.tag == their.tag and
2582             self._expl == their._expl
2583         )
2584
2585     def __call__(
2586             self,
2587             value=None,
2588             impl=None,
2589             expl=None,
2590             default=None,
2591             optional=None,
2592     ):
2593         return self.__class__(
2594             value=value,
2595             impl=self.tag if impl is None else impl,
2596             expl=self._expl if expl is None else expl,
2597             default=self.default if default is None else default,
2598             optional=self.optional if optional is None else optional,
2599         )
2600
2601     def _encode(self):
2602         self._assert_ready()
2603         return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2604
2605     def _encode1st(self, state):
2606         return len(self.tag) + 2, state
2607
2608     def _encode2nd(self, writer, state_iter):
2609         self._assert_ready()
2610         write_full(writer, self._encode())
2611
2612     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2613         try:
2614             t, _, lv = tag_strip(tlv)
2615         except DecodeError as err:
2616             raise err.__class__(
2617                 msg=err.msg,
2618                 klass=self.__class__,
2619                 decode_path=decode_path,
2620                 offset=offset,
2621             )
2622         if t != self.tag:
2623             raise TagMismatch(
2624                 klass=self.__class__,
2625                 decode_path=decode_path,
2626                 offset=offset,
2627             )
2628         if tag_only:
2629             yield None
2630             return
2631         try:
2632             l, _, v = len_decode(lv)
2633         except DecodeError as err:
2634             raise err.__class__(
2635                 msg=err.msg,
2636                 klass=self.__class__,
2637                 decode_path=decode_path,
2638                 offset=offset,
2639             )
2640         if l != 1:
2641             raise InvalidLength(
2642                 "Boolean's length must be equal to 1",
2643                 klass=self.__class__,
2644                 decode_path=decode_path,
2645                 offset=offset,
2646             )
2647         if l > len(v):
2648             raise NotEnoughData(
2649                 "encoded length is longer than data",
2650                 klass=self.__class__,
2651                 decode_path=decode_path,
2652                 offset=offset,
2653             )
2654         first_octet = byte2int(v)
2655         ber_encoded = False
2656         if first_octet == 0:
2657             value = False
2658         elif first_octet == 0xFF:
2659             value = True
2660         elif ctx.get("bered", False):
2661             value = True
2662             ber_encoded = True
2663         else:
2664             raise DecodeError(
2665                 "unacceptable Boolean value",
2666                 klass=self.__class__,
2667                 decode_path=decode_path,
2668                 offset=offset,
2669             )
2670         obj = self.__class__(
2671             value=value,
2672             impl=self.tag,
2673             expl=self._expl,
2674             default=self.default,
2675             optional=self.optional,
2676             _decoded=(offset, 1, 1),
2677         )
2678         obj.ber_encoded = ber_encoded
2679         yield decode_path, obj, v[1:]
2680
2681     def __repr__(self):
2682         return pp_console_row(next(self.pps()))
2683
2684     def pps(self, decode_path=()):
2685         yield _pp(
2686             obj=self,
2687             asn1_type_name=self.asn1_type_name,
2688             obj_name=self.__class__.__name__,
2689             decode_path=decode_path,
2690             value=str(self._value) if self.ready else None,
2691             optional=self.optional,
2692             default=self == self.default,
2693             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2694             expl=None if self._expl is None else tag_decode(self._expl),
2695             offset=self.offset,
2696             tlen=self.tlen,
2697             llen=self.llen,
2698             vlen=self.vlen,
2699             expl_offset=self.expl_offset if self.expled else None,
2700             expl_tlen=self.expl_tlen if self.expled else None,
2701             expl_llen=self.expl_llen if self.expled else None,
2702             expl_vlen=self.expl_vlen if self.expled else None,
2703             expl_lenindef=self.expl_lenindef,
2704             ber_encoded=self.ber_encoded,
2705             bered=self.bered,
2706         )
2707         for pp in self.pps_lenindef(decode_path):
2708             yield pp
2709
2710
2711 IntegerState = namedtuple(
2712     "IntegerState",
2713     BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2714     **NAMEDTUPLE_KWARGS
2715 )
2716
2717
2718 class Integer(Obj):
2719     """``INTEGER`` integer type
2720
2721     >>> b = Integer(-123)
2722     INTEGER -123
2723     >>> b == Integer(-123)
2724     True
2725     >>> int(b)
2726     -123
2727
2728     >>> Integer(2, bounds=(1, 3))
2729     INTEGER 2
2730     >>> Integer(5, bounds=(1, 3))
2731     Traceback (most recent call last):
2732     pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2733
2734     ::
2735
2736         class Version(Integer):
2737             schema = (
2738                 ("v1", 0),
2739                 ("v2", 1),
2740                 ("v3", 2),
2741             )
2742
2743     >>> v = Version("v1")
2744     Version INTEGER v1
2745     >>> int(v)
2746     0
2747     >>> v.named
2748     'v1'
2749     >>> v.specs
2750     {'v3': 2, 'v1': 0, 'v2': 1}
2751     """
2752     __slots__ = ("specs", "_bound_min", "_bound_max")
2753     tag_default = tag_encode(2)
2754     asn1_type_name = "INTEGER"
2755
2756     def __init__(
2757             self,
2758             value=None,
2759             bounds=None,
2760             impl=None,
2761             expl=None,
2762             default=None,
2763             optional=False,
2764             _specs=None,
2765             _decoded=(0, 0, 0),
2766     ):
2767         """
2768         :param value: set the value. Either integer type, named value
2769                       (if ``schema`` is specified in the class), or
2770                       :py:class:`pyderasn.Integer` object
2771         :param bounds: set ``(MIN, MAX)`` value constraint.
2772                        (-inf, +inf) by default
2773         :param bytes impl: override default tag with ``IMPLICIT`` one
2774         :param bytes expl: override default tag with ``EXPLICIT`` one
2775         :param default: set default value. Type same as in ``value``
2776         :param bool optional: is object ``OPTIONAL`` in sequence
2777         """
2778         super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2779         self._value = value
2780         specs = getattr(self, "schema", {}) if _specs is None else _specs
2781         self.specs = specs if specs.__class__ == dict else dict(specs)
2782         self._bound_min, self._bound_max = getattr(
2783             self,
2784             "bounds",
2785             (float("-inf"), float("+inf")),
2786         ) if bounds is None else bounds
2787         if value is not None:
2788             self._value = self._value_sanitize(value)
2789         if default is not None:
2790             default = self._value_sanitize(default)
2791             self.default = self.__class__(
2792                 value=default,
2793                 impl=self.tag,
2794                 expl=self._expl,
2795                 _specs=self.specs,
2796             )
2797             if self._value is None:
2798                 self._value = default
2799
2800     def _value_sanitize(self, value):
2801         if isinstance(value, integer_types):
2802             pass
2803         elif issubclass(value.__class__, Integer):
2804             value = value._value
2805         elif value.__class__ == str:
2806             value = self.specs.get(value)
2807             if value is None:
2808                 raise ObjUnknown("integer value: %s" % value)
2809         else:
2810             raise InvalidValueType((self.__class__, int, str))
2811         if not self._bound_min <= value <= self._bound_max:
2812             raise BoundsError(self._bound_min, value, self._bound_max)
2813         return value
2814
2815     @property
2816     def ready(self):
2817         return self._value is not None
2818
2819     def __getstate__(self):
2820         return IntegerState(
2821             __version__,
2822             self.tag,
2823             self._tag_order,
2824             self._expl,
2825             self.default,
2826             self.optional,
2827             self.offset,
2828             self.llen,
2829             self.vlen,
2830             self.expl_lenindef,
2831             self.lenindef,
2832             self.ber_encoded,
2833             self.specs,
2834             self._value,
2835             self._bound_min,
2836             self._bound_max,
2837         )
2838
2839     def __setstate__(self, state):
2840         super(Integer, self).__setstate__(state)
2841         self.specs = state.specs
2842         self._value = state.value
2843         self._bound_min = state.bound_min
2844         self._bound_max = state.bound_max
2845
2846     def __int__(self):
2847         self._assert_ready()
2848         return int(self._value)
2849
2850     def tohex(self):
2851         """Hexadecimal representation
2852
2853         Use :py:func:`pyderasn.colonize_hex` for colonizing it.
2854         """
2855         hex_repr = hex(int(self))[2:].upper()
2856         if len(hex_repr) % 2 != 0:
2857             hex_repr = "0" + hex_repr
2858         return hex_repr
2859
2860     def __hash__(self):
2861         self._assert_ready()
2862         return hash(b"".join((
2863             self.tag,
2864             bytes(self._expl or b""),
2865             str(self._value).encode("ascii"),
2866         )))
2867
2868     def __eq__(self, their):
2869         if isinstance(their, integer_types):
2870             return self._value == their
2871         if not issubclass(their.__class__, Integer):
2872             return False
2873         return (
2874             self._value == their._value and
2875             self.tag == their.tag and
2876             self._expl == their._expl
2877         )
2878
2879     def __lt__(self, their):
2880         return self._value < their._value
2881
2882     @property
2883     def named(self):
2884         """Return named representation (if exists) of the value
2885         """
2886         for name, value in iteritems(self.specs):
2887             if value == self._value:
2888                 return name
2889         return None
2890
2891     def __call__(
2892             self,
2893             value=None,
2894             bounds=None,
2895             impl=None,
2896             expl=None,
2897             default=None,
2898             optional=None,
2899     ):
2900         return self.__class__(
2901             value=value,
2902             bounds=(
2903                 (self._bound_min, self._bound_max)
2904                 if bounds is None else bounds
2905             ),
2906             impl=self.tag if impl is None else impl,
2907             expl=self._expl if expl is None else expl,
2908             default=self.default if default is None else default,
2909             optional=self.optional if optional is None else optional,
2910             _specs=self.specs,
2911         )
2912
2913     def _encode_payload(self):
2914         self._assert_ready()
2915         value = self._value
2916         if PY2:
2917             if value == 0:
2918                 octets = bytearray([0])
2919             elif value < 0:
2920                 value = -value
2921                 value -= 1
2922                 octets = bytearray()
2923                 while value > 0:
2924                     octets.append((value & 0xFF) ^ 0xFF)
2925                     value >>= 8
2926                 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2927                     octets.append(0xFF)
2928             else:
2929                 octets = bytearray()
2930                 while value > 0:
2931                     octets.append(value & 0xFF)
2932                     value >>= 8
2933                 if octets[-1] & 0x80 > 0:
2934                     octets.append(0x00)
2935             octets.reverse()
2936             octets = bytes(octets)
2937         else:
2938             bytes_len = ceil(value.bit_length() / 8) or 1
2939             while True:
2940                 try:
2941                     octets = value.to_bytes(
2942                         bytes_len,
2943                         byteorder="big",
2944                         signed=True,
2945                     )
2946                 except OverflowError:
2947                     bytes_len += 1
2948                 else:
2949                     break
2950         return octets
2951
2952     def _encode(self):
2953         octets = self._encode_payload()
2954         return b"".join((self.tag, len_encode(len(octets)), octets))
2955
2956     def _encode1st(self, state):
2957         l = len(self._encode_payload())
2958         return len(self.tag) + len_size(l) + l, state
2959
2960     def _encode2nd(self, writer, state_iter):
2961         write_full(writer, self._encode())
2962
2963     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2964         try:
2965             t, _, lv = tag_strip(tlv)
2966         except DecodeError as err:
2967             raise err.__class__(
2968                 msg=err.msg,
2969                 klass=self.__class__,
2970                 decode_path=decode_path,
2971                 offset=offset,
2972             )
2973         if t != self.tag:
2974             raise TagMismatch(
2975                 klass=self.__class__,
2976                 decode_path=decode_path,
2977                 offset=offset,
2978             )
2979         if tag_only:
2980             yield None
2981             return
2982         try:
2983             l, llen, v = len_decode(lv)
2984         except DecodeError as err:
2985             raise err.__class__(
2986                 msg=err.msg,
2987                 klass=self.__class__,
2988                 decode_path=decode_path,
2989                 offset=offset,
2990             )
2991         if l > len(v):
2992             raise NotEnoughData(
2993                 "encoded length is longer than data",
2994                 klass=self.__class__,
2995                 decode_path=decode_path,
2996                 offset=offset,
2997             )
2998         if l == 0:
2999             raise NotEnoughData(
3000                 "zero length",
3001                 klass=self.__class__,
3002                 decode_path=decode_path,
3003                 offset=offset,
3004             )
3005         v, tail = v[:l], v[l:]
3006         first_octet = byte2int(v)
3007         if l > 1:
3008             second_octet = byte2int(v[1:])
3009             if (
3010                     ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3011                     ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3012             ):
3013                 raise DecodeError(
3014                     "non normalized integer",
3015                     klass=self.__class__,
3016                     decode_path=decode_path,
3017                     offset=offset,
3018                 )
3019         if PY2:
3020             value = 0
3021             if first_octet & 0x80 > 0:
3022                 octets = bytearray()
3023                 for octet in bytearray(v):
3024                     octets.append(octet ^ 0xFF)
3025                 for octet in octets:
3026                     value = (value << 8) | octet
3027                 value += 1
3028                 value = -value
3029             else:
3030                 for octet in bytearray(v):
3031                     value = (value << 8) | octet
3032         else:
3033             value = int.from_bytes(v, byteorder="big", signed=True)
3034         try:
3035             obj = self.__class__(
3036                 value=value,
3037                 bounds=(self._bound_min, self._bound_max),
3038                 impl=self.tag,
3039                 expl=self._expl,
3040                 default=self.default,
3041                 optional=self.optional,
3042                 _specs=self.specs,
3043                 _decoded=(offset, llen, l),
3044             )
3045         except BoundsError as err:
3046             raise DecodeError(
3047                 msg=str(err),
3048                 klass=self.__class__,
3049                 decode_path=decode_path,
3050                 offset=offset,
3051             )
3052         yield decode_path, obj, tail
3053
3054     def __repr__(self):
3055         return pp_console_row(next(self.pps()))
3056
3057     def pps(self, decode_path=()):
3058         yield _pp(
3059             obj=self,
3060             asn1_type_name=self.asn1_type_name,
3061             obj_name=self.__class__.__name__,
3062             decode_path=decode_path,
3063             value=(self.named or str(self._value)) if self.ready else None,
3064             optional=self.optional,
3065             default=self == self.default,
3066             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3067             expl=None if self._expl is None else tag_decode(self._expl),
3068             offset=self.offset,
3069             tlen=self.tlen,
3070             llen=self.llen,
3071             vlen=self.vlen,
3072             expl_offset=self.expl_offset if self.expled else None,
3073             expl_tlen=self.expl_tlen if self.expled else None,
3074             expl_llen=self.expl_llen if self.expled else None,
3075             expl_vlen=self.expl_vlen if self.expled else None,
3076             expl_lenindef=self.expl_lenindef,
3077             bered=self.bered,
3078         )
3079         for pp in self.pps_lenindef(decode_path):
3080             yield pp
3081
3082
3083 BitStringState = namedtuple(
3084     "BitStringState",
3085     BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3086     **NAMEDTUPLE_KWARGS
3087 )
3088
3089
3090 class BitString(Obj):
3091     """``BIT STRING`` bit string type
3092
3093     >>> BitString(b"hello world")
3094     BIT STRING 88 bits 68656c6c6f20776f726c64
3095     >>> bytes(b)
3096     b'hello world'
3097     >>> b == b"hello world"
3098     True
3099     >>> b.bit_len
3100     88
3101
3102     >>> BitString("'0A3B5F291CD'H")
3103     BIT STRING 44 bits 0a3b5f291cd0
3104     >>> b = BitString("'010110000000'B")
3105     BIT STRING 12 bits 5800
3106     >>> b.bit_len
3107     12
3108     >>> b[0], b[1], b[2], b[3]
3109     (False, True, False, True)
3110     >>> b[1000]
3111     False
3112     >>> [v for v in b]
3113     [False, True, False, True, True, False, False, False, False, False, False, False]
3114
3115     ::
3116
3117         class KeyUsage(BitString):
3118             schema = (
3119                 ("digitalSignature", 0),
3120                 ("nonRepudiation", 1),
3121                 ("keyEncipherment", 2),
3122             )
3123
3124     >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3125     KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3126     >>> b.named
3127     ['nonRepudiation', 'keyEncipherment']
3128     >>> b.specs
3129     {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3130
3131     .. note::
3132
3133        Pay attention that BIT STRING can be encoded both in primitive
3134        and constructed forms. Decoder always checks constructed form tag
3135        additionally to specified primitive one. If BER decoding is
3136        :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3137        of DER restrictions.
3138     """
3139     __slots__ = ("tag_constructed", "specs", "defined")
3140     tag_default = tag_encode(3)
3141     asn1_type_name = "BIT STRING"
3142
3143     def __init__(
3144             self,
3145             value=None,
3146             impl=None,
3147             expl=None,
3148             default=None,
3149             optional=False,
3150             _specs=None,
3151             _decoded=(0, 0, 0),
3152     ):
3153         """
3154         :param value: set the value. Either binary type, tuple of named
3155                       values (if ``schema`` is specified in the class),
3156                       string in ``'XXX...'B`` form, or
3157                       :py:class:`pyderasn.BitString` object
3158         :param bytes impl: override default tag with ``IMPLICIT`` one
3159         :param bytes expl: override default tag with ``EXPLICIT`` one
3160         :param default: set default value. Type same as in ``value``
3161         :param bool optional: is object ``OPTIONAL`` in sequence
3162         """
3163         super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3164         specs = getattr(self, "schema", {}) if _specs is None else _specs
3165         self.specs = specs if specs.__class__ == dict else dict(specs)
3166         self._value = None if value is None else self._value_sanitize(value)
3167         if default is not None:
3168             default = self._value_sanitize(default)
3169             self.default = self.__class__(
3170                 value=default,
3171                 impl=self.tag,
3172                 expl=self._expl,
3173             )
3174             if value is None:
3175                 self._value = default
3176         self.defined = None
3177         tag_klass, _, tag_num = tag_decode(self.tag)
3178         self.tag_constructed = tag_encode(
3179             klass=tag_klass,
3180             form=TagFormConstructed,
3181             num=tag_num,
3182         )
3183
3184     def _bits2octets(self, bits):
3185         if len(self.specs) > 0:
3186             bits = bits.rstrip("0")
3187         bit_len = len(bits)
3188         bits += "0" * ((8 - (bit_len % 8)) % 8)
3189         octets = bytearray(len(bits) // 8)
3190         for i in six_xrange(len(octets)):
3191             octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3192         return bit_len, bytes(octets)
3193
3194     def _value_sanitize(self, value):
3195         if isinstance(value, (string_types, binary_type)):
3196             if (
3197                     isinstance(value, string_types) and
3198                     value.startswith("'")
3199             ):
3200                 if value.endswith("'B"):
3201                     value = value[1:-2]
3202                     if not frozenset(value) <= SET01:
3203                         raise ValueError("B's coding contains unacceptable chars")
3204                     return self._bits2octets(value)
3205                 if value.endswith("'H"):
3206                     value = value[1:-2]
3207                     return (
3208                         len(value) * 4,
3209                         hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3210                     )
3211             if value.__class__ == binary_type:
3212                 return (len(value) * 8, value)
3213             raise InvalidValueType((self.__class__, string_types, binary_type))
3214         if value.__class__ == tuple:
3215             if (
3216                     len(value) == 2 and
3217                     isinstance(value[0], integer_types) and
3218                     value[1].__class__ == binary_type
3219             ):
3220                 return value
3221             bits = []
3222             for name in value:
3223                 bit = self.specs.get(name)
3224                 if bit is None:
3225                     raise ObjUnknown("BitString value: %s" % name)
3226                 bits.append(bit)
3227             if len(bits) == 0:
3228                 return self._bits2octets("")
3229             bits = frozenset(bits)
3230             return self._bits2octets("".join(
3231                 ("1" if bit in bits else "0")
3232                 for bit in six_xrange(max(bits) + 1)
3233             ))
3234         if issubclass(value.__class__, BitString):
3235             return value._value
3236         raise InvalidValueType((self.__class__, binary_type, string_types))
3237
3238     @property
3239     def ready(self):
3240         return self._value is not None
3241
3242     def __getstate__(self):
3243         return BitStringState(
3244             __version__,
3245             self.tag,
3246             self._tag_order,
3247             self._expl,
3248             self.default,
3249             self.optional,
3250             self.offset,
3251             self.llen,
3252             self.vlen,
3253             self.expl_lenindef,
3254             self.lenindef,
3255             self.ber_encoded,
3256             self.specs,
3257             self._value,
3258             self.tag_constructed,
3259             self.defined,
3260         )
3261
3262     def __setstate__(self, state):
3263         super(BitString, self).__setstate__(state)
3264         self.specs = state.specs
3265         self._value = state.value
3266         self.tag_constructed = state.tag_constructed
3267         self.defined = state.defined
3268
3269     def __iter__(self):
3270         self._assert_ready()
3271         for i in six_xrange(self._value[0]):
3272             yield self[i]
3273
3274     @property
3275     def bit_len(self):
3276         """Returns number of bits in the string
3277         """
3278         self._assert_ready()
3279         return self._value[0]
3280
3281     def __bytes__(self):
3282         self._assert_ready()
3283         return self._value[1]
3284
3285     def __eq__(self, their):
3286         if their.__class__ == bytes:
3287             return self._value[1] == their
3288         if not issubclass(their.__class__, BitString):
3289             return False
3290         return (
3291             self._value == their._value and
3292             self.tag == their.tag and
3293             self._expl == their._expl
3294         )
3295
3296     @property
3297     def named(self):
3298         """Named representation (if exists) of the bits
3299
3300         :returns: [str(name), ...]
3301         """
3302         return [name for name, bit in iteritems(self.specs) if self[bit]]
3303
3304     def __call__(
3305             self,
3306             value=None,
3307             impl=None,
3308             expl=None,
3309             default=None,
3310             optional=None,
3311     ):
3312         return self.__class__(
3313             value=value,
3314             impl=self.tag if impl is None else impl,
3315             expl=self._expl if expl is None else expl,
3316             default=self.default if default is None else default,
3317             optional=self.optional if optional is None else optional,
3318             _specs=self.specs,
3319         )
3320
3321     def __getitem__(self, key):
3322         if key.__class__ == int:
3323             bit_len, octets = self._value
3324             if key >= bit_len:
3325                 return False
3326             return (
3327                 byte2int(memoryview(octets)[key // 8:]) >>
3328                 (7 - (key % 8))
3329             ) & 1 == 1
3330         if isinstance(key, string_types):
3331             value = self.specs.get(key)
3332             if value is None:
3333                 raise ObjUnknown("BitString value: %s" % key)
3334             return self[value]
3335         raise InvalidValueType((int, str))
3336
3337     def _encode(self):
3338         self._assert_ready()
3339         bit_len, octets = self._value
3340         return b"".join((
3341             self.tag,
3342             len_encode(len(octets) + 1),
3343             int2byte((8 - bit_len % 8) % 8),
3344             octets,
3345         ))
3346
3347     def _encode1st(self, state):
3348         self._assert_ready()
3349         _, octets = self._value
3350         l = len(octets) + 1
3351         return len(self.tag) + len_size(l) + l, state
3352
3353     def _encode2nd(self, writer, state_iter):
3354         bit_len, octets = self._value
3355         write_full(writer, b"".join((
3356             self.tag,
3357             len_encode(len(octets) + 1),
3358             int2byte((8 - bit_len % 8) % 8),
3359         )))
3360         write_full(writer, octets)
3361
3362     def _encode_cer(self, writer):
3363         bit_len, octets = self._value
3364         if len(octets) + 1 <= 1000:
3365             write_full(writer, self._encode())
3366             return
3367         write_full(writer, self.tag_constructed)
3368         write_full(writer, LENINDEF)
3369         for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3370             write_full(writer, b"".join((
3371                 BitString.tag_default,
3372                 LEN1K,
3373                 int2byte(0),
3374                 octets[offset:offset + 999],
3375             )))
3376         tail = octets[offset+999:]
3377         if len(tail) > 0:
3378             tail = int2byte((8 - bit_len % 8) % 8) + tail
3379             write_full(writer, b"".join((
3380                 BitString.tag_default,
3381                 len_encode(len(tail)),
3382                 tail,
3383             )))
3384         write_full(writer, EOC)
3385
3386     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3387         try:
3388             t, tlen, lv = tag_strip(tlv)
3389         except DecodeError as err:
3390             raise err.__class__(
3391                 msg=err.msg,
3392                 klass=self.__class__,
3393                 decode_path=decode_path,
3394                 offset=offset,
3395             )
3396         if t == self.tag:
3397             if tag_only:  # pragma: no cover
3398                 yield None
3399                 return
3400             try:
3401                 l, llen, v = len_decode(lv)
3402             except DecodeError as err:
3403                 raise err.__class__(
3404                     msg=err.msg,
3405                     klass=self.__class__,
3406                     decode_path=decode_path,
3407                     offset=offset,
3408                 )
3409             if l > len(v):
3410                 raise NotEnoughData(
3411                     "encoded length is longer than data",
3412                     klass=self.__class__,
3413                     decode_path=decode_path,
3414                     offset=offset,
3415                 )
3416             if l == 0:
3417                 raise NotEnoughData(
3418                     "zero length",
3419                     klass=self.__class__,
3420                     decode_path=decode_path,
3421                     offset=offset,
3422                 )
3423             pad_size = byte2int(v)
3424             if l == 1 and pad_size != 0:
3425                 raise DecodeError(
3426                     "invalid empty value",
3427                     klass=self.__class__,
3428                     decode_path=decode_path,
3429                     offset=offset,
3430                 )
3431             if pad_size > 7:
3432                 raise DecodeError(
3433                     "too big pad",
3434                     klass=self.__class__,
3435                     decode_path=decode_path,
3436                     offset=offset,
3437                 )
3438             if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3439                 raise DecodeError(
3440                     "invalid pad",
3441                     klass=self.__class__,
3442                     decode_path=decode_path,
3443                     offset=offset,
3444                 )
3445             v, tail = v[:l], v[l:]
3446             bit_len = (len(v) - 1) * 8 - pad_size
3447             obj = self.__class__(
3448                 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3449                 impl=self.tag,
3450                 expl=self._expl,
3451                 default=self.default,
3452                 optional=self.optional,
3453                 _specs=self.specs,
3454                 _decoded=(offset, llen, l),
3455             )
3456             if evgen_mode:
3457                 obj._value = (bit_len, None)
3458             yield decode_path, obj, tail
3459             return
3460         if t != self.tag_constructed:
3461             raise TagMismatch(
3462                 klass=self.__class__,
3463                 decode_path=decode_path,
3464                 offset=offset,
3465             )
3466         if not ctx.get("bered", False):
3467             raise DecodeError(
3468                 "unallowed BER constructed encoding",
3469                 klass=self.__class__,
3470                 decode_path=decode_path,
3471                 offset=offset,
3472             )
3473         if tag_only:  # pragma: no cover
3474             yield None
3475             return
3476         lenindef = False
3477         try:
3478             l, llen, v = len_decode(lv)
3479         except LenIndefForm:
3480             llen, l, v = 1, 0, lv[1:]
3481             lenindef = True
3482         except DecodeError as err:
3483             raise err.__class__(
3484                 msg=err.msg,
3485                 klass=self.__class__,
3486                 decode_path=decode_path,
3487                 offset=offset,
3488             )
3489         if l > len(v):
3490             raise NotEnoughData(
3491                 "encoded length is longer than data",
3492                 klass=self.__class__,
3493                 decode_path=decode_path,
3494                 offset=offset,
3495             )
3496         if not lenindef and l == 0:
3497             raise NotEnoughData(
3498                 "zero length",
3499                 klass=self.__class__,
3500                 decode_path=decode_path,
3501                 offset=offset,
3502             )
3503         chunks = []
3504         sub_offset = offset + tlen + llen
3505         vlen = 0
3506         while True:
3507             if lenindef:
3508                 if v[:EOC_LEN].tobytes() == EOC:
3509                     break
3510             else:
3511                 if vlen == l:
3512                     break
3513                 if vlen > l:
3514                     raise DecodeError(
3515                         "chunk out of bounds",
3516                         klass=self.__class__,
3517                         decode_path=decode_path + (str(len(chunks) - 1),),
3518                         offset=chunks[-1].offset,
3519                     )
3520             sub_decode_path = decode_path + (str(len(chunks)),)
3521             try:
3522                 if evgen_mode:
3523                     for _decode_path, chunk, v_tail in BitString().decode_evgen(
3524                             v,
3525                             offset=sub_offset,
3526                             decode_path=sub_decode_path,
3527                             leavemm=True,
3528                             ctx=ctx,
3529                             _ctx_immutable=False,
3530                     ):
3531                         yield _decode_path, chunk, v_tail
3532                 else:
3533                     _, chunk, v_tail = next(BitString().decode_evgen(
3534                         v,
3535                         offset=sub_offset,
3536                         decode_path=sub_decode_path,
3537                         leavemm=True,
3538                         ctx=ctx,
3539                         _ctx_immutable=False,
3540                         _evgen_mode=False,
3541                     ))
3542             except TagMismatch:
3543                 raise DecodeError(
3544                     "expected BitString encoded chunk",
3545                     klass=self.__class__,
3546                     decode_path=sub_decode_path,
3547                     offset=sub_offset,
3548                 )
3549             chunks.append(chunk)
3550             sub_offset += chunk.tlvlen
3551             vlen += chunk.tlvlen
3552             v = v_tail
3553         if len(chunks) == 0:
3554             raise DecodeError(
3555                 "no chunks",
3556                 klass=self.__class__,
3557                 decode_path=decode_path,
3558                 offset=offset,
3559             )
3560         values = []
3561         bit_len = 0
3562         for chunk_i, chunk in enumerate(chunks[:-1]):
3563             if chunk.bit_len % 8 != 0:
3564                 raise DecodeError(
3565                     "BitString chunk is not multiple of 8 bits",
3566                     klass=self.__class__,
3567                     decode_path=decode_path + (str(chunk_i),),
3568                     offset=chunk.offset,
3569                 )
3570             if not evgen_mode:
3571                 values.append(bytes(chunk))
3572             bit_len += chunk.bit_len
3573         chunk_last = chunks[-1]
3574         if not evgen_mode:
3575             values.append(bytes(chunk_last))
3576         bit_len += chunk_last.bit_len
3577         obj = self.__class__(
3578             value=None if evgen_mode else (bit_len, b"".join(values)),
3579             impl=self.tag,
3580             expl=self._expl,
3581             default=self.default,
3582             optional=self.optional,
3583             _specs=self.specs,
3584             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3585         )
3586         if evgen_mode:
3587             obj._value = (bit_len, None)
3588         obj.lenindef = lenindef
3589         obj.ber_encoded = True
3590         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3591
3592     def __repr__(self):
3593         return pp_console_row(next(self.pps()))
3594
3595     def pps(self, decode_path=()):
3596         value = None
3597         blob = None
3598         if self.ready:
3599             bit_len, blob = self._value
3600             value = "%d bits" % bit_len
3601             if len(self.specs) > 0 and blob is not None:
3602                 blob = tuple(self.named)
3603         yield _pp(
3604             obj=self,
3605             asn1_type_name=self.asn1_type_name,
3606             obj_name=self.__class__.__name__,
3607             decode_path=decode_path,
3608             value=value,
3609             blob=blob,
3610             optional=self.optional,
3611             default=self == self.default,
3612             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3613             expl=None if self._expl is None else tag_decode(self._expl),
3614             offset=self.offset,
3615             tlen=self.tlen,
3616             llen=self.llen,
3617             vlen=self.vlen,
3618             expl_offset=self.expl_offset if self.expled else None,
3619             expl_tlen=self.expl_tlen if self.expled else None,
3620             expl_llen=self.expl_llen if self.expled else None,
3621             expl_vlen=self.expl_vlen if self.expled else None,
3622             expl_lenindef=self.expl_lenindef,
3623             lenindef=self.lenindef,
3624             ber_encoded=self.ber_encoded,
3625             bered=self.bered,
3626         )
3627         defined_by, defined = self.defined or (None, None)
3628         if defined_by is not None:
3629             yield defined.pps(
3630                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3631             )
3632         for pp in self.pps_lenindef(decode_path):
3633             yield pp
3634
3635
3636 OctetStringState = namedtuple(
3637     "OctetStringState",
3638     BasicState._fields + (
3639         "value",
3640         "bound_min",
3641         "bound_max",
3642         "tag_constructed",
3643         "defined",
3644     ),
3645     **NAMEDTUPLE_KWARGS
3646 )
3647
3648
3649 class OctetString(Obj):
3650     """``OCTET STRING`` binary string type
3651
3652     >>> s = OctetString(b"hello world")
3653     OCTET STRING 11 bytes 68656c6c6f20776f726c64
3654     >>> s == OctetString(b"hello world")
3655     True
3656     >>> bytes(s)
3657     b'hello world'
3658
3659     >>> OctetString(b"hello", bounds=(4, 4))
3660     Traceback (most recent call last):
3661     pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3662     >>> OctetString(b"hell", bounds=(4, 4))
3663     OCTET STRING 4 bytes 68656c6c
3664
3665     Memoryviews can be used as a values. If memoryview is made on
3666     mmap-ed file, then it does not take storage inside OctetString
3667     itself. In CER encoding mode it will be streamed to the specified
3668     writer, copying 1 KB chunks.
3669     """
3670     __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3671     tag_default = tag_encode(4)
3672     asn1_type_name = "OCTET STRING"
3673     evgen_mode_skip_value = True
3674
3675     def __init__(
3676             self,
3677             value=None,
3678             bounds=None,
3679             impl=None,
3680             expl=None,
3681             default=None,
3682             optional=False,
3683             _decoded=(0, 0, 0),
3684             ctx=None,
3685     ):
3686         """
3687         :param value: set the value. Either binary type, or
3688                       :py:class:`pyderasn.OctetString` object
3689         :param bounds: set ``(MIN, MAX)`` value size constraint.
3690                        (-inf, +inf) by default
3691         :param bytes impl: override default tag with ``IMPLICIT`` one
3692         :param bytes expl: override default tag with ``EXPLICIT`` one
3693         :param default: set default value. Type same as in ``value``
3694         :param bool optional: is object ``OPTIONAL`` in sequence
3695         """
3696         super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3697         self._value = value
3698         self._bound_min, self._bound_max = getattr(
3699             self,
3700             "bounds",
3701             (0, float("+inf")),
3702         ) if bounds is None else bounds
3703         if value is not None:
3704             self._value = self._value_sanitize(value)
3705         if default is not None:
3706             default = self._value_sanitize(default)
3707             self.default = self.__class__(
3708                 value=default,
3709                 impl=self.tag,
3710                 expl=self._expl,
3711             )
3712             if self._value is None:
3713                 self._value = default
3714         self.defined = None
3715         tag_klass, _, tag_num = tag_decode(self.tag)
3716         self.tag_constructed = tag_encode(
3717             klass=tag_klass,
3718             form=TagFormConstructed,
3719             num=tag_num,
3720         )
3721
3722     def _value_sanitize(self, value):
3723         if value.__class__ == binary_type or value.__class__ == memoryview:
3724             pass
3725         elif issubclass(value.__class__, OctetString):
3726             value = value._value
3727         else:
3728             raise InvalidValueType((self.__class__, bytes, memoryview))
3729         if not self._bound_min <= len(value) <= self._bound_max:
3730             raise BoundsError(self._bound_min, len(value), self._bound_max)
3731         return value
3732
3733     @property
3734     def ready(self):
3735         return self._value is not None
3736
3737     def __getstate__(self):
3738         return OctetStringState(
3739             __version__,
3740             self.tag,
3741             self._tag_order,
3742             self._expl,
3743             self.default,
3744             self.optional,
3745             self.offset,
3746             self.llen,
3747             self.vlen,
3748             self.expl_lenindef,
3749             self.lenindef,
3750             self.ber_encoded,
3751             self._value,
3752             self._bound_min,
3753             self._bound_max,
3754             self.tag_constructed,
3755             self.defined,
3756         )
3757
3758     def __setstate__(self, state):
3759         super(OctetString, self).__setstate__(state)
3760         self._value = state.value
3761         self._bound_min = state.bound_min
3762         self._bound_max = state.bound_max
3763         self.tag_constructed = state.tag_constructed
3764         self.defined = state.defined
3765
3766     def __bytes__(self):
3767         self._assert_ready()
3768         return bytes(self._value)
3769
3770     def __eq__(self, their):
3771         if their.__class__ == binary_type:
3772             return self._value == their
3773         if not issubclass(their.__class__, OctetString):
3774             return False
3775         return (
3776             self._value == their._value and
3777             self.tag == their.tag and
3778             self._expl == their._expl
3779         )
3780
3781     def __lt__(self, their):
3782         return self._value < their._value
3783
3784     def __call__(
3785             self,
3786             value=None,
3787             bounds=None,
3788             impl=None,
3789             expl=None,
3790             default=None,
3791             optional=None,
3792     ):
3793         return self.__class__(
3794             value=value,
3795             bounds=(
3796                 (self._bound_min, self._bound_max)
3797                 if bounds is None else bounds
3798             ),
3799             impl=self.tag if impl is None else impl,
3800             expl=self._expl if expl is None else expl,
3801             default=self.default if default is None else default,
3802             optional=self.optional if optional is None else optional,
3803         )
3804
3805     def _encode(self):
3806         self._assert_ready()
3807         return b"".join((
3808             self.tag,
3809             len_encode(len(self._value)),
3810             self._value,
3811         ))
3812
3813     def _encode1st(self, state):
3814         self._assert_ready()
3815         l = len(self._value)
3816         return len(self.tag) + len_size(l) + l, state
3817
3818     def _encode2nd(self, writer, state_iter):
3819         value = self._value
3820         write_full(writer, self.tag + len_encode(len(value)))
3821         write_full(writer, value)
3822
3823     def _encode_cer(self, writer):
3824         octets = self._value
3825         if len(octets) <= 1000:
3826             write_full(writer, self._encode())
3827             return
3828         write_full(writer, self.tag_constructed)
3829         write_full(writer, LENINDEF)
3830         for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3831             write_full(writer, b"".join((
3832                 OctetString.tag_default,
3833                 LEN1K,
3834                 octets[offset:offset + 1000],
3835             )))
3836         tail = octets[offset+1000:]
3837         if len(tail) > 0:
3838             write_full(writer, b"".join((
3839                 OctetString.tag_default,
3840                 len_encode(len(tail)),
3841                 tail,
3842             )))
3843         write_full(writer, EOC)
3844
3845     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3846         try:
3847             t, tlen, lv = tag_strip(tlv)
3848         except DecodeError as err:
3849             raise err.__class__(
3850                 msg=err.msg,
3851                 klass=self.__class__,
3852                 decode_path=decode_path,
3853                 offset=offset,
3854             )
3855         if t == self.tag:
3856             if tag_only:
3857                 yield None
3858                 return
3859             try:
3860                 l, llen, v = len_decode(lv)
3861             except DecodeError as err:
3862                 raise err.__class__(
3863                     msg=err.msg,
3864                     klass=self.__class__,
3865                     decode_path=decode_path,
3866                     offset=offset,
3867                 )
3868             if l > len(v):
3869                 raise NotEnoughData(
3870                     "encoded length is longer than data",
3871                     klass=self.__class__,
3872                     decode_path=decode_path,
3873                     offset=offset,
3874                 )
3875             v, tail = v[:l], v[l:]
3876             if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3877                 raise DecodeError(
3878                     msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3879                     klass=self.__class__,
3880                     decode_path=decode_path,
3881                     offset=offset,
3882                 )
3883             try:
3884                 obj = self.__class__(
3885                     value=(
3886                         None if (evgen_mode and self.evgen_mode_skip_value)
3887                         else v.tobytes()
3888                     ),
3889                     bounds=(self._bound_min, self._bound_max),
3890                     impl=self.tag,
3891                     expl=self._expl,
3892                     default=self.default,
3893                     optional=self.optional,
3894                     _decoded=(offset, llen, l),
3895                     ctx=ctx,
3896                 )
3897             except DecodeError as err:
3898                 raise DecodeError(
3899                     msg=err.msg,
3900                     klass=self.__class__,
3901                     decode_path=decode_path,
3902                     offset=offset,
3903                 )
3904             except BoundsError as err:
3905                 raise DecodeError(
3906                     msg=str(err),
3907                     klass=self.__class__,
3908                     decode_path=decode_path,
3909                     offset=offset,
3910                 )
3911             yield decode_path, obj, tail
3912             return
3913         if t != self.tag_constructed:
3914             raise TagMismatch(
3915                 klass=self.__class__,
3916                 decode_path=decode_path,
3917                 offset=offset,
3918             )
3919         if not ctx.get("bered", False):
3920             raise DecodeError(
3921                 "unallowed BER constructed encoding",
3922                 klass=self.__class__,
3923                 decode_path=decode_path,
3924                 offset=offset,
3925             )
3926         if tag_only:
3927             yield None
3928             return
3929         lenindef = False
3930         try:
3931             l, llen, v = len_decode(lv)
3932         except LenIndefForm:
3933             llen, l, v = 1, 0, lv[1:]
3934             lenindef = True
3935         except DecodeError as err:
3936             raise err.__class__(
3937                 msg=err.msg,
3938                 klass=self.__class__,
3939                 decode_path=decode_path,
3940                 offset=offset,
3941             )
3942         if l > len(v):
3943             raise NotEnoughData(
3944                 "encoded length is longer than data",
3945                 klass=self.__class__,
3946                 decode_path=decode_path,
3947                 offset=offset,
3948             )
3949         chunks = []
3950         chunks_count = 0
3951         sub_offset = offset + tlen + llen
3952         vlen = 0
3953         payload_len = 0
3954         while True:
3955             if lenindef:
3956                 if v[:EOC_LEN].tobytes() == EOC:
3957                     break
3958             else:
3959                 if vlen == l:
3960                     break
3961                 if vlen > l:
3962                     raise DecodeError(
3963                         "chunk out of bounds",
3964                         klass=self.__class__,
3965                         decode_path=decode_path + (str(len(chunks) - 1),),
3966                         offset=chunks[-1].offset,
3967                     )
3968             try:
3969                 if evgen_mode:
3970                     sub_decode_path = decode_path + (str(chunks_count),)
3971                     for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3972                             v,
3973                             offset=sub_offset,
3974                             decode_path=sub_decode_path,
3975                             leavemm=True,
3976                             ctx=ctx,
3977                             _ctx_immutable=False,
3978                     ):
3979                         yield _decode_path, chunk, v_tail
3980                         if not chunk.ber_encoded:
3981                             payload_len += chunk.vlen
3982                     chunks_count += 1
3983                 else:
3984                     sub_decode_path = decode_path + (str(len(chunks)),)
3985                     _, chunk, v_tail = next(OctetString().decode_evgen(
3986                         v,
3987                         offset=sub_offset,
3988                         decode_path=sub_decode_path,
3989                         leavemm=True,
3990                         ctx=ctx,
3991                         _ctx_immutable=False,
3992                         _evgen_mode=False,
3993                     ))
3994                     chunks.append(chunk)
3995             except TagMismatch:
3996                 raise DecodeError(
3997                     "expected OctetString encoded chunk",
3998                     klass=self.__class__,
3999                     decode_path=sub_decode_path,
4000                     offset=sub_offset,
4001                 )
4002             sub_offset += chunk.tlvlen
4003             vlen += chunk.tlvlen
4004             v = v_tail
4005         if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
4006             raise DecodeError(
4007                 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
4008                 klass=self.__class__,
4009                 decode_path=decode_path,
4010                 offset=offset,
4011             )
4012         try:
4013             obj = self.__class__(
4014                 value=(
4015                     None if evgen_mode else
4016                     b"".join(bytes(chunk) for chunk in chunks)
4017                 ),
4018                 bounds=(self._bound_min, self._bound_max),
4019                 impl=self.tag,
4020                 expl=self._expl,
4021                 default=self.default,
4022                 optional=self.optional,
4023                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4024                 ctx=ctx,
4025             )
4026         except DecodeError as err:
4027             raise DecodeError(
4028                 msg=err.msg,
4029                 klass=self.__class__,
4030                 decode_path=decode_path,
4031                 offset=offset,
4032             )
4033         except BoundsError as err:
4034             raise DecodeError(
4035                 msg=str(err),
4036                 klass=self.__class__,
4037                 decode_path=decode_path,
4038                 offset=offset,
4039             )
4040         obj.lenindef = lenindef
4041         obj.ber_encoded = True
4042         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4043
4044     def __repr__(self):
4045         return pp_console_row(next(self.pps()))
4046
4047     def pps(self, decode_path=()):
4048         yield _pp(
4049             obj=self,
4050             asn1_type_name=self.asn1_type_name,
4051             obj_name=self.__class__.__name__,
4052             decode_path=decode_path,
4053             value=("%d bytes" % len(self._value)) if self.ready else None,
4054             blob=self._value if self.ready else None,
4055             optional=self.optional,
4056             default=self == self.default,
4057             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4058             expl=None if self._expl is None else tag_decode(self._expl),
4059             offset=self.offset,
4060             tlen=self.tlen,
4061             llen=self.llen,
4062             vlen=self.vlen,
4063             expl_offset=self.expl_offset if self.expled else None,
4064             expl_tlen=self.expl_tlen if self.expled else None,
4065             expl_llen=self.expl_llen if self.expled else None,
4066             expl_vlen=self.expl_vlen if self.expled else None,
4067             expl_lenindef=self.expl_lenindef,
4068             lenindef=self.lenindef,
4069             ber_encoded=self.ber_encoded,
4070             bered=self.bered,
4071         )
4072         defined_by, defined = self.defined or (None, None)
4073         if defined_by is not None:
4074             yield defined.pps(
4075                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4076             )
4077         for pp in self.pps_lenindef(decode_path):
4078             yield pp
4079
4080
4081 def agg_octet_string(evgens, decode_path, raw, writer):
4082     """Aggregate constructed string (OctetString and its derivatives)
4083
4084     :param evgens: iterator of generated events
4085     :param decode_path: points to the string we want to decode
4086     :param raw: slicebable (memoryview, bytearray, etc) with
4087                 the data evgens are generated on
4088     :param writer: buffer.write where string is going to be saved
4089     :param writer: where string is going to be saved. Must comply
4090                    with ``io.RawIOBase.write`` behaviour
4091
4092     .. seealso:: :ref:`agg_octet_string`
4093     """
4094     decode_path_len = len(decode_path)
4095     for dp, obj, _ in evgens:
4096         if dp[:decode_path_len] != decode_path:
4097             continue
4098         if not obj.ber_encoded:
4099             write_full(writer, raw[
4100                 obj.offset + obj.tlen + obj.llen:
4101                 obj.offset + obj.tlen + obj.llen + obj.vlen -
4102                 (EOC_LEN if obj.expl_lenindef else 0)
4103             ])
4104         if len(dp) == decode_path_len:
4105             break
4106
4107
4108 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4109
4110
4111 class Null(Obj):
4112     """``NULL`` null object
4113
4114     >>> n = Null()
4115     NULL
4116     >>> n.ready
4117     True
4118     """
4119     __slots__ = ()
4120     tag_default = tag_encode(5)
4121     asn1_type_name = "NULL"
4122
4123     def __init__(
4124             self,
4125             value=None,  # unused, but Sequence passes it
4126             impl=None,
4127             expl=None,
4128             optional=False,
4129             _decoded=(0, 0, 0),
4130     ):
4131         """
4132         :param bytes impl: override default tag with ``IMPLICIT`` one
4133         :param bytes expl: override default tag with ``EXPLICIT`` one
4134         :param bool optional: is object ``OPTIONAL`` in sequence
4135         """
4136         super(Null, self).__init__(impl, expl, None, optional, _decoded)
4137         self.default = None
4138
4139     @property
4140     def ready(self):
4141         return True
4142
4143     def __getstate__(self):
4144         return NullState(
4145             __version__,
4146             self.tag,
4147             self._tag_order,
4148             self._expl,
4149             self.default,
4150             self.optional,
4151             self.offset,
4152             self.llen,
4153             self.vlen,
4154             self.expl_lenindef,
4155             self.lenindef,
4156             self.ber_encoded,
4157         )
4158
4159     def __eq__(self, their):
4160         if not issubclass(their.__class__, Null):
4161             return False
4162         return (
4163             self.tag == their.tag and
4164             self._expl == their._expl
4165         )
4166
4167     def __call__(
4168             self,
4169             value=None,
4170             impl=None,
4171             expl=None,
4172             optional=None,
4173     ):
4174         return self.__class__(
4175             impl=self.tag if impl is None else impl,
4176             expl=self._expl if expl is None else expl,
4177             optional=self.optional if optional is None else optional,
4178         )
4179
4180     def _encode(self):
4181         return self.tag + LEN0
4182
4183     def _encode1st(self, state):
4184         return len(self.tag) + 1, state
4185
4186     def _encode2nd(self, writer, state_iter):
4187         write_full(writer, self.tag + LEN0)
4188
4189     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4190         try:
4191             t, _, lv = tag_strip(tlv)
4192         except DecodeError as err:
4193             raise err.__class__(
4194                 msg=err.msg,
4195                 klass=self.__class__,
4196                 decode_path=decode_path,
4197                 offset=offset,
4198             )
4199         if t != self.tag:
4200             raise TagMismatch(
4201                 klass=self.__class__,
4202                 decode_path=decode_path,
4203                 offset=offset,
4204             )
4205         if tag_only:  # pragma: no cover
4206             yield None
4207             return
4208         try:
4209             l, _, v = len_decode(lv)
4210         except DecodeError as err:
4211             raise err.__class__(
4212                 msg=err.msg,
4213                 klass=self.__class__,
4214                 decode_path=decode_path,
4215                 offset=offset,
4216             )
4217         if l != 0:
4218             raise InvalidLength(
4219                 "Null must have zero length",
4220                 klass=self.__class__,
4221                 decode_path=decode_path,
4222                 offset=offset,
4223             )
4224         obj = self.__class__(
4225             impl=self.tag,
4226             expl=self._expl,
4227             optional=self.optional,
4228             _decoded=(offset, 1, 0),
4229         )
4230         yield decode_path, obj, v
4231
4232     def __repr__(self):
4233         return pp_console_row(next(self.pps()))
4234
4235     def pps(self, decode_path=()):
4236         yield _pp(
4237             obj=self,
4238             asn1_type_name=self.asn1_type_name,
4239             obj_name=self.__class__.__name__,
4240             decode_path=decode_path,
4241             optional=self.optional,
4242             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4243             expl=None if self._expl is None else tag_decode(self._expl),
4244             offset=self.offset,
4245             tlen=self.tlen,
4246             llen=self.llen,
4247             vlen=self.vlen,
4248             expl_offset=self.expl_offset if self.expled else None,
4249             expl_tlen=self.expl_tlen if self.expled else None,
4250             expl_llen=self.expl_llen if self.expled else None,
4251             expl_vlen=self.expl_vlen if self.expled else None,
4252             expl_lenindef=self.expl_lenindef,
4253             bered=self.bered,
4254         )
4255         for pp in self.pps_lenindef(decode_path):
4256             yield pp
4257
4258
4259 ObjectIdentifierState = namedtuple(
4260     "ObjectIdentifierState",
4261     BasicState._fields + ("value", "defines"),
4262     **NAMEDTUPLE_KWARGS
4263 )
4264
4265
4266 class ObjectIdentifier(Obj):
4267     """``OBJECT IDENTIFIER`` OID type
4268
4269     >>> oid = ObjectIdentifier((1, 2, 3))
4270     OBJECT IDENTIFIER 1.2.3
4271     >>> oid == ObjectIdentifier("1.2.3")
4272     True
4273     >>> tuple(oid)
4274     (1, 2, 3)
4275     >>> str(oid)
4276     '1.2.3'
4277     >>> oid + (4, 5) + ObjectIdentifier("1.7")
4278     OBJECT IDENTIFIER 1.2.3.4.5.1.7
4279
4280     >>> str(ObjectIdentifier((3, 1)))
4281     Traceback (most recent call last):
4282     pyderasn.InvalidOID: unacceptable first arc value
4283     """
4284     __slots__ = ("defines",)
4285     tag_default = tag_encode(6)
4286     asn1_type_name = "OBJECT IDENTIFIER"
4287
4288     def __init__(
4289             self,
4290             value=None,
4291             defines=(),
4292             impl=None,
4293             expl=None,
4294             default=None,
4295             optional=False,
4296             _decoded=(0, 0, 0),
4297     ):
4298         """
4299         :param value: set the value. Either tuples of integers,
4300                       string of "."-concatenated integers, or
4301                       :py:class:`pyderasn.ObjectIdentifier` object
4302         :param defines: sequence of tuples. Each tuple has two elements.
4303                         First one is relative to current one decode
4304                         path, aiming to the field defined by that OID.
4305                         Read about relative path in
4306                         :py:func:`pyderasn.abs_decode_path`. Second
4307                         tuple element is ``{OID: pyderasn.Obj()}``
4308                         dictionary, mapping between current OID value
4309                         and structure applied to defined field.
4310
4311                         .. seealso:: :ref:`definedby`
4312
4313         :param bytes impl: override default tag with ``IMPLICIT`` one
4314         :param bytes expl: override default tag with ``EXPLICIT`` one
4315         :param default: set default value. Type same as in ``value``
4316         :param bool optional: is object ``OPTIONAL`` in sequence
4317         """
4318         super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4319         self._value = value
4320         if value is not None:
4321             self._value = self._value_sanitize(value)
4322         if default is not None:
4323             default = self._value_sanitize(default)
4324             self.default = self.__class__(
4325                 value=default,
4326                 impl=self.tag,
4327                 expl=self._expl,
4328             )
4329             if self._value is None:
4330                 self._value = default
4331         self.defines = defines
4332
4333     def __add__(self, their):
4334         if their.__class__ == tuple:
4335             return self.__class__(self._value + array("L", their))
4336         if isinstance(their, self.__class__):
4337             return self.__class__(self._value + their._value)
4338         raise InvalidValueType((self.__class__, tuple))
4339
4340     def _value_sanitize(self, value):
4341         if issubclass(value.__class__, ObjectIdentifier):
4342             return value._value
4343         if isinstance(value, string_types):
4344             try:
4345                 value = array("L", (pureint(arc) for arc in value.split(".")))
4346             except ValueError:
4347                 raise InvalidOID("unacceptable arcs values")
4348         if value.__class__ == tuple:
4349             try:
4350                 value = array("L", value)
4351             except OverflowError as err:
4352                 raise InvalidOID(repr(err))
4353         if value.__class__ is array:
4354             if len(value) < 2:
4355                 raise InvalidOID("less than 2 arcs")
4356             first_arc = value[0]
4357             if first_arc in (0, 1):
4358                 if not (0 <= value[1] <= 39):
4359                     raise InvalidOID("second arc is too wide")
4360             elif first_arc == 2:
4361                 pass
4362             else:
4363                 raise InvalidOID("unacceptable first arc value")
4364             if not all(arc >= 0 for arc in value):
4365                 raise InvalidOID("negative arc value")
4366             return value
4367         raise InvalidValueType((self.__class__, str, tuple))
4368
4369     @property
4370     def ready(self):
4371         return self._value is not None
4372
4373     def __getstate__(self):
4374         return ObjectIdentifierState(
4375             __version__,
4376             self.tag,
4377             self._tag_order,
4378             self._expl,
4379             self.default,
4380             self.optional,
4381             self.offset,
4382             self.llen,
4383             self.vlen,
4384             self.expl_lenindef,
4385             self.lenindef,
4386             self.ber_encoded,
4387             self._value,
4388             self.defines,
4389         )
4390
4391     def __setstate__(self, state):
4392         super(ObjectIdentifier, self).__setstate__(state)
4393         self._value = state.value
4394         self.defines = state.defines
4395
4396     def __iter__(self):
4397         self._assert_ready()
4398         return iter(self._value)
4399
4400     def __str__(self):
4401         return ".".join(str(arc) for arc in self._value or ())
4402
4403     def __hash__(self):
4404         self._assert_ready()
4405         return hash(b"".join((
4406             self.tag,
4407             bytes(self._expl or b""),
4408             str(self._value).encode("ascii"),
4409         )))
4410
4411     def __eq__(self, their):
4412         if their.__class__ == tuple:
4413             return self._value == array("L", their)
4414         if not issubclass(their.__class__, ObjectIdentifier):
4415             return False
4416         return (
4417             self.tag == their.tag and
4418             self._expl == their._expl and
4419             self._value == their._value
4420         )
4421
4422     def __lt__(self, their):
4423         return self._value < their._value
4424
4425     def __call__(
4426             self,
4427             value=None,
4428             defines=None,
4429             impl=None,
4430             expl=None,
4431             default=None,
4432             optional=None,
4433     ):
4434         return self.__class__(
4435             value=value,
4436             defines=self.defines if defines is None else defines,
4437             impl=self.tag if impl is None else impl,
4438             expl=self._expl if expl is None else expl,
4439             default=self.default if default is None else default,
4440             optional=self.optional if optional is None else optional,
4441         )
4442
4443     def _encode_octets(self):
4444         self._assert_ready()
4445         value = self._value
4446         first_value = value[1]
4447         first_arc = value[0]
4448         if first_arc == 0:
4449             pass
4450         elif first_arc == 1:
4451             first_value += 40
4452         elif first_arc == 2:
4453             first_value += 80
4454         else:  # pragma: no cover
4455             raise RuntimeError("invalid arc is stored")
4456         octets = [zero_ended_encode(first_value)]
4457         for arc in value[2:]:
4458             octets.append(zero_ended_encode(arc))
4459         return b"".join(octets)
4460
4461     def _encode(self):
4462         v = self._encode_octets()
4463         return b"".join((self.tag, len_encode(len(v)), v))
4464
4465     def _encode1st(self, state):
4466         l = len(self._encode_octets())
4467         return len(self.tag) + len_size(l) + l, state
4468
4469     def _encode2nd(self, writer, state_iter):
4470         write_full(writer, self._encode())
4471
4472     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4473         try:
4474             t, _, lv = tag_strip(tlv)
4475         except DecodeError as err:
4476             raise err.__class__(
4477                 msg=err.msg,
4478                 klass=self.__class__,
4479                 decode_path=decode_path,
4480                 offset=offset,
4481             )
4482         if t != self.tag:
4483             raise TagMismatch(
4484                 klass=self.__class__,
4485                 decode_path=decode_path,
4486                 offset=offset,
4487             )
4488         if tag_only:  # pragma: no cover
4489             yield None
4490             return
4491         try:
4492             l, llen, v = len_decode(lv)
4493         except DecodeError as err:
4494             raise err.__class__(
4495                 msg=err.msg,
4496                 klass=self.__class__,
4497                 decode_path=decode_path,
4498                 offset=offset,
4499             )
4500         if l > len(v):
4501             raise NotEnoughData(
4502                 "encoded length is longer than data",
4503                 klass=self.__class__,
4504                 decode_path=decode_path,
4505                 offset=offset,
4506             )
4507         if l == 0:
4508             raise NotEnoughData(
4509                 "zero length",
4510                 klass=self.__class__,
4511                 decode_path=decode_path,
4512                 offset=offset,
4513             )
4514         v, tail = v[:l], v[l:]
4515         arcs = array("L")
4516         ber_encoded = False
4517         while len(v) > 0:
4518             i = 0
4519             arc = 0
4520             while True:
4521                 octet = indexbytes(v, i)
4522                 if i == 0 and octet == 0x80:
4523                     if ctx.get("bered", False):
4524                         ber_encoded = True
4525                     else:
4526                         raise DecodeError(
4527                             "non normalized arc encoding",
4528                             klass=self.__class__,
4529                             decode_path=decode_path,
4530                             offset=offset,
4531                         )
4532                 arc = (arc << 7) | (octet & 0x7F)
4533                 if octet & 0x80 == 0:
4534                     try:
4535                         arcs.append(arc)
4536                     except OverflowError:
4537                         raise DecodeError(
4538                             "too huge value for local unsigned long",
4539                             klass=self.__class__,
4540                             decode_path=decode_path,
4541                             offset=offset,
4542                         )
4543                     v = v[i + 1:]
4544                     break
4545                 i += 1
4546                 if i == len(v):
4547                     raise DecodeError(
4548                         "unfinished OID",
4549                         klass=self.__class__,
4550                         decode_path=decode_path,
4551                         offset=offset,
4552                     )
4553         first_arc = 0
4554         second_arc = arcs[0]
4555         if 0 <= second_arc <= 39:
4556             first_arc = 0
4557         elif 40 <= second_arc <= 79:
4558             first_arc = 1
4559             second_arc -= 40
4560         else:
4561             first_arc = 2
4562             second_arc -= 80
4563         obj = self.__class__(
4564             value=array("L", (first_arc, second_arc)) + arcs[1:],
4565             impl=self.tag,
4566             expl=self._expl,
4567             default=self.default,
4568             optional=self.optional,
4569             _decoded=(offset, llen, l),
4570         )
4571         if ber_encoded:
4572             obj.ber_encoded = True
4573         yield decode_path, obj, tail
4574
4575     def __repr__(self):
4576         return pp_console_row(next(self.pps()))
4577
4578     def pps(self, decode_path=()):
4579         yield _pp(
4580             obj=self,
4581             asn1_type_name=self.asn1_type_name,
4582             obj_name=self.__class__.__name__,
4583             decode_path=decode_path,
4584             value=str(self) if self.ready else None,
4585             optional=self.optional,
4586             default=self == self.default,
4587             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4588             expl=None if self._expl is None else tag_decode(self._expl),
4589             offset=self.offset,
4590             tlen=self.tlen,
4591             llen=self.llen,
4592             vlen=self.vlen,
4593             expl_offset=self.expl_offset if self.expled else None,
4594             expl_tlen=self.expl_tlen if self.expled else None,
4595             expl_llen=self.expl_llen if self.expled else None,
4596             expl_vlen=self.expl_vlen if self.expled else None,
4597             expl_lenindef=self.expl_lenindef,
4598             ber_encoded=self.ber_encoded,
4599             bered=self.bered,
4600         )
4601         for pp in self.pps_lenindef(decode_path):
4602             yield pp
4603
4604
4605 class Enumerated(Integer):
4606     """``ENUMERATED`` integer type
4607
4608     This type is identical to :py:class:`pyderasn.Integer`, but requires
4609     schema to be specified and does not accept values missing from it.
4610     """
4611     __slots__ = ()
4612     tag_default = tag_encode(10)
4613     asn1_type_name = "ENUMERATED"
4614
4615     def __init__(
4616             self,
4617             value=None,
4618             impl=None,
4619             expl=None,
4620             default=None,
4621             optional=False,
4622             _specs=None,
4623             _decoded=(0, 0, 0),
4624             bounds=None,  # dummy argument, workability for Integer.decode
4625     ):
4626         super(Enumerated, self).__init__(
4627             value, bounds, impl, expl, default, optional, _specs, _decoded,
4628         )
4629         if len(self.specs) == 0:
4630             raise ValueError("schema must be specified")
4631
4632     def _value_sanitize(self, value):
4633         if isinstance(value, self.__class__):
4634             value = value._value
4635         elif isinstance(value, integer_types):
4636             for _value in itervalues(self.specs):
4637                 if _value == value:
4638                     break
4639             else:
4640                 raise DecodeError(
4641                     "unknown integer value: %s" % value,
4642                     klass=self.__class__,
4643                 )
4644         elif isinstance(value, string_types):
4645             value = self.specs.get(value)
4646             if value is None:
4647                 raise ObjUnknown("integer value: %s" % value)
4648         else:
4649             raise InvalidValueType((self.__class__, int, str))
4650         return value
4651
4652     def __call__(
4653             self,
4654             value=None,
4655             impl=None,
4656             expl=None,
4657             default=None,
4658             optional=None,
4659             _specs=None,
4660     ):
4661         return self.__class__(
4662             value=value,
4663             impl=self.tag if impl is None else impl,
4664             expl=self._expl if expl is None else expl,
4665             default=self.default if default is None else default,
4666             optional=self.optional if optional is None else optional,
4667             _specs=self.specs,
4668         )
4669
4670
4671 def escape_control_unicode(c):
4672     if unicat(c)[0] == "C":
4673         c = repr(c).lstrip("u").strip("'")
4674     return c
4675
4676
4677 class CommonString(OctetString):
4678     """Common class for all strings
4679
4680     Everything resembles :py:class:`pyderasn.OctetString`, except
4681     ability to deal with unicode text strings.
4682
4683     >>> hexenc("привет Ð¼Ð¸Ñ€".encode("utf-8"))
4684     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4685     >>> UTF8String("привет Ð¼Ð¸Ñ€") == UTF8String(hexdec("d0...80"))
4686     True
4687     >>> s = UTF8String("привет Ð¼Ð¸Ñ€")
4688     UTF8String UTF8String Ð¿Ñ€Ð¸Ð²ÐµÑ‚ Ð¼Ð¸Ñ€
4689     >>> str(s)
4690     'привет Ð¼Ð¸Ñ€'
4691     >>> hexenc(bytes(s))
4692     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4693
4694     >>> PrintableString("привет Ð¼Ð¸Ñ€")
4695     Traceback (most recent call last):
4696     pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4697
4698     >>> BMPString("ада", bounds=(2, 2))
4699     Traceback (most recent call last):
4700     pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4701     >>> s = BMPString("ад", bounds=(2, 2))
4702     >>> s.encoding
4703     'utf-16-be'
4704     >>> hexenc(bytes(s))
4705     '04300434'
4706
4707     .. list-table::
4708        :header-rows: 1
4709
4710        * - Class
4711          - Text Encoding
4712        * - :py:class:`pyderasn.UTF8String`
4713          - utf-8
4714        * - :py:class:`pyderasn.NumericString`
4715          - ascii
4716        * - :py:class:`pyderasn.PrintableString`
4717          - ascii
4718        * - :py:class:`pyderasn.TeletexString`
4719          - ascii
4720        * - :py:class:`pyderasn.T61String`
4721          - ascii
4722        * - :py:class:`pyderasn.VideotexString`
4723          - iso-8859-1
4724        * - :py:class:`pyderasn.IA5String`
4725          - ascii
4726        * - :py:class:`pyderasn.GraphicString`
4727          - iso-8859-1
4728        * - :py:class:`pyderasn.VisibleString`
4729          - ascii
4730        * - :py:class:`pyderasn.ISO646String`
4731          - ascii
4732        * - :py:class:`pyderasn.GeneralString`
4733          - iso-8859-1
4734        * - :py:class:`pyderasn.UniversalString`
4735          - utf-32-be
4736        * - :py:class:`pyderasn.BMPString`
4737          - utf-16-be
4738     """
4739     __slots__ = ()
4740
4741     def _value_sanitize(self, value):
4742         value_raw = None
4743         value_decoded = None
4744         if isinstance(value, self.__class__):
4745             value_raw = value._value
4746         elif value.__class__ == text_type:
4747             value_decoded = value
4748         elif value.__class__ == binary_type:
4749             value_raw = value
4750         else:
4751             raise InvalidValueType((self.__class__, text_type, binary_type))
4752         try:
4753             value_raw = (
4754                 value_decoded.encode(self.encoding)
4755                 if value_raw is None else value_raw
4756             )
4757             value_decoded = (
4758                 value_raw.decode(self.encoding)
4759                 if value_decoded is None else value_decoded
4760             )
4761         except (UnicodeEncodeError, UnicodeDecodeError) as err:
4762             raise DecodeError(str(err))
4763         if not self._bound_min <= len(value_decoded) <= self._bound_max:
4764             raise BoundsError(
4765                 self._bound_min,
4766                 len(value_decoded),
4767                 self._bound_max,
4768             )
4769         return value_raw
4770
4771     def __eq__(self, their):
4772         if their.__class__ == binary_type:
4773             return self._value == their
4774         if their.__class__ == text_type:
4775             return self._value == their.encode(self.encoding)
4776         if not isinstance(their, self.__class__):
4777             return False
4778         return (
4779             self._value == their._value and
4780             self.tag == their.tag and
4781             self._expl == their._expl
4782         )
4783
4784     def __unicode__(self):
4785         if self.ready:
4786             return self._value.decode(self.encoding)
4787         return text_type(self._value)
4788
4789     def __repr__(self):
4790         return pp_console_row(next(self.pps(no_unicode=PY2)))
4791
4792     def pps(self, decode_path=(), no_unicode=False):
4793         value = None
4794         if self.ready:
4795             value = (
4796                 hexenc(bytes(self)) if no_unicode else
4797                 "".join(escape_control_unicode(c) for c in self.__unicode__())
4798             )
4799         yield _pp(
4800             obj=self,
4801             asn1_type_name=self.asn1_type_name,
4802             obj_name=self.__class__.__name__,
4803             decode_path=decode_path,
4804             value=value,
4805             optional=self.optional,
4806             default=self == self.default,
4807             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4808             expl=None if self._expl is None else tag_decode(self._expl),
4809             offset=self.offset,
4810             tlen=self.tlen,
4811             llen=self.llen,
4812             vlen=self.vlen,
4813             expl_offset=self.expl_offset if self.expled else None,
4814             expl_tlen=self.expl_tlen if self.expled else None,
4815             expl_llen=self.expl_llen if self.expled else None,
4816             expl_vlen=self.expl_vlen if self.expled else None,
4817             expl_lenindef=self.expl_lenindef,
4818             ber_encoded=self.ber_encoded,
4819             bered=self.bered,
4820         )
4821         for pp in self.pps_lenindef(decode_path):
4822             yield pp
4823
4824
4825 class UTF8String(CommonString):
4826     __slots__ = ()
4827     tag_default = tag_encode(12)
4828     encoding = "utf-8"
4829     asn1_type_name = "UTF8String"
4830
4831
4832 class AllowableCharsMixin(object):
4833     @property
4834     def allowable_chars(self):
4835         if PY2:
4836             return self._allowable_chars
4837         return frozenset(six_unichr(c) for c in self._allowable_chars)
4838
4839
4840 class NumericString(AllowableCharsMixin, CommonString):
4841     """Numeric string
4842
4843     Its value is properly sanitized: only ASCII digits with spaces can
4844     be stored.
4845
4846     >>> NumericString().allowable_chars
4847     frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4848     """
4849     __slots__ = ()
4850     tag_default = tag_encode(18)
4851     encoding = "ascii"
4852     asn1_type_name = "NumericString"
4853     _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4854
4855     def _value_sanitize(self, value):
4856         value = super(NumericString, self)._value_sanitize(value)
4857         if not frozenset(value) <= self._allowable_chars:
4858             raise DecodeError("non-numeric value")
4859         return value
4860
4861
4862 PrintableStringState = namedtuple(
4863     "PrintableStringState",
4864     OctetStringState._fields + ("allowable_chars",),
4865     **NAMEDTUPLE_KWARGS
4866 )
4867
4868
4869 class PrintableString(AllowableCharsMixin, CommonString):
4870     """Printable string
4871
4872     Its value is properly sanitized: see X.680 41.4 table 10.
4873
4874     >>> PrintableString().allowable_chars
4875     frozenset([' ', "'", ..., 'z'])
4876     >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4877     PrintableString PrintableString foo*bar
4878     >>> obj.allow_asterisk, obj.allow_ampersand
4879     (True, False)
4880     """
4881     __slots__ = ()
4882     tag_default = tag_encode(19)
4883     encoding = "ascii"
4884     asn1_type_name = "PrintableString"
4885     _allowable_chars = frozenset(
4886         (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4887     )
4888     _asterisk = frozenset("*".encode("ascii"))
4889     _ampersand = frozenset("&".encode("ascii"))
4890
4891     def __init__(
4892             self,
4893             value=None,
4894             bounds=None,
4895             impl=None,
4896             expl=None,
4897             default=None,
4898             optional=False,
4899             _decoded=(0, 0, 0),
4900             ctx=None,
4901             allow_asterisk=False,
4902             allow_ampersand=False,
4903     ):
4904         """
4905         :param allow_asterisk: allow asterisk character
4906         :param allow_ampersand: allow ampersand character
4907         """
4908         if allow_asterisk:
4909             self._allowable_chars |= self._asterisk
4910         if allow_ampersand:
4911             self._allowable_chars |= self._ampersand
4912         super(PrintableString, self).__init__(
4913             value, bounds, impl, expl, default, optional, _decoded, ctx,
4914         )
4915
4916     @property
4917     def allow_asterisk(self):
4918         """Is asterisk character allowed?
4919         """
4920         return self._asterisk <= self._allowable_chars
4921
4922     @property
4923     def allow_ampersand(self):
4924         """Is ampersand character allowed?
4925         """
4926         return self._ampersand <= self._allowable_chars
4927
4928     def _value_sanitize(self, value):
4929         value = super(PrintableString, self)._value_sanitize(value)
4930         if not frozenset(value) <= self._allowable_chars:
4931             raise DecodeError("non-printable value")
4932         return value
4933
4934     def __getstate__(self):
4935         return PrintableStringState(
4936             *super(PrintableString, self).__getstate__(),
4937             **{"allowable_chars": self._allowable_chars}
4938         )
4939
4940     def __setstate__(self, state):
4941         super(PrintableString, self).__setstate__(state)
4942         self._allowable_chars = state.allowable_chars
4943
4944     def __call__(
4945             self,
4946             value=None,
4947             bounds=None,
4948             impl=None,
4949             expl=None,
4950             default=None,
4951             optional=None,
4952     ):
4953         return self.__class__(
4954             value=value,
4955             bounds=(
4956                 (self._bound_min, self._bound_max)
4957                 if bounds is None else bounds
4958             ),
4959             impl=self.tag if impl is None else impl,
4960             expl=self._expl if expl is None else expl,
4961             default=self.default if default is None else default,
4962             optional=self.optional if optional is None else optional,
4963             allow_asterisk=self.allow_asterisk,
4964             allow_ampersand=self.allow_ampersand,
4965         )
4966
4967
4968 class TeletexString(CommonString):
4969     __slots__ = ()
4970     tag_default = tag_encode(20)
4971     encoding = "ascii"
4972     asn1_type_name = "TeletexString"
4973
4974
4975 class T61String(TeletexString):
4976     __slots__ = ()
4977     asn1_type_name = "T61String"
4978
4979
4980 class VideotexString(CommonString):
4981     __slots__ = ()
4982     tag_default = tag_encode(21)
4983     encoding = "iso-8859-1"
4984     asn1_type_name = "VideotexString"
4985
4986
4987 class IA5String(CommonString):
4988     __slots__ = ()
4989     tag_default = tag_encode(22)
4990     encoding = "ascii"
4991     asn1_type_name = "IA5"
4992
4993
4994 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
4995 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
4996 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
4997 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
4998 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
4999 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
5000
5001
5002 class VisibleString(CommonString):
5003     __slots__ = ()
5004     tag_default = tag_encode(26)
5005     encoding = "ascii"
5006     asn1_type_name = "VisibleString"
5007
5008
5009 UTCTimeState = namedtuple(
5010     "UTCTimeState",
5011     OctetStringState._fields + ("ber_raw",),
5012     **NAMEDTUPLE_KWARGS
5013 )
5014
5015
5016 def str_to_time_fractions(value):
5017     v = pureint(value)
5018     year, v = (v // 10**10), (v % 10**10)
5019     month, v = (v // 10**8), (v % 10**8)
5020     day, v = (v // 10**6), (v % 10**6)
5021     hour, v = (v // 10**4), (v % 10**4)
5022     minute, second = (v // 100), (v % 100)
5023     return year, month, day, hour, minute, second
5024
5025
5026 class UTCTime(VisibleString):
5027     """``UTCTime`` datetime type
5028
5029     >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5030     UTCTime UTCTime 2017-09-30T22:07:50
5031     >>> str(t)
5032     '170930220750Z'
5033     >>> bytes(t)
5034     b'170930220750Z'
5035     >>> t.todatetime()
5036     datetime.datetime(2017, 9, 30, 22, 7, 50)
5037     >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5038     datetime.datetime(1957, 9, 30, 22, 7, 50)
5039
5040     If BER encoded value was met, then ``ber_raw`` attribute will hold
5041     its raw representation.
5042
5043     .. warning::
5044
5045        Pay attention that UTCTime can not hold full year, so all years
5046        having < 50 years are treated as 20xx, 19xx otherwise, according
5047        to X.509 recommendation.
5048
5049     .. warning::
5050
5051        No strict validation of UTC offsets are made, but very crude:
5052
5053        * minutes are not exceeding 60
5054        * offset value is not exceeding 14 hours
5055     """
5056     __slots__ = ("ber_raw",)
5057     tag_default = tag_encode(23)
5058     encoding = "ascii"
5059     asn1_type_name = "UTCTime"
5060     evgen_mode_skip_value = False
5061
5062     def __init__(
5063             self,
5064             value=None,
5065             impl=None,
5066             expl=None,
5067             default=None,
5068             optional=False,
5069             _decoded=(0, 0, 0),
5070             bounds=None,  # dummy argument, workability for OctetString.decode
5071             ctx=None,
5072     ):
5073         """
5074         :param value: set the value. Either datetime type, or
5075                       :py:class:`pyderasn.UTCTime` object
5076         :param bytes impl: override default tag with ``IMPLICIT`` one
5077         :param bytes expl: override default tag with ``EXPLICIT`` one
5078         :param default: set default value. Type same as in ``value``
5079         :param bool optional: is object ``OPTIONAL`` in sequence
5080         """
5081         super(UTCTime, self).__init__(
5082             None, None, impl, expl, None, optional, _decoded, ctx,
5083         )
5084         self._value = value
5085         self.ber_raw = None
5086         if value is not None:
5087             self._value, self.ber_raw = self._value_sanitize(value, ctx)
5088             self.ber_encoded = self.ber_raw is not None
5089         if default is not None:
5090             default, _ = self._value_sanitize(default)
5091             self.default = self.__class__(
5092                 value=default,
5093                 impl=self.tag,
5094                 expl=self._expl,
5095             )
5096             if self._value is None:
5097                 self._value = default
5098             optional = True
5099         self.optional = optional
5100
5101     def _strptime_bered(self, value):
5102         year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5103         value = value[10:]
5104         if len(value) == 0:
5105             raise ValueError("no timezone")
5106         year += 2000 if year < 50 else 1900
5107         decoded = datetime(year, month, day, hour, minute)
5108         offset = 0
5109         if value[-1] == "Z":
5110             value = value[:-1]
5111         else:
5112             if len(value) < 5:
5113                 raise ValueError("invalid UTC offset")
5114             if value[-5] == "-":
5115                 sign = -1
5116             elif value[-5] == "+":
5117                 sign = 1
5118             else:
5119                 raise ValueError("invalid UTC offset")
5120             v = pureint(value[-4:])
5121             offset, v = (60 * (v % 100)), v // 100
5122             if offset >= 3600:
5123                 raise ValueError("invalid UTC offset minutes")
5124             offset += 3600 * v
5125             if offset > 14 * 3600:
5126                 raise ValueError("too big UTC offset")
5127             offset *= sign
5128             value = value[:-5]
5129         if len(value) == 0:
5130             return offset, decoded
5131         if len(value) != 2:
5132             raise ValueError("invalid UTC offset seconds")
5133         seconds = pureint(value)
5134         if seconds >= 60:
5135             raise ValueError("invalid seconds value")
5136         return offset, decoded + timedelta(seconds=seconds)
5137
5138     def _strptime(self, value):
5139         # datetime.strptime's format: %y%m%d%H%M%SZ
5140         if len(value) != LEN_YYMMDDHHMMSSZ:
5141             raise ValueError("invalid UTCTime length")
5142         if value[-1] != "Z":
5143             raise ValueError("non UTC timezone")
5144         year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
5145         year += 2000 if year < 50 else 1900
5146         return datetime(year, month, day, hour, minute, second)
5147
5148     def _dt_sanitize(self, value):
5149         if value.year < 1950 or value.year > 2049:
5150             raise ValueError("UTCTime can hold only 1950-2049 years")
5151         return value.replace(microsecond=0)
5152
5153     def _value_sanitize(self, value, ctx=None):
5154         if value.__class__ == binary_type:
5155             try:
5156                 value_decoded = value.decode("ascii")
5157             except (UnicodeEncodeError, UnicodeDecodeError) as err:
5158                 raise DecodeError("invalid UTCTime encoding: %r" % err)
5159             err = None
5160             try:
5161                 return self._strptime(value_decoded), None
5162             except (TypeError, ValueError) as _err:
5163                 err = _err
5164                 if (ctx is not None) and ctx.get("bered", False):
5165                     try:
5166                         offset, _value = self._strptime_bered(value_decoded)
5167                         _value = _value - timedelta(seconds=offset)
5168                         return self._dt_sanitize(_value), value
5169                     except (TypeError, ValueError, OverflowError) as _err:
5170                         err = _err
5171             raise DecodeError(
5172                 "invalid %s format: %r" % (self.asn1_type_name, err),
5173                 klass=self.__class__,
5174             )
5175         if isinstance(value, self.__class__):
5176             return value._value, None
5177         if value.__class__ == datetime:
5178             return self._dt_sanitize(value), None
5179         raise InvalidValueType((self.__class__, datetime))
5180
5181     def _pp_value(self):
5182         if self.ready:
5183             value = self._value.isoformat()
5184             if self.ber_encoded:
5185                 value += " (%s)" % self.ber_raw
5186             return value
5187         return None
5188
5189     def __unicode__(self):
5190         if self.ready:
5191             value = self._value.isoformat()
5192             if self.ber_encoded:
5193                 value += " (%s)" % self.ber_raw
5194             return value
5195         return text_type(self._pp_value())
5196
5197     def __getstate__(self):
5198         return UTCTimeState(
5199             *super(UTCTime, self).__getstate__(),
5200             **{"ber_raw": self.ber_raw}
5201         )
5202
5203     def __setstate__(self, state):
5204         super(UTCTime, self).__setstate__(state)
5205         self.ber_raw = state.ber_raw
5206
5207     def __bytes__(self):
5208         self._assert_ready()
5209         return self._encode_time()
5210
5211     def __eq__(self, their):
5212         if their.__class__ == binary_type:
5213             return self._encode_time() == their
5214         if their.__class__ == datetime:
5215             return self.todatetime() == their
5216         if not isinstance(their, self.__class__):
5217             return False
5218         return (
5219             self._value == their._value and
5220             self.tag == their.tag and
5221             self._expl == their._expl
5222         )
5223
5224     def _encode_time(self):
5225         return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5226
5227     def _encode(self):
5228         self._assert_ready()
5229         return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5230
5231     def _encode1st(self, state):
5232         return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
5233
5234     def _encode2nd(self, writer, state_iter):
5235         self._assert_ready()
5236         write_full(writer, self._encode())
5237
5238     def _encode_cer(self, writer):
5239         write_full(writer, self._encode())
5240
5241     def todatetime(self):
5242         return self._value
5243
5244     def __repr__(self):
5245         return pp_console_row(next(self.pps()))
5246
5247     def pps(self, decode_path=()):
5248         yield _pp(
5249             obj=self,
5250             asn1_type_name=self.asn1_type_name,
5251             obj_name=self.__class__.__name__,
5252             decode_path=decode_path,
5253             value=self._pp_value(),
5254             optional=self.optional,
5255             default=self == self.default,
5256             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5257             expl=None if self._expl is None else tag_decode(self._expl),
5258             offset=self.offset,
5259             tlen=self.tlen,
5260             llen=self.llen,
5261             vlen=self.vlen,
5262             expl_offset=self.expl_offset if self.expled else None,
5263             expl_tlen=self.expl_tlen if self.expled else None,
5264             expl_llen=self.expl_llen if self.expled else None,
5265             expl_vlen=self.expl_vlen if self.expled else None,
5266             expl_lenindef=self.expl_lenindef,
5267             ber_encoded=self.ber_encoded,
5268             bered=self.bered,
5269         )
5270         for pp in self.pps_lenindef(decode_path):
5271             yield pp
5272
5273
5274 class GeneralizedTime(UTCTime):
5275     """``GeneralizedTime`` datetime type
5276
5277     This type is similar to :py:class:`pyderasn.UTCTime`.
5278
5279     >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5280     GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5281     >>> str(t)
5282     '20170930220750.000123Z'
5283     >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5284     GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5285
5286     .. warning::
5287
5288        Only microsecond fractions are supported in DER encoding.
5289        :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5290        higher precision values.
5291
5292     .. warning::
5293
5294        BER encoded data can loss information (accuracy) during decoding
5295        because of float transformations.
5296
5297     .. warning::
5298
5299        Local times (without explicit timezone specification) are treated
5300        as UTC one, no transformations are made.
5301
5302     .. warning::
5303
5304        Zero year is unsupported.
5305     """
5306     __slots__ = ()
5307     tag_default = tag_encode(24)
5308     asn1_type_name = "GeneralizedTime"
5309
5310     def _dt_sanitize(self, value):
5311         return value
5312
5313     def _strptime_bered(self, value):
5314         if len(value) < 4 + 3 * 2:
5315             raise ValueError("invalid GeneralizedTime")
5316         year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5317         decoded = datetime(year, month, day, hour)
5318         offset, value = 0, value[10:]
5319         if len(value) == 0:
5320             return offset, decoded
5321         if value[-1] == "Z":
5322             value = value[:-1]
5323         else:
5324             for char, sign in (("-", -1), ("+", 1)):
5325                 idx = value.rfind(char)
5326                 if idx == -1:
5327                     continue
5328                 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5329                 v = pureint(offset_raw)
5330                 if len(offset_raw) == 4:
5331                     offset, v = (60 * (v % 100)), v // 100
5332                     if offset >= 3600:
5333                         raise ValueError("invalid UTC offset minutes")
5334                 elif len(offset_raw) == 2:
5335                     pass
5336                 else:
5337                     raise ValueError("invalid UTC offset")
5338                 offset += 3600 * v
5339                 if offset > 14 * 3600:
5340                     raise ValueError("too big UTC offset")
5341                 offset *= sign
5342                 break
5343         if len(value) == 0:
5344             return offset, decoded
5345         if value[0] in DECIMAL_SIGNS:
5346             return offset, (
5347                 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5348             )
5349         if len(value) < 2:
5350             raise ValueError("stripped minutes")
5351         decoded += timedelta(seconds=60 * pureint(value[:2]))
5352         value = value[2:]
5353         if len(value) == 0:
5354             return offset, decoded
5355         if value[0] in DECIMAL_SIGNS:
5356             return offset, (
5357                 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5358             )
5359         if len(value) < 2:
5360             raise ValueError("stripped seconds")
5361         decoded += timedelta(seconds=pureint(value[:2]))
5362         value = value[2:]
5363         if len(value) == 0:
5364             return offset, decoded
5365         if value[0] not in DECIMAL_SIGNS:
5366             raise ValueError("invalid format after seconds")
5367         return offset, (
5368             decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5369         )
5370
5371     def _strptime(self, value):
5372         l = len(value)
5373         if l == LEN_YYYYMMDDHHMMSSZ:
5374             # datetime.strptime's format: %Y%m%d%H%M%SZ
5375             if value[-1] != "Z":
5376                 raise ValueError("non UTC timezone")
5377             return datetime(*str_to_time_fractions(value[:-1]))
5378         if l >= LEN_YYYYMMDDHHMMSSDMZ:
5379             # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5380             if value[-1] != "Z":
5381                 raise ValueError("non UTC timezone")
5382             if value[14] != ".":
5383                 raise ValueError("no fractions separator")
5384             us = value[15:-1]
5385             if us[-1] == "0":
5386                 raise ValueError("trailing zero")
5387             us_len = len(us)
5388             if us_len > 6:
5389                 raise ValueError("only microsecond fractions are supported")
5390             us = pureint(us + ("0" * (6 - us_len)))
5391             year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5392             return datetime(year, month, day, hour, minute, second, us)
5393         raise ValueError("invalid GeneralizedTime length")
5394
5395     def _encode_time(self):
5396         value = self._value
5397         encoded = value.strftime("%Y%m%d%H%M%S")
5398         if value.microsecond > 0:
5399             encoded += (".%06d" % value.microsecond).rstrip("0")
5400         return (encoded + "Z").encode("ascii")
5401
5402     def _encode(self):
5403         self._assert_ready()
5404         value = self._value
5405         if value.microsecond > 0:
5406             encoded = self._encode_time()
5407             return b"".join((self.tag, len_encode(len(encoded)), encoded))
5408         return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5409
5410     def _encode1st(self, state):
5411         self._assert_ready()
5412         vlen = len(self._encode_time())
5413         return len(self.tag) + len_size(vlen) + vlen, state
5414
5415     def _encode2nd(self, writer, state_iter):
5416         write_full(writer, self._encode())
5417
5418
5419 class GraphicString(CommonString):
5420     __slots__ = ()
5421     tag_default = tag_encode(25)
5422     encoding = "iso-8859-1"
5423     asn1_type_name = "GraphicString"
5424
5425
5426 class ISO646String(VisibleString):
5427     __slots__ = ()
5428     asn1_type_name = "ISO646String"
5429
5430
5431 class GeneralString(CommonString):
5432     __slots__ = ()
5433     tag_default = tag_encode(27)
5434     encoding = "iso-8859-1"
5435     asn1_type_name = "GeneralString"
5436
5437
5438 class UniversalString(CommonString):
5439     __slots__ = ()
5440     tag_default = tag_encode(28)
5441     encoding = "utf-32-be"
5442     asn1_type_name = "UniversalString"
5443
5444
5445 class BMPString(CommonString):
5446     __slots__ = ()
5447     tag_default = tag_encode(30)
5448     encoding = "utf-16-be"
5449     asn1_type_name = "BMPString"
5450
5451
5452 ChoiceState = namedtuple(
5453     "ChoiceState",
5454     BasicState._fields + ("specs", "value",),
5455     **NAMEDTUPLE_KWARGS
5456 )
5457
5458
5459 class Choice(Obj):
5460     """``CHOICE`` special type
5461
5462     ::
5463
5464         class GeneralName(Choice):
5465             schema = (
5466                 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5467                 ("dNSName", IA5String(impl=tag_ctxp(2))),
5468             )
5469
5470     >>> gn = GeneralName()
5471     GeneralName CHOICE
5472     >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5473     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5474     >>> gn["dNSName"] = IA5String("bar.baz")
5475     GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5476     >>> gn["rfc822Name"]
5477     None
5478     >>> gn["dNSName"]
5479     [2] IA5String IA5 bar.baz
5480     >>> gn.choice
5481     'dNSName'
5482     >>> gn.value == gn["dNSName"]
5483     True
5484     >>> gn.specs
5485     OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5486
5487     >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5488     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5489     """
5490     __slots__ = ("specs",)
5491     tag_default = None
5492     asn1_type_name = "CHOICE"
5493
5494     def __init__(
5495             self,
5496             value=None,
5497             schema=None,
5498             impl=None,
5499             expl=None,
5500             default=None,
5501             optional=False,
5502             _decoded=(0, 0, 0),
5503     ):
5504         """
5505         :param value: set the value. Either ``(choice, value)`` tuple, or
5506                       :py:class:`pyderasn.Choice` object
5507         :param bytes impl: can not be set, do **not** use it
5508         :param bytes expl: override default tag with ``EXPLICIT`` one
5509         :param default: set default value. Type same as in ``value``
5510         :param bool optional: is object ``OPTIONAL`` in sequence
5511         """
5512         if impl is not None:
5513             raise ValueError("no implicit tag allowed for CHOICE")
5514         super(Choice, self).__init__(None, expl, default, optional, _decoded)
5515         if schema is None:
5516             schema = getattr(self, "schema", ())
5517         if len(schema) == 0:
5518             raise ValueError("schema must be specified")
5519         self.specs = (
5520             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5521         )
5522         self._value = None
5523         if value is not None:
5524             self._value = self._value_sanitize(value)
5525         if default is not None:
5526             default_value = self._value_sanitize(default)
5527             default_obj = self.__class__(impl=self.tag, expl=self._expl)
5528             default_obj.specs = self.specs
5529             default_obj._value = default_value
5530             self.default = default_obj
5531             if value is None:
5532                 self._value = copy(default_obj._value)
5533         if self._expl is not None:
5534             tag_class, _, tag_num = tag_decode(self._expl)
5535             self._tag_order = (tag_class, tag_num)
5536
5537     def _value_sanitize(self, value):
5538         if (value.__class__ == tuple) and len(value) == 2:
5539             choice, obj = value
5540             spec = self.specs.get(choice)
5541             if spec is None:
5542                 raise ObjUnknown(choice)
5543             if not isinstance(obj, spec.__class__):
5544                 raise InvalidValueType((spec,))
5545             return (choice, spec(obj))
5546         if isinstance(value, self.__class__):
5547             return value._value
5548         raise InvalidValueType((self.__class__, tuple))
5549
5550     @property
5551     def ready(self):
5552         return self._value is not None and self._value[1].ready
5553
5554     @property
5555     def bered(self):
5556         return self.expl_lenindef or (
5557             (self._value is not None) and
5558             self._value[1].bered
5559         )
5560
5561     def __getstate__(self):
5562         return ChoiceState(
5563             __version__,
5564             self.tag,
5565             self._tag_order,
5566             self._expl,
5567             self.default,
5568             self.optional,
5569             self.offset,
5570             self.llen,
5571             self.vlen,
5572             self.expl_lenindef,
5573             self.lenindef,
5574             self.ber_encoded,
5575             self.specs,
5576             copy(self._value),
5577         )
5578
5579     def __setstate__(self, state):
5580         super(Choice, self).__setstate__(state)
5581         self.specs = state.specs
5582         self._value = state.value
5583
5584     def __eq__(self, their):
5585         if (their.__class__ == tuple) and len(their) == 2:
5586             return self._value == their
5587         if not isinstance(their, self.__class__):
5588             return False
5589         return (
5590             self.specs == their.specs and
5591             self._value == their._value
5592         )
5593
5594     def __call__(
5595             self,
5596             value=None,
5597             expl=None,
5598             default=None,
5599             optional=None,
5600     ):
5601         return self.__class__(
5602             value=value,
5603             schema=self.specs,
5604             expl=self._expl if expl is None else expl,
5605             default=self.default if default is None else default,
5606             optional=self.optional if optional is None else optional,
5607         )
5608
5609     @property
5610     def choice(self):
5611         """Name of the choice
5612         """
5613         self._assert_ready()
5614         return self._value[0]
5615
5616     @property
5617     def value(self):
5618         """Value of underlying choice
5619         """
5620         self._assert_ready()
5621         return self._value[1]
5622
5623     @property
5624     def tag_order(self):
5625         self._assert_ready()
5626         return self._value[1].tag_order if self._tag_order is None else self._tag_order
5627
5628     @property
5629     def tag_order_cer(self):
5630         return min(v.tag_order_cer for v in itervalues(self.specs))
5631
5632     def __getitem__(self, key):
5633         if key not in self.specs:
5634             raise ObjUnknown(key)
5635         if self._value is None:
5636             return None
5637         choice, value = self._value
5638         if choice != key:
5639             return None
5640         return value
5641
5642     def __setitem__(self, key, value):
5643         spec = self.specs.get(key)
5644         if spec is None:
5645             raise ObjUnknown(key)
5646         if not isinstance(value, spec.__class__):
5647             raise InvalidValueType((spec.__class__,))
5648         self._value = (key, spec(value))
5649
5650     @property
5651     def tlen(self):
5652         return 0
5653
5654     @property
5655     def decoded(self):
5656         return self._value[1].decoded if self.ready else False
5657
5658     def _encode(self):
5659         self._assert_ready()
5660         return self._value[1].encode()
5661
5662     def _encode1st(self, state):
5663         self._assert_ready()
5664         return self._value[1].encode1st(state)
5665
5666     def _encode2nd(self, writer, state_iter):
5667         self._value[1].encode2nd(writer, state_iter)
5668
5669     def _encode_cer(self, writer):
5670         self._assert_ready()
5671         self._value[1].encode_cer(writer)
5672
5673     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5674         for choice, spec in iteritems(self.specs):
5675             sub_decode_path = decode_path + (choice,)
5676             try:
5677                 spec.decode(
5678                     tlv,
5679                     offset=offset,
5680                     leavemm=True,
5681                     decode_path=sub_decode_path,
5682                     ctx=ctx,
5683                     tag_only=True,
5684                     _ctx_immutable=False,
5685                 )
5686             except TagMismatch:
5687                 continue
5688             break
5689         else:
5690             raise TagMismatch(
5691                 klass=self.__class__,
5692                 decode_path=decode_path,
5693                 offset=offset,
5694             )
5695         if tag_only:  # pragma: no cover
5696             yield None
5697             return
5698         if evgen_mode:
5699             for _decode_path, value, tail in spec.decode_evgen(
5700                     tlv,
5701                     offset=offset,
5702                     leavemm=True,
5703                     decode_path=sub_decode_path,
5704                     ctx=ctx,
5705                     _ctx_immutable=False,
5706             ):
5707                 yield _decode_path, value, tail
5708         else:
5709             _, value, tail = next(spec.decode_evgen(
5710                 tlv,
5711                 offset=offset,
5712                 leavemm=True,
5713                 decode_path=sub_decode_path,
5714                 ctx=ctx,
5715                 _ctx_immutable=False,
5716                 _evgen_mode=False,
5717             ))
5718         obj = self.__class__(
5719             schema=self.specs,
5720             expl=self._expl,
5721             default=self.default,
5722             optional=self.optional,
5723             _decoded=(offset, 0, value.fulllen),
5724         )
5725         obj._value = (choice, value)
5726         yield decode_path, obj, tail
5727
5728     def __repr__(self):
5729         value = pp_console_row(next(self.pps()))
5730         if self.ready:
5731             value = "%s[%r]" % (value, self.value)
5732         return value
5733
5734     def pps(self, decode_path=()):
5735         yield _pp(
5736             obj=self,
5737             asn1_type_name=self.asn1_type_name,
5738             obj_name=self.__class__.__name__,
5739             decode_path=decode_path,
5740             value=self.choice if self.ready else None,
5741             optional=self.optional,
5742             default=self == self.default,
5743             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5744             expl=None if self._expl is None else tag_decode(self._expl),
5745             offset=self.offset,
5746             tlen=self.tlen,
5747             llen=self.llen,
5748             vlen=self.vlen,
5749             expl_lenindef=self.expl_lenindef,
5750             bered=self.bered,
5751         )
5752         if self.ready:
5753             yield self.value.pps(decode_path=decode_path + (self.choice,))
5754         for pp in self.pps_lenindef(decode_path):
5755             yield pp
5756
5757
5758 class PrimitiveTypes(Choice):
5759     """Predefined ``CHOICE`` for all generic primitive types
5760
5761     It could be useful for general decoding of some unspecified values:
5762
5763     >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5764     OCTET STRING 3 bytes 666f6f
5765     >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5766     INTEGER 1193046
5767     """
5768     __slots__ = ()
5769     schema = tuple((klass.__name__, klass()) for klass in (
5770         Boolean,
5771         Integer,
5772         BitString,
5773         OctetString,
5774         Null,
5775         ObjectIdentifier,
5776         UTF8String,
5777         NumericString,
5778         PrintableString,
5779         TeletexString,
5780         VideotexString,
5781         IA5String,
5782         UTCTime,
5783         GeneralizedTime,
5784         GraphicString,
5785         VisibleString,
5786         ISO646String,
5787         GeneralString,
5788         UniversalString,
5789         BMPString,
5790     ))
5791
5792
5793 AnyState = namedtuple(
5794     "AnyState",
5795     BasicState._fields + ("value", "defined"),
5796     **NAMEDTUPLE_KWARGS
5797 )
5798
5799
5800 class Any(Obj):
5801     """``ANY`` special type
5802
5803     >>> Any(Integer(-123))
5804     ANY INTEGER -123 (0X:7B)
5805     >>> a = Any(OctetString(b"hello world").encode())
5806     ANY 040b68656c6c6f20776f726c64
5807     >>> hexenc(bytes(a))
5808     b'0x040x0bhello world'
5809     """
5810     __slots__ = ("defined",)
5811     tag_default = tag_encode(0)
5812     asn1_type_name = "ANY"
5813
5814     def __init__(
5815             self,
5816             value=None,
5817             expl=None,
5818             optional=False,
5819             _decoded=(0, 0, 0),
5820     ):
5821         """
5822         :param value: set the value. Either any kind of pyderasn's
5823                       **ready** object, or bytes. Pay attention that
5824                       **no** validation is performed if raw binary value
5825                       is valid TLV, except just tag decoding
5826         :param bytes expl: override default tag with ``EXPLICIT`` one
5827         :param bool optional: is object ``OPTIONAL`` in sequence
5828         """
5829         super(Any, self).__init__(None, expl, None, optional, _decoded)
5830         if value is None:
5831             self._value = None
5832         else:
5833             value = self._value_sanitize(value)
5834             self._value = value
5835             if self._expl is None:
5836                 if value.__class__ == binary_type:
5837                     tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5838                 else:
5839                     tag_class, tag_num = value.tag_order
5840             else:
5841                 tag_class, _, tag_num = tag_decode(self._expl)
5842             self._tag_order = (tag_class, tag_num)
5843         self.defined = None
5844
5845     def _value_sanitize(self, value):
5846         if value.__class__ == binary_type:
5847             if len(value) == 0:
5848                 raise ValueError("Any value can not be empty")
5849             return value
5850         if isinstance(value, self.__class__):
5851             return value._value
5852         if not isinstance(value, Obj):
5853             raise InvalidValueType((self.__class__, Obj, binary_type))
5854         return value
5855
5856     @property
5857     def ready(self):
5858         return self._value is not None
5859
5860     @property
5861     def tag_order(self):
5862         self._assert_ready()
5863         return self._tag_order
5864
5865     @property
5866     def bered(self):
5867         if self.expl_lenindef or self.lenindef:
5868             return True
5869         if self.defined is None:
5870             return False
5871         return self.defined[1].bered
5872
5873     def __getstate__(self):
5874         return AnyState(
5875             __version__,
5876             self.tag,
5877             self._tag_order,
5878             self._expl,
5879             None,
5880             self.optional,
5881             self.offset,
5882             self.llen,
5883             self.vlen,
5884             self.expl_lenindef,
5885             self.lenindef,
5886             self.ber_encoded,
5887             self._value,
5888             self.defined,
5889         )
5890
5891     def __setstate__(self, state):
5892         super(Any, self).__setstate__(state)
5893         self._value = state.value
5894         self.defined = state.defined
5895
5896     def __eq__(self, their):
5897         if their.__class__ == binary_type:
5898             if self._value.__class__ == binary_type:
5899                 return self._value == their
5900             return self._value.encode() == their
5901         if issubclass(their.__class__, Any):
5902             if self.ready and their.ready:
5903                 return bytes(self) == bytes(their)
5904             return self.ready == their.ready
5905         return False
5906
5907     def __call__(
5908             self,
5909             value=None,
5910             expl=None,
5911             optional=None,
5912     ):
5913         return self.__class__(
5914             value=value,
5915             expl=self._expl if expl is None else expl,
5916             optional=self.optional if optional is None else optional,
5917         )
5918
5919     def __bytes__(self):
5920         self._assert_ready()
5921         value = self._value
5922         if value.__class__ == binary_type:
5923             return value
5924         return self._value.encode()
5925
5926     @property
5927     def tlen(self):
5928         return 0
5929
5930     def _encode(self):
5931         self._assert_ready()
5932         value = self._value
5933         if value.__class__ == binary_type:
5934             return value
5935         return value.encode()
5936
5937     def _encode1st(self, state):
5938         self._assert_ready()
5939         value = self._value
5940         if value.__class__ == binary_type:
5941             return len(value), state
5942         return value.encode1st(state)
5943
5944     def _encode2nd(self, writer, state_iter):
5945         value = self._value
5946         if value.__class__ == binary_type:
5947             write_full(writer, value)
5948         else:
5949             value.encode2nd(writer, state_iter)
5950
5951     def _encode_cer(self, writer):
5952         self._assert_ready()
5953         value = self._value
5954         if value.__class__ == binary_type:
5955             write_full(writer, value)
5956         else:
5957             value.encode_cer(writer)
5958
5959     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5960         try:
5961             t, tlen, lv = tag_strip(tlv)
5962         except DecodeError as err:
5963             raise err.__class__(
5964                 msg=err.msg,
5965                 klass=self.__class__,
5966                 decode_path=decode_path,
5967                 offset=offset,
5968             )
5969         try:
5970             l, llen, v = len_decode(lv)
5971         except LenIndefForm as err:
5972             if not ctx.get("bered", False):
5973                 raise err.__class__(
5974                     msg=err.msg,
5975                     klass=self.__class__,
5976                     decode_path=decode_path,
5977                     offset=offset,
5978                 )
5979             llen, vlen, v = 1, 0, lv[1:]
5980             sub_offset = offset + tlen + llen
5981             chunk_i = 0
5982             while v[:EOC_LEN].tobytes() != EOC:
5983                 chunk, v = Any().decode(
5984                     v,
5985                     offset=sub_offset,
5986                     decode_path=decode_path + (str(chunk_i),),
5987                     leavemm=True,
5988                     ctx=ctx,
5989                     _ctx_immutable=False,
5990                 )
5991                 vlen += chunk.tlvlen
5992                 sub_offset += chunk.tlvlen
5993                 chunk_i += 1
5994             tlvlen = tlen + llen + vlen + EOC_LEN
5995             obj = self.__class__(
5996                 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
5997                 expl=self._expl,
5998                 optional=self.optional,
5999                 _decoded=(offset, 0, tlvlen),
6000             )
6001             obj.lenindef = True
6002             obj.tag = t.tobytes()
6003             yield decode_path, obj, v[EOC_LEN:]
6004             return
6005         except DecodeError as err:
6006             raise err.__class__(
6007                 msg=err.msg,
6008                 klass=self.__class__,
6009                 decode_path=decode_path,
6010                 offset=offset,
6011             )
6012         if l > len(v):
6013             raise NotEnoughData(
6014                 "encoded length is longer than data",
6015                 klass=self.__class__,
6016                 decode_path=decode_path,
6017                 offset=offset,
6018             )
6019         tlvlen = tlen + llen + l
6020         v, tail = tlv[:tlvlen], v[l:]
6021         obj = self.__class__(
6022             value=None if evgen_mode else v.tobytes(),
6023             expl=self._expl,
6024             optional=self.optional,
6025             _decoded=(offset, 0, tlvlen),
6026         )
6027         obj.tag = t.tobytes()
6028         yield decode_path, obj, tail
6029
6030     def __repr__(self):
6031         return pp_console_row(next(self.pps()))
6032
6033     def pps(self, decode_path=()):
6034         value = self._value
6035         if value is None:
6036             pass
6037         elif value.__class__ == binary_type:
6038             value = None
6039         else:
6040             value = repr(value)
6041         yield _pp(
6042             obj=self,
6043             asn1_type_name=self.asn1_type_name,
6044             obj_name=self.__class__.__name__,
6045             decode_path=decode_path,
6046             value=value,
6047             blob=self._value if self._value.__class__ == binary_type else None,
6048             optional=self.optional,
6049             default=self == self.default,
6050             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6051             expl=None if self._expl is None else tag_decode(self._expl),
6052             offset=self.offset,
6053             tlen=self.tlen,
6054             llen=self.llen,
6055             vlen=self.vlen,
6056             expl_offset=self.expl_offset if self.expled else None,
6057             expl_tlen=self.expl_tlen if self.expled else None,
6058             expl_llen=self.expl_llen if self.expled else None,
6059             expl_vlen=self.expl_vlen if self.expled else None,
6060             expl_lenindef=self.expl_lenindef,
6061             lenindef=self.lenindef,
6062             bered=self.bered,
6063         )
6064         defined_by, defined = self.defined or (None, None)
6065         if defined_by is not None:
6066             yield defined.pps(
6067                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6068             )
6069         for pp in self.pps_lenindef(decode_path):
6070             yield pp
6071
6072
6073 ########################################################################
6074 # ASN.1 constructed types
6075 ########################################################################
6076
6077 def abs_decode_path(decode_path, rel_path):
6078     """Create an absolute decode path from current and relative ones
6079
6080     :param decode_path: current decode path, starting point. Tuple of strings
6081     :param rel_path: relative path to ``decode_path``. Tuple of strings.
6082                      If first tuple's element is "/", then treat it as
6083                      an absolute path, ignoring ``decode_path`` as
6084                      starting point. Also this tuple can contain ".."
6085                      elements, stripping the leading element from
6086                      ``decode_path``
6087
6088     >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
6089     ("foo", "bar", "baz", "whatever")
6090     >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
6091     ("foo", "whatever")
6092     >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
6093     ("baz", "whatever")
6094     """
6095     if rel_path[0] == "/":
6096         return rel_path[1:]
6097     if rel_path[0] == "..":
6098         return abs_decode_path(decode_path[:-1], rel_path[1:])
6099     return decode_path + rel_path
6100
6101
6102 SequenceState = namedtuple(
6103     "SequenceState",
6104     BasicState._fields + ("specs", "value",),
6105     **NAMEDTUPLE_KWARGS
6106 )
6107
6108
6109 class SequenceEncode1stMixing(object):
6110     def _encode1st(self, state):
6111         state.append(0)
6112         idx = len(state) - 1
6113         vlen = 0
6114         for v in self._values_for_encoding():
6115             l, _ = v.encode1st(state)
6116             vlen += l
6117         state[idx] = vlen
6118         return len(self.tag) + len_size(vlen) + vlen, state
6119
6120
6121 class Sequence(SequenceEncode1stMixing, Obj):
6122     """``SEQUENCE`` structure type
6123
6124     You have to make specification of sequence::
6125
6126         class Extension(Sequence):
6127             schema = (
6128                 ("extnID", ObjectIdentifier()),
6129                 ("critical", Boolean(default=False)),
6130                 ("extnValue", OctetString()),
6131             )
6132
6133     Then, you can work with it as with dictionary.
6134
6135     >>> ext = Extension()
6136     >>> Extension().specs
6137     OrderedDict([
6138         ('extnID', OBJECT IDENTIFIER),
6139         ('critical', BOOLEAN False OPTIONAL DEFAULT),
6140         ('extnValue', OCTET STRING),
6141     ])
6142     >>> ext["extnID"] = "1.2.3"
6143     Traceback (most recent call last):
6144     pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6145     >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6146
6147     You can determine if sequence is ready to be encoded:
6148
6149     >>> ext.ready
6150     False
6151     >>> ext.encode()
6152     Traceback (most recent call last):
6153     pyderasn.ObjNotReady: object is not ready: extnValue
6154     >>> ext["extnValue"] = OctetString(b"foobar")
6155     >>> ext.ready
6156     True
6157
6158     Value you want to assign, must have the same **type** as in
6159     corresponding specification, but it can have different tags,
6160     optional/default attributes -- they will be taken from specification
6161     automatically::
6162
6163         class TBSCertificate(Sequence):
6164             schema = (
6165                 ("version", Version(expl=tag_ctxc(0), default="v1")),
6166             [...]
6167
6168     >>> tbs = TBSCertificate()
6169     >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6170
6171     Assign ``None`` to remove value from sequence.
6172
6173     You can set values in Sequence during its initialization:
6174
6175     >>> AlgorithmIdentifier((
6176         ("algorithm", ObjectIdentifier("1.2.3")),
6177         ("parameters", Any(Null()))
6178     ))
6179     AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6180
6181     You can determine if value exists/set in the sequence and take its value:
6182
6183     >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6184     (True, True, False)
6185     >>> ext["extnID"]
6186     OBJECT IDENTIFIER 1.2.3
6187
6188     But pay attention that if value has default, then it won't be (not
6189     in) in the sequence (because ``DEFAULT`` must not be encoded in
6190     DER), but you can read its value:
6191
6192     >>> "critical" in ext, ext["critical"]
6193     (False, BOOLEAN False)
6194     >>> ext["critical"] = Boolean(True)
6195     >>> "critical" in ext, ext["critical"]
6196     (True, BOOLEAN True)
6197
6198     All defaulted values are always optional.
6199
6200     .. _allow_default_values_ctx:
6201
6202     DER prohibits default value encoding and will raise an error if
6203     default value is unexpectedly met during decode.
6204     If :ref:`bered <bered_ctx>` context option is set, then no error
6205     will be raised, but ``bered`` attribute set. You can disable strict
6206     defaulted values existence validation by setting
6207     ``"allow_default_values": True`` :ref:`context <ctx>` option.
6208
6209     All values with DEFAULT specified are decoded atomically in
6210     :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6211     SEQUENCE, then it will be yielded as a single element, not
6212     disassembled. That is required for DEFAULT existence check.
6213
6214     Two sequences are equal if they have equal specification (schema),
6215     implicit/explicit tagging and the same values.
6216     """
6217     __slots__ = ("specs",)
6218     tag_default = tag_encode(form=TagFormConstructed, num=16)
6219     asn1_type_name = "SEQUENCE"
6220
6221     def __init__(
6222             self,
6223             value=None,
6224             schema=None,
6225             impl=None,
6226             expl=None,
6227             default=None,
6228             optional=False,
6229             _decoded=(0, 0, 0),
6230     ):
6231         super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6232         if schema is None:
6233             schema = getattr(self, "schema", ())
6234         self.specs = (
6235             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6236         )
6237         self._value = {}
6238         if value is not None:
6239             if issubclass(value.__class__, Sequence):
6240                 self._value = value._value
6241             elif hasattr(value, "__iter__"):
6242                 for seq_key, seq_value in value:
6243                     self[seq_key] = seq_value
6244             else:
6245                 raise InvalidValueType((Sequence,))
6246         if default is not None:
6247             if not issubclass(default.__class__, Sequence):
6248                 raise InvalidValueType((Sequence,))
6249             default_value = default._value
6250             default_obj = self.__class__(impl=self.tag, expl=self._expl)
6251             default_obj.specs = self.specs
6252             default_obj._value = default_value
6253             self.default = default_obj
6254             if value is None:
6255                 self._value = copy(default_obj._value)
6256
6257     @property
6258     def ready(self):
6259         for name, spec in iteritems(self.specs):
6260             value = self._value.get(name)
6261             if value is None:
6262                 if spec.optional:
6263                     continue
6264                 return False
6265             if not value.ready:
6266                 return False
6267         return True
6268
6269     @property
6270     def bered(self):
6271         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6272             return True
6273         return any(value.bered for value in itervalues(self._value))
6274
6275     def __getstate__(self):
6276         return SequenceState(
6277             __version__,
6278             self.tag,
6279             self._tag_order,
6280             self._expl,
6281             self.default,
6282             self.optional,
6283             self.offset,
6284             self.llen,
6285             self.vlen,
6286             self.expl_lenindef,
6287             self.lenindef,
6288             self.ber_encoded,
6289             self.specs,
6290             {k: copy(v) for k, v in iteritems(self._value)},
6291         )
6292
6293     def __setstate__(self, state):
6294         super(Sequence, self).__setstate__(state)
6295         self.specs = state.specs
6296         self._value = state.value
6297
6298     def __eq__(self, their):
6299         if not isinstance(their, self.__class__):
6300             return False
6301         return (
6302             self.specs == their.specs and
6303             self.tag == their.tag and
6304             self._expl == their._expl and
6305             self._value == their._value
6306         )
6307
6308     def __call__(
6309             self,
6310             value=None,
6311             impl=None,
6312             expl=None,
6313             default=None,
6314             optional=None,
6315     ):
6316         return self.__class__(
6317             value=value,
6318             schema=self.specs,
6319             impl=self.tag if impl is None else impl,
6320             expl=self._expl if expl is None else expl,
6321             default=self.default if default is None else default,
6322             optional=self.optional if optional is None else optional,
6323         )
6324
6325     def __contains__(self, key):
6326         return key in self._value
6327
6328     def __setitem__(self, key, value):
6329         spec = self.specs.get(key)
6330         if spec is None:
6331             raise ObjUnknown(key)
6332         if value is None:
6333             self._value.pop(key, None)
6334             return
6335         if not isinstance(value, spec.__class__):
6336             raise InvalidValueType((spec.__class__,))
6337         value = spec(value=value)
6338         if spec.default is not None and value == spec.default:
6339             self._value.pop(key, None)
6340             return
6341         self._value[key] = value
6342
6343     def __getitem__(self, key):
6344         value = self._value.get(key)
6345         if value is not None:
6346             return value
6347         spec = self.specs.get(key)
6348         if spec is None:
6349             raise ObjUnknown(key)
6350         if spec.default is not None:
6351             return spec.default
6352         return None
6353
6354     def _values_for_encoding(self):
6355         for name, spec in iteritems(self.specs):
6356             value = self._value.get(name)
6357             if value is None:
6358                 if spec.optional:
6359                     continue
6360                 raise ObjNotReady(name)
6361             yield value
6362
6363     def _encode(self):
6364         v = b"".join(v.encode() for v in self._values_for_encoding())
6365         return b"".join((self.tag, len_encode(len(v)), v))
6366
6367     def _encode2nd(self, writer, state_iter):
6368         write_full(writer, self.tag + len_encode(next(state_iter)))
6369         for v in self._values_for_encoding():
6370             v.encode2nd(writer, state_iter)
6371
6372     def _encode_cer(self, writer):
6373         write_full(writer, self.tag + LENINDEF)
6374         for v in self._values_for_encoding():
6375             v.encode_cer(writer)
6376         write_full(writer, EOC)
6377
6378     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6379         try:
6380             t, tlen, lv = tag_strip(tlv)
6381         except DecodeError as err:
6382             raise err.__class__(
6383                 msg=err.msg,
6384                 klass=self.__class__,
6385                 decode_path=decode_path,
6386                 offset=offset,
6387             )
6388         if t != self.tag:
6389             raise TagMismatch(
6390                 klass=self.__class__,
6391                 decode_path=decode_path,
6392                 offset=offset,
6393             )
6394         if tag_only:  # pragma: no cover
6395             yield None
6396             return
6397         lenindef = False
6398         ctx_bered = ctx.get("bered", False)
6399         try:
6400             l, llen, v = len_decode(lv)
6401         except LenIndefForm as err:
6402             if not ctx_bered:
6403                 raise err.__class__(
6404                     msg=err.msg,
6405                     klass=self.__class__,
6406                     decode_path=decode_path,
6407                     offset=offset,
6408                 )
6409             l, llen, v = 0, 1, lv[1:]
6410             lenindef = True
6411         except DecodeError as err:
6412             raise err.__class__(
6413                 msg=err.msg,
6414                 klass=self.__class__,
6415                 decode_path=decode_path,
6416                 offset=offset,
6417             )
6418         if l > len(v):
6419             raise NotEnoughData(
6420                 "encoded length is longer than data",
6421                 klass=self.__class__,
6422                 decode_path=decode_path,
6423                 offset=offset,
6424             )
6425         if not lenindef:
6426             v, tail = v[:l], v[l:]
6427         vlen = 0
6428         sub_offset = offset + tlen + llen
6429         values = {}
6430         ber_encoded = False
6431         ctx_allow_default_values = ctx.get("allow_default_values", False)
6432         for name, spec in iteritems(self.specs):
6433             if spec.optional and (
6434                     (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6435                     len(v) == 0
6436             ):
6437                 continue
6438             spec_defaulted = spec.default is not None
6439             sub_decode_path = decode_path + (name,)
6440             try:
6441                 if evgen_mode and not spec_defaulted:
6442                     for _decode_path, value, v_tail in spec.decode_evgen(
6443                             v,
6444                             sub_offset,
6445                             leavemm=True,
6446                             decode_path=sub_decode_path,
6447                             ctx=ctx,
6448                             _ctx_immutable=False,
6449                     ):
6450                         yield _decode_path, value, v_tail
6451                 else:
6452                     _, value, v_tail = next(spec.decode_evgen(
6453                         v,
6454                         sub_offset,
6455                         leavemm=True,
6456                         decode_path=sub_decode_path,
6457                         ctx=ctx,
6458                         _ctx_immutable=False,
6459                         _evgen_mode=False,
6460                     ))
6461             except TagMismatch as err:
6462                 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6463                     continue
6464                 raise
6465
6466             defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6467             if not evgen_mode and defined is not None:
6468                 defined_by, defined_spec = defined
6469                 if issubclass(value.__class__, SequenceOf):
6470                     for i, _value in enumerate(value):
6471                         sub_sub_decode_path = sub_decode_path + (
6472                             str(i),
6473                             DecodePathDefBy(defined_by),
6474                         )
6475                         defined_value, defined_tail = defined_spec.decode(
6476                             memoryview(bytes(_value)),
6477                             sub_offset + (
6478                                 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6479                                 if value.expled else (value.tlen + value.llen)
6480                             ),
6481                             leavemm=True,
6482                             decode_path=sub_sub_decode_path,
6483                             ctx=ctx,
6484                             _ctx_immutable=False,
6485                         )
6486                         if len(defined_tail) > 0:
6487                             raise DecodeError(
6488                                 "remaining data",
6489                                 klass=self.__class__,
6490                                 decode_path=sub_sub_decode_path,
6491                                 offset=offset,
6492                             )
6493                         _value.defined = (defined_by, defined_value)
6494                 else:
6495                     defined_value, defined_tail = defined_spec.decode(
6496                         memoryview(bytes(value)),
6497                         sub_offset + (
6498                             (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6499                             if value.expled else (value.tlen + value.llen)
6500                         ),
6501                         leavemm=True,
6502                         decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6503                         ctx=ctx,
6504                         _ctx_immutable=False,
6505                     )
6506                     if len(defined_tail) > 0:
6507                         raise DecodeError(
6508                             "remaining data",
6509                             klass=self.__class__,
6510                             decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6511                             offset=offset,
6512                         )
6513                     value.defined = (defined_by, defined_value)
6514
6515             value_len = value.fulllen
6516             vlen += value_len
6517             sub_offset += value_len
6518             v = v_tail
6519             if spec_defaulted:
6520                 if evgen_mode:
6521                     yield sub_decode_path, value, v_tail
6522                 if value == spec.default:
6523                     if ctx_bered or ctx_allow_default_values:
6524                         ber_encoded = True
6525                     else:
6526                         raise DecodeError(
6527                             "DEFAULT value met",
6528                             klass=self.__class__,
6529                             decode_path=sub_decode_path,
6530                             offset=sub_offset,
6531                         )
6532             if not evgen_mode:
6533                 values[name] = value
6534                 spec_defines = getattr(spec, "defines", ())
6535                 if len(spec_defines) == 0:
6536                     defines_by_path = ctx.get("defines_by_path", ())
6537                     if len(defines_by_path) > 0:
6538                         spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6539                 if spec_defines is not None and len(spec_defines) > 0:
6540                     for rel_path, schema in spec_defines:
6541                         defined = schema.get(value, None)
6542                         if defined is not None:
6543                             ctx.setdefault("_defines", []).append((
6544                                 abs_decode_path(sub_decode_path[:-1], rel_path),
6545                                 (value, defined),
6546                             ))
6547         if lenindef:
6548             if v[:EOC_LEN].tobytes() != EOC:
6549                 raise DecodeError(
6550                     "no EOC",
6551                     klass=self.__class__,
6552                     decode_path=decode_path,
6553                     offset=offset,
6554                 )
6555             tail = v[EOC_LEN:]
6556             vlen += EOC_LEN
6557         elif len(v) > 0:
6558             raise DecodeError(
6559                 "remaining data",
6560                 klass=self.__class__,
6561                 decode_path=decode_path,
6562                 offset=offset,
6563             )
6564         obj = self.__class__(
6565             schema=self.specs,
6566             impl=self.tag,
6567             expl=self._expl,
6568             default=self.default,
6569             optional=self.optional,
6570             _decoded=(offset, llen, vlen),
6571         )
6572         obj._value = values
6573         obj.lenindef = lenindef
6574         obj.ber_encoded = ber_encoded
6575         yield decode_path, obj, tail
6576
6577     def __repr__(self):
6578         value = pp_console_row(next(self.pps()))
6579         cols = []
6580         for name in self.specs:
6581             _value = self._value.get(name)
6582             if _value is None:
6583                 continue
6584             cols.append("%s: %s" % (name, repr(_value)))
6585         return "%s[%s]" % (value, "; ".join(cols))
6586
6587     def pps(self, decode_path=()):
6588         yield _pp(
6589             obj=self,
6590             asn1_type_name=self.asn1_type_name,
6591             obj_name=self.__class__.__name__,
6592             decode_path=decode_path,
6593             optional=self.optional,
6594             default=self == self.default,
6595             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6596             expl=None if self._expl is None else tag_decode(self._expl),
6597             offset=self.offset,
6598             tlen=self.tlen,
6599             llen=self.llen,
6600             vlen=self.vlen,
6601             expl_offset=self.expl_offset if self.expled else None,
6602             expl_tlen=self.expl_tlen if self.expled else None,
6603             expl_llen=self.expl_llen if self.expled else None,
6604             expl_vlen=self.expl_vlen if self.expled else None,
6605             expl_lenindef=self.expl_lenindef,
6606             lenindef=self.lenindef,
6607             ber_encoded=self.ber_encoded,
6608             bered=self.bered,
6609         )
6610         for name in self.specs:
6611             value = self._value.get(name)
6612             if value is None:
6613                 continue
6614             yield value.pps(decode_path=decode_path + (name,))
6615         for pp in self.pps_lenindef(decode_path):
6616             yield pp
6617
6618
6619 class Set(Sequence, SequenceEncode1stMixing):
6620     """``SET`` structure type
6621
6622     Its usage is identical to :py:class:`pyderasn.Sequence`.
6623
6624     .. _allow_unordered_set_ctx:
6625
6626     DER prohibits unordered values encoding and will raise an error
6627     during decode. If :ref:`bered <bered_ctx>` context option is set,
6628     then no error will occur. Also you can disable strict values
6629     ordering check by setting ``"allow_unordered_set": True``
6630     :ref:`context <ctx>` option.
6631     """
6632     __slots__ = ()
6633     tag_default = tag_encode(form=TagFormConstructed, num=17)
6634     asn1_type_name = "SET"
6635
6636     def _values_for_encoding(self):
6637         return sorted(
6638             super(Set, self)._values_for_encoding(),
6639             key=attrgetter("tag_order"),
6640         )
6641
6642     def _encode_cer(self, writer):
6643         write_full(writer, self.tag + LENINDEF)
6644         for v in sorted(
6645                 super(Set, self)._values_for_encoding(),
6646                 key=attrgetter("tag_order_cer"),
6647         ):
6648             v.encode_cer(writer)
6649         write_full(writer, EOC)
6650
6651     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6652         try:
6653             t, tlen, lv = tag_strip(tlv)
6654         except DecodeError as err:
6655             raise err.__class__(
6656                 msg=err.msg,
6657                 klass=self.__class__,
6658                 decode_path=decode_path,
6659                 offset=offset,
6660             )
6661         if t != self.tag:
6662             raise TagMismatch(
6663                 klass=self.__class__,
6664                 decode_path=decode_path,
6665                 offset=offset,
6666             )
6667         if tag_only:
6668             yield None
6669             return
6670         lenindef = False
6671         ctx_bered = ctx.get("bered", False)
6672         try:
6673             l, llen, v = len_decode(lv)
6674         except LenIndefForm as err:
6675             if not ctx_bered:
6676                 raise err.__class__(
6677                     msg=err.msg,
6678                     klass=self.__class__,
6679                     decode_path=decode_path,
6680                     offset=offset,
6681                 )
6682             l, llen, v = 0, 1, lv[1:]
6683             lenindef = True
6684         except DecodeError as err:
6685             raise err.__class__(
6686                 msg=err.msg,
6687                 klass=self.__class__,
6688                 decode_path=decode_path,
6689                 offset=offset,
6690             )
6691         if l > len(v):
6692             raise NotEnoughData(
6693                 "encoded length is longer than data",
6694                 klass=self.__class__,
6695                 offset=offset,
6696             )
6697         if not lenindef:
6698             v, tail = v[:l], v[l:]
6699         vlen = 0
6700         sub_offset = offset + tlen + llen
6701         values = {}
6702         ber_encoded = False
6703         ctx_allow_default_values = ctx.get("allow_default_values", False)
6704         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6705         tag_order_prev = (0, 0)
6706         _specs_items = copy(self.specs)
6707
6708         while len(v) > 0:
6709             if lenindef and v[:EOC_LEN].tobytes() == EOC:
6710                 break
6711             for name, spec in iteritems(_specs_items):
6712                 sub_decode_path = decode_path + (name,)
6713                 try:
6714                     spec.decode(
6715                         v,
6716                         sub_offset,
6717                         leavemm=True,
6718                         decode_path=sub_decode_path,
6719                         ctx=ctx,
6720                         tag_only=True,
6721                         _ctx_immutable=False,
6722                     )
6723                 except TagMismatch:
6724                     continue
6725                 break
6726             else:
6727                 raise TagMismatch(
6728                     klass=self.__class__,
6729                     decode_path=decode_path,
6730                     offset=offset,
6731                 )
6732             spec_defaulted = spec.default is not None
6733             if evgen_mode and not spec_defaulted:
6734                 for _decode_path, value, v_tail in spec.decode_evgen(
6735                         v,
6736                         sub_offset,
6737                         leavemm=True,
6738                         decode_path=sub_decode_path,
6739                         ctx=ctx,
6740                         _ctx_immutable=False,
6741                 ):
6742                     yield _decode_path, value, v_tail
6743             else:
6744                 _, value, v_tail = next(spec.decode_evgen(
6745                     v,
6746                     sub_offset,
6747                     leavemm=True,
6748                     decode_path=sub_decode_path,
6749                     ctx=ctx,
6750                     _ctx_immutable=False,
6751                     _evgen_mode=False,
6752                 ))
6753             value_tag_order = value.tag_order
6754             value_len = value.fulllen
6755             if tag_order_prev >= value_tag_order:
6756                 if ctx_bered or ctx_allow_unordered_set:
6757                     ber_encoded = True
6758                 else:
6759                     raise DecodeError(
6760                         "unordered " + self.asn1_type_name,
6761                         klass=self.__class__,
6762                         decode_path=sub_decode_path,
6763                         offset=sub_offset,
6764                     )
6765             if spec_defaulted:
6766                 if evgen_mode:
6767                     yield sub_decode_path, value, v_tail
6768                 if value != spec.default:
6769                     pass
6770                 elif ctx_bered or ctx_allow_default_values:
6771                     ber_encoded = True
6772                 else:
6773                     raise DecodeError(
6774                         "DEFAULT value met",
6775                         klass=self.__class__,
6776                         decode_path=sub_decode_path,
6777                         offset=sub_offset,
6778                     )
6779             values[name] = value
6780             del _specs_items[name]
6781             tag_order_prev = value_tag_order
6782             sub_offset += value_len
6783             vlen += value_len
6784             v = v_tail
6785
6786         obj = self.__class__(
6787             schema=self.specs,
6788             impl=self.tag,
6789             expl=self._expl,
6790             default=self.default,
6791             optional=self.optional,
6792             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6793         )
6794         if lenindef:
6795             if v[:EOC_LEN].tobytes() != EOC:
6796                 raise DecodeError(
6797                     "no EOC",
6798                     klass=self.__class__,
6799                     decode_path=decode_path,
6800                     offset=offset,
6801                 )
6802             tail = v[EOC_LEN:]
6803             obj.lenindef = True
6804         for name, spec in iteritems(self.specs):
6805             if name not in values and not spec.optional:
6806                 raise DecodeError(
6807                     "%s value is not ready" % name,
6808                     klass=self.__class__,
6809                     decode_path=decode_path,
6810                     offset=offset,
6811                 )
6812         if not evgen_mode:
6813             obj._value = values
6814         obj.ber_encoded = ber_encoded
6815         yield decode_path, obj, tail
6816
6817
6818 SequenceOfState = namedtuple(
6819     "SequenceOfState",
6820     BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6821     **NAMEDTUPLE_KWARGS
6822 )
6823
6824
6825 class SequenceOf(SequenceEncode1stMixing, Obj):
6826     """``SEQUENCE OF`` sequence type
6827
6828     For that kind of type you must specify the object it will carry on
6829     (bounds are for example here, not required)::
6830
6831         class Ints(SequenceOf):
6832             schema = Integer()
6833             bounds = (0, 2)
6834
6835     >>> ints = Ints()
6836     >>> ints.append(Integer(123))
6837     >>> ints.append(Integer(234))
6838     >>> ints
6839     Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6840     >>> [int(i) for i in ints]
6841     [123, 234]
6842     >>> ints.append(Integer(345))
6843     Traceback (most recent call last):
6844     pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6845     >>> ints[1]
6846     INTEGER 234
6847     >>> ints[1] = Integer(345)
6848     >>> ints
6849     Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6850
6851     You can initialize sequence with preinitialized values:
6852
6853     >>> ints = Ints([Integer(123), Integer(234)])
6854
6855     Also you can use iterator as a value:
6856
6857     >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6858
6859     And it won't be iterated until encoding process. Pay attention that
6860     bounds and required schema checks are done only during the encoding
6861     process in that case! After encode was called, then value is zeroed
6862     back to empty list and you have to set it again. That mode is useful
6863     mainly with CER encoding mode, where all objects from the iterable
6864     will be streamed to the buffer, without copying all of them to
6865     memory first.
6866     """
6867     __slots__ = ("spec", "_bound_min", "_bound_max")
6868     tag_default = tag_encode(form=TagFormConstructed, num=16)
6869     asn1_type_name = "SEQUENCE OF"
6870
6871     def __init__(
6872             self,
6873             value=None,
6874             schema=None,
6875             bounds=None,
6876             impl=None,
6877             expl=None,
6878             default=None,
6879             optional=False,
6880             _decoded=(0, 0, 0),
6881     ):
6882         super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6883         if schema is None:
6884             schema = getattr(self, "schema", None)
6885         if schema is None:
6886             raise ValueError("schema must be specified")
6887         self.spec = schema
6888         self._bound_min, self._bound_max = getattr(
6889             self,
6890             "bounds",
6891             (0, float("+inf")),
6892         ) if bounds is None else bounds
6893         self._value = []
6894         if value is not None:
6895             self._value = self._value_sanitize(value)
6896         if default is not None:
6897             default_value = self._value_sanitize(default)
6898             default_obj = self.__class__(
6899                 schema=schema,
6900                 impl=self.tag,
6901                 expl=self._expl,
6902             )
6903             default_obj._value = default_value
6904             self.default = default_obj
6905             if value is None:
6906                 self._value = copy(default_obj._value)
6907
6908     def _value_sanitize(self, value):
6909         iterator = False
6910         if issubclass(value.__class__, SequenceOf):
6911             value = value._value
6912         elif hasattr(value, NEXT_ATTR_NAME):
6913             iterator = True
6914         elif hasattr(value, "__iter__"):
6915             value = list(value)
6916         else:
6917             raise InvalidValueType((self.__class__, iter, "iterator"))
6918         if not iterator:
6919             if not self._bound_min <= len(value) <= self._bound_max:
6920                 raise BoundsError(self._bound_min, len(value), self._bound_max)
6921             class_expected = self.spec.__class__
6922             for v in value:
6923                 if not isinstance(v, class_expected):
6924                     raise InvalidValueType((class_expected,))
6925         return value
6926
6927     @property
6928     def ready(self):
6929         if hasattr(self._value, NEXT_ATTR_NAME):
6930             return True
6931         if self._bound_min > 0 and len(self._value) == 0:
6932             return False
6933         return all(v.ready for v in self._value)
6934
6935     @property
6936     def bered(self):
6937         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6938             return True
6939         return any(v.bered for v in self._value)
6940
6941     def __getstate__(self):
6942         if hasattr(self._value, NEXT_ATTR_NAME):
6943             raise ValueError("can not pickle SequenceOf with iterator")
6944         return SequenceOfState(
6945             __version__,
6946             self.tag,
6947             self._tag_order,
6948             self._expl,
6949             self.default,
6950             self.optional,
6951             self.offset,
6952             self.llen,
6953             self.vlen,
6954             self.expl_lenindef,
6955             self.lenindef,
6956             self.ber_encoded,
6957             self.spec,
6958             [copy(v) for v in self._value],
6959             self._bound_min,
6960             self._bound_max,
6961         )
6962
6963     def __setstate__(self, state):
6964         super(SequenceOf, self).__setstate__(state)
6965         self.spec = state.spec
6966         self._value = state.value
6967         self._bound_min = state.bound_min
6968         self._bound_max = state.bound_max
6969
6970     def __eq__(self, their):
6971         if isinstance(their, self.__class__):
6972             return (
6973                 self.spec == their.spec and
6974                 self.tag == their.tag and
6975                 self._expl == their._expl and
6976                 self._value == their._value
6977             )
6978         if hasattr(their, "__iter__"):
6979             return self._value == list(their)
6980         return False
6981
6982     def __call__(
6983             self,
6984             value=None,
6985             bounds=None,
6986             impl=None,
6987             expl=None,
6988             default=None,
6989             optional=None,
6990     ):
6991         return self.__class__(
6992             value=value,
6993             schema=self.spec,
6994             bounds=(
6995                 (self._bound_min, self._bound_max)
6996                 if bounds is None else bounds
6997             ),
6998             impl=self.tag if impl is None else impl,
6999             expl=self._expl if expl is None else expl,
7000             default=self.default if default is None else default,
7001             optional=self.optional if optional is None else optional,
7002         )
7003
7004     def __contains__(self, key):
7005         return key in self._value
7006
7007     def append(self, value):
7008         if not isinstance(value, self.spec.__class__):
7009             raise InvalidValueType((self.spec.__class__,))
7010         if len(self._value) + 1 > self._bound_max:
7011             raise BoundsError(
7012                 self._bound_min,
7013                 len(self._value) + 1,
7014                 self._bound_max,
7015             )
7016         self._value.append(value)
7017
7018     def __iter__(self):
7019         return iter(self._value)
7020
7021     def __len__(self):
7022         return len(self._value)
7023
7024     def __setitem__(self, key, value):
7025         if not isinstance(value, self.spec.__class__):
7026             raise InvalidValueType((self.spec.__class__,))
7027         self._value[key] = self.spec(value=value)
7028
7029     def __getitem__(self, key):
7030         return self._value[key]
7031
7032     def _values_for_encoding(self):
7033         return iter(self._value)
7034
7035     def _encode(self):
7036         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7037         if iterator:
7038             values = []
7039             values_append = values.append
7040             class_expected = self.spec.__class__
7041             values_for_encoding = self._values_for_encoding()
7042             self._value = []
7043             for v in values_for_encoding:
7044                 if not isinstance(v, class_expected):
7045                     raise InvalidValueType((class_expected,))
7046                 values_append(v.encode())
7047             if not self._bound_min <= len(values) <= self._bound_max:
7048                 raise BoundsError(self._bound_min, len(values), self._bound_max)
7049             value = b"".join(values)
7050         else:
7051             value = b"".join(v.encode() for v in self._values_for_encoding())
7052         return b"".join((self.tag, len_encode(len(value)), value))
7053
7054     def _encode1st(self, state):
7055         state = super(SequenceOf, self)._encode1st(state)
7056         if hasattr(self._value, NEXT_ATTR_NAME):
7057             self._value = []
7058         return state
7059
7060     def _encode2nd(self, writer, state_iter):
7061         write_full(writer, self.tag + len_encode(next(state_iter)))
7062         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7063         if iterator:
7064             values_count = 0
7065             class_expected = self.spec.__class__
7066             values_for_encoding = self._values_for_encoding()
7067             self._value = []
7068             for v in values_for_encoding:
7069                 if not isinstance(v, class_expected):
7070                     raise InvalidValueType((class_expected,))
7071                 v.encode2nd(writer, state_iter)
7072                 values_count += 1
7073             if not self._bound_min <= values_count <= self._bound_max:
7074                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7075         else:
7076             for v in self._values_for_encoding():
7077                 v.encode2nd(writer, state_iter)
7078
7079     def _encode_cer(self, writer):
7080         write_full(writer, self.tag + LENINDEF)
7081         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7082         if iterator:
7083             class_expected = self.spec.__class__
7084             values_count = 0
7085             values_for_encoding = self._values_for_encoding()
7086             self._value = []
7087             for v in values_for_encoding:
7088                 if not isinstance(v, class_expected):
7089                     raise InvalidValueType((class_expected,))
7090                 v.encode_cer(writer)
7091                 values_count += 1
7092             if not self._bound_min <= values_count <= self._bound_max:
7093                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7094         else:
7095             for v in self._values_for_encoding():
7096                 v.encode_cer(writer)
7097         write_full(writer, EOC)
7098
7099     def _decode(
7100             self,
7101             tlv,
7102             offset,
7103             decode_path,
7104             ctx,
7105             tag_only,
7106             evgen_mode,
7107             ordering_check=False,
7108     ):
7109         try:
7110             t, tlen, lv = tag_strip(tlv)
7111         except DecodeError as err:
7112             raise err.__class__(
7113                 msg=err.msg,
7114                 klass=self.__class__,
7115                 decode_path=decode_path,
7116                 offset=offset,
7117             )
7118         if t != self.tag:
7119             raise TagMismatch(
7120                 klass=self.__class__,
7121                 decode_path=decode_path,
7122                 offset=offset,
7123             )
7124         if tag_only:
7125             yield None
7126             return
7127         lenindef = False
7128         ctx_bered = ctx.get("bered", False)
7129         try:
7130             l, llen, v = len_decode(lv)
7131         except LenIndefForm as err:
7132             if not ctx_bered:
7133                 raise err.__class__(
7134                     msg=err.msg,
7135                     klass=self.__class__,
7136                     decode_path=decode_path,
7137                     offset=offset,
7138                 )
7139             l, llen, v = 0, 1, lv[1:]
7140             lenindef = True
7141         except DecodeError as err:
7142             raise err.__class__(
7143                 msg=err.msg,
7144                 klass=self.__class__,
7145                 decode_path=decode_path,
7146                 offset=offset,
7147             )
7148         if l > len(v):
7149             raise NotEnoughData(
7150                 "encoded length is longer than data",
7151                 klass=self.__class__,
7152                 decode_path=decode_path,
7153                 offset=offset,
7154             )
7155         if not lenindef:
7156             v, tail = v[:l], v[l:]
7157         vlen = 0
7158         sub_offset = offset + tlen + llen
7159         _value = []
7160         _value_count = 0
7161         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7162         value_prev = memoryview(v[:0])
7163         ber_encoded = False
7164         spec = self.spec
7165         while len(v) > 0:
7166             if lenindef and v[:EOC_LEN].tobytes() == EOC:
7167                 break
7168             sub_decode_path = decode_path + (str(_value_count),)
7169             if evgen_mode:
7170                 for _decode_path, value, v_tail in spec.decode_evgen(
7171                         v,
7172                         sub_offset,
7173                         leavemm=True,
7174                         decode_path=sub_decode_path,
7175                         ctx=ctx,
7176                         _ctx_immutable=False,
7177                 ):
7178                     yield _decode_path, value, v_tail
7179             else:
7180                 _, value, v_tail = next(spec.decode_evgen(
7181                     v,
7182                     sub_offset,
7183                     leavemm=True,
7184                     decode_path=sub_decode_path,
7185                     ctx=ctx,
7186                     _ctx_immutable=False,
7187                     _evgen_mode=False,
7188                 ))
7189             value_len = value.fulllen
7190             if ordering_check:
7191                 if value_prev.tobytes() > v[:value_len].tobytes():
7192                     if ctx_bered or ctx_allow_unordered_set:
7193                         ber_encoded = True
7194                     else:
7195                         raise DecodeError(
7196                             "unordered " + self.asn1_type_name,
7197                             klass=self.__class__,
7198                             decode_path=sub_decode_path,
7199                             offset=sub_offset,
7200                         )
7201                 value_prev = v[:value_len]
7202             _value_count += 1
7203             if not evgen_mode:
7204                 _value.append(value)
7205             sub_offset += value_len
7206             vlen += value_len
7207             v = v_tail
7208         if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7209             raise DecodeError(
7210                 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7211                 klass=self.__class__,
7212                 decode_path=decode_path,
7213                 offset=offset,
7214             )
7215         try:
7216             obj = self.__class__(
7217                 value=None if evgen_mode else _value,
7218                 schema=spec,
7219                 bounds=(self._bound_min, self._bound_max),
7220                 impl=self.tag,
7221                 expl=self._expl,
7222                 default=self.default,
7223                 optional=self.optional,
7224                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7225             )
7226         except BoundsError as err:
7227             raise DecodeError(
7228                 msg=str(err),
7229                 klass=self.__class__,
7230                 decode_path=decode_path,
7231                 offset=offset,
7232             )
7233         if lenindef:
7234             if v[:EOC_LEN].tobytes() != EOC:
7235                 raise DecodeError(
7236                     "no EOC",
7237                     klass=self.__class__,
7238                     decode_path=decode_path,
7239                     offset=offset,
7240                 )
7241             obj.lenindef = True
7242             tail = v[EOC_LEN:]
7243         obj.ber_encoded = ber_encoded
7244         yield decode_path, obj, tail
7245
7246     def __repr__(self):
7247         return "%s[%s]" % (
7248             pp_console_row(next(self.pps())),
7249             ", ".join(repr(v) for v in self._value),
7250         )
7251
7252     def pps(self, decode_path=()):
7253         yield _pp(
7254             obj=self,
7255             asn1_type_name=self.asn1_type_name,
7256             obj_name=self.__class__.__name__,
7257             decode_path=decode_path,
7258             optional=self.optional,
7259             default=self == self.default,
7260             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7261             expl=None if self._expl is None else tag_decode(self._expl),
7262             offset=self.offset,
7263             tlen=self.tlen,
7264             llen=self.llen,
7265             vlen=self.vlen,
7266             expl_offset=self.expl_offset if self.expled else None,
7267             expl_tlen=self.expl_tlen if self.expled else None,
7268             expl_llen=self.expl_llen if self.expled else None,
7269             expl_vlen=self.expl_vlen if self.expled else None,
7270             expl_lenindef=self.expl_lenindef,
7271             lenindef=self.lenindef,
7272             ber_encoded=self.ber_encoded,
7273             bered=self.bered,
7274         )
7275         for i, value in enumerate(self._value):
7276             yield value.pps(decode_path=decode_path + (str(i),))
7277         for pp in self.pps_lenindef(decode_path):
7278             yield pp
7279
7280
7281 class SetOf(SequenceOf):
7282     """``SET OF`` sequence type
7283
7284     Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7285     """
7286     __slots__ = ()
7287     tag_default = tag_encode(form=TagFormConstructed, num=17)
7288     asn1_type_name = "SET OF"
7289
7290     def _value_sanitize(self, value):
7291         value = super(SetOf, self)._value_sanitize(value)
7292         if hasattr(value, NEXT_ATTR_NAME):
7293             raise ValueError(
7294                 "SetOf does not support iterator values, as no sense in them"
7295             )
7296         return value
7297
7298     def _encode(self):
7299         v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7300         return b"".join((self.tag, len_encode(len(v)), v))
7301
7302     def _encode2nd(self, writer, state_iter):
7303         write_full(writer, self.tag + len_encode(next(state_iter)))
7304         values = []
7305         for v in self._values_for_encoding():
7306             buf = BytesIO()
7307             v.encode2nd(buf.write, state_iter)
7308             values.append(buf.getvalue())
7309         values.sort()
7310         for v in values:
7311             write_full(writer, v)
7312
7313     def _encode_cer(self, writer):
7314         write_full(writer, self.tag + LENINDEF)
7315         for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7316             write_full(writer, v)
7317         write_full(writer, EOC)
7318
7319     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7320         return super(SetOf, self)._decode(
7321             tlv,
7322             offset,
7323             decode_path,
7324             ctx,
7325             tag_only,
7326             evgen_mode,
7327             ordering_check=True,
7328         )
7329
7330
7331 def obj_by_path(pypath):  # pragma: no cover
7332     """Import object specified as string Python path
7333
7334     Modules must be separated from classes/functions with ``:``.
7335
7336     >>> obj_by_path("foo.bar:Baz")
7337     <class 'foo.bar.Baz'>
7338     >>> obj_by_path("foo.bar:Baz.boo")
7339     <classmethod 'foo.bar.Baz.boo'>
7340     """
7341     mod, objs = pypath.rsplit(":", 1)
7342     from importlib import import_module
7343     obj = import_module(mod)
7344     for obj_name in objs.split("."):
7345         obj = getattr(obj, obj_name)
7346     return obj
7347
7348
7349 def generic_decoder():  # pragma: no cover
7350     # All of this below is a big hack with self references
7351     choice = PrimitiveTypes()
7352     choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7353     choice.specs["SetOf"] = SetOf(schema=choice)
7354     for i in six_xrange(31):
7355         choice.specs["SequenceOf%d" % i] = SequenceOf(
7356             schema=choice,
7357             expl=tag_ctxc(i),
7358         )
7359     choice.specs["Any"] = Any()
7360
7361     # Class name equals to type name, to omit it from output
7362     class SEQUENCEOF(SequenceOf):
7363         __slots__ = ()
7364         schema = choice
7365
7366     def pprint_any(
7367             obj,
7368             oid_maps=(),
7369             with_colours=False,
7370             with_decode_path=False,
7371             decode_path_only=(),
7372             decode_path=(),
7373     ):
7374         def _pprint_pps(pps):
7375             for pp in pps:
7376                 if hasattr(pp, "_fields"):
7377                     if (
7378                             decode_path_only != () and
7379                             pp.decode_path[:len(decode_path_only)] != decode_path_only
7380                     ):
7381                         continue
7382                     if pp.asn1_type_name == Choice.asn1_type_name:
7383                         continue
7384                     pp_kwargs = pp._asdict()
7385                     pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7386                     pp = _pp(**pp_kwargs)
7387                     yield pp_console_row(
7388                         pp,
7389                         oid_maps=oid_maps,
7390                         with_offsets=True,
7391                         with_blob=False,
7392                         with_colours=with_colours,
7393                         with_decode_path=with_decode_path,
7394                         decode_path_len_decrease=len(decode_path_only),
7395                     )
7396                     for row in pp_console_blob(
7397                             pp,
7398                             decode_path_len_decrease=len(decode_path_only),
7399                     ):
7400                         yield row
7401                 else:
7402                     for row in _pprint_pps(pp):
7403                         yield row
7404         return "\n".join(_pprint_pps(obj.pps(decode_path)))
7405     return SEQUENCEOF(), pprint_any
7406
7407
7408 def ascii_visualize(ba):
7409     """Output only ASCII printable characters, like in hexdump -C
7410
7411     Example output for given binary string (right part)::
7412
7413         92 2b 39 20 65 91 e6 8e  95 93 1a 58 df 02 78 ea  |.+9 e......X..x.|
7414                                                            ^^^^^^^^^^^^^^^^
7415     """
7416     return "".join((chr(b) if 0x20 <= b <= 0x7E else ".") for b in ba)
7417
7418
7419 def hexdump(raw):
7420     """Generate ``hexdump -C`` like output
7421
7422     Rendered example::
7423
7424         00000000  30 80 30 80 a0 80 02 01  02 00 00 02 14 54 a5 18  |0.0..........T..|
7425         00000010  69 ef 8b 3f 15 fd ea ad  bd 47 e0 94 81 6b 06 6a  |i..?.....G...k.j|
7426
7427     Result of that function is a generator of lines, where each line is
7428     a list of columns::
7429
7430         [
7431             [...],
7432             ["00000010 ", " 69", " ef", " 8b", " 3f", " 15", " fd", " ea", " ad ",
7433                           " bd", " 47", " e0", " 94", " 81", " 6b", " 06", " 6a ",
7434                           " |i..?.....G...k.j|"]
7435             [...],
7436         ]
7437     """
7438     hexed = hexenc(raw).upper()
7439     addr, cols = 0, ["%08x " % 0]
7440     for i in six_xrange(0, len(hexed), 2):
7441         if i != 0 and i // 2 % 8 == 0:
7442             cols[-1] += " "
7443         if i != 0 and i // 2 % 16 == 0:
7444             cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:addr + 16])))
7445             yield cols
7446             addr += 16
7447             cols = ["%08x " % addr]
7448         cols.append(" " + hexed[i:i + 2])
7449     if len(cols) > 0:
7450         cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:])))
7451         yield cols
7452
7453
7454 def browse(raw, obj, oid_maps=()):
7455     """Interactive browser
7456
7457     :param bytes raw: binary data you decoded
7458     :param obj: decoded :py:class:`pyderasn.Obj`
7459     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
7460                      Its human readable form is printed when OID is met
7461
7462     .. note:: `urwid <http://urwid.org/>`__ dependency required
7463
7464     This browser is an interactive terminal application for browsing
7465     structures of your decoded ASN.1 objects. You can quit it with **q**
7466     key. It consists of three windows:
7467
7468     :tree:
7469      View of ASN.1 elements hierarchy. You can navigate it using **Up**,
7470      **Down**, **PageUp**, **PageDown**, **Home**, **End** keys.
7471      **Left** key goes to constructed element above. **Plus**/**Minus**
7472      keys collapse/uncollapse constructed elements. **Space** toggles it
7473     :info:
7474      window with various information about element. You can scroll it
7475      with **h**/**l** (down, up) (**H**/**L** for triple speed) keys
7476     :hexdump:
7477      window with raw data hexdump and highlighted current element's
7478      contents. It automatically focuses on element's data. You can
7479      scroll it with **j**/**k** (down, up) (**J**/**K** for triple
7480      speed) keys. If element has explicit tag, then it also will be
7481      highlighted with different colour
7482
7483     Window's header contains current decode path and progress bars with
7484     position in *info* and *hexdump* windows.
7485
7486     If you press **d**, then current element will be saved in the
7487     current directory under its decode path name (adding ".0", ".1", etc
7488     suffix if such file already exists). **D** will save it with explicit tag.
7489
7490     You can also invoke it with ``--browse`` command line argument.
7491     """
7492     from copy import deepcopy
7493     from os.path import exists as path_exists
7494     import urwid
7495
7496     class TW(urwid.TreeWidget):
7497         def __init__(self, state, *args, **kwargs):
7498             self.state = state
7499             self.scrolled = {"info": False, "hexdump": False}
7500             super(TW, self).__init__(*args, **kwargs)
7501
7502         def _get_pp(self):
7503             pp = self.get_node().get_value()
7504             constructed = len(pp) > 1
7505             return (pp if hasattr(pp, "_fields") else pp[0]), constructed
7506
7507         def _state_update(self):
7508             pp, _ = self._get_pp()
7509             self.state["decode_path"].set_text(
7510                 ":".join(str(p) for p in pp.decode_path)
7511             )
7512             lines = deepcopy(self.state["hexed"])
7513
7514             def attr_set(i, attr):
7515                 line = lines[i // 16]
7516                 idx = 1 + (i - 16 * (i // 16))
7517                 line[idx] = (attr, line[idx])
7518
7519             if pp.expl_offset is not None:
7520                 for i in six_xrange(
7521                         pp.expl_offset,
7522                         pp.expl_offset + pp.expl_tlen + pp.expl_llen,
7523                 ):
7524                     attr_set(i, "select-expl")
7525             for i in six_xrange(pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen):
7526                 attr_set(i, "select-value")
7527             self.state["hexdump"]._set_body([urwid.Text(line) for line in lines])
7528             self.state["hexdump"].set_focus(pp.offset // 16)
7529             self.state["hexdump"].set_focus_valign("middle")
7530             self.state["hexdump_bar"].set_completion(
7531                 (100 * pp.offset // 16) //
7532                 len(self.state["hexdump"]._body.positions())
7533             )
7534
7535             lines = [
7536                 [("header", "Name: "), pp.obj_name],
7537                 [("header", "Type: "), pp.asn1_type_name],
7538                 [("header", "Offset: "), "%d (0x%x)" % (pp.offset, pp.offset)],
7539                 [("header", "[TLV]len: "), "%d/%d/%d" % (
7540                     pp.tlen, pp.llen, pp.vlen,
7541                 )],
7542                 [("header", "Slice: "), "[%d:%d]" % (
7543                     pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen,
7544                 )],
7545             ]
7546             if pp.lenindef:
7547                 lines.append([("warning", "LENINDEF")])
7548             if pp.ber_encoded:
7549                 lines.append([("warning", "BER encoded")])
7550             if pp.bered:
7551                 lines.append([("warning", "BERed")])
7552             if pp.expl is not None:
7553                 lines.append([("header", "EXPLICIT")])
7554                 klass, _, num = pp.expl
7555                 lines.append(["  Tag: %s%d" % (TagClassReprs[klass], num)])
7556                 if pp.expl_offset is not None:
7557                     lines.append(["  Offset: %d" % pp.expl_offset])
7558                     lines.append(["  [TLV]len: %d/%d/%d" % (
7559                         pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7560                     )])
7561                     lines.append(["  Slice: [%d:%d]" % (
7562                         pp.expl_offset,
7563                         pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen,
7564                     )])
7565             if pp.impl is not None:
7566                 klass, _, num = pp.impl
7567                 lines.append([
7568                     ("header", "IMPLICIT: "), "%s%d" % (TagClassReprs[klass], num),
7569                 ])
7570             if pp.optional:
7571                 lines.append(["OPTIONAL"])
7572             if pp.default:
7573                 lines.append(["DEFAULT"])
7574             if len(pp.decode_path) > 0:
7575                 ent = pp.decode_path[-1]
7576                 if isinstance(ent, DecodePathDefBy):
7577                     lines.append([""])
7578                     value = str(ent.defined_by)
7579                     oid_name = find_oid_name(
7580                         ent.defined_by.asn1_type_name, oid_maps, value,
7581                     )
7582                     lines.append([("header", "DEFINED BY: "), "%s" % (
7583                         value if oid_name is None
7584                         else "%s (%s)" % (oid_name, value)
7585                     )])
7586             lines.append([""])
7587             if pp.value is not None:
7588                 lines.append([("header", "Value: "), pp.value])
7589                 if (
7590                         len(oid_maps) > 0 and
7591                         pp.asn1_type_name == ObjectIdentifier.asn1_type_name
7592                 ):
7593                     for oid_map in oid_maps:
7594                         oid_name = oid_map.get(pp.value)
7595                         if oid_name is not None:
7596                             lines.append([("header", "Human: "), oid_name])
7597                             break
7598                 if pp.asn1_type_name == Integer.asn1_type_name:
7599                     lines.append([
7600                         ("header", "Decimal: "), "%d" % int(pp.obj),
7601                     ])
7602                     lines.append([
7603                         ("header", "Hexadecimal: "), colonize_hex(pp.obj.tohex()),
7604                     ])
7605             if pp.blob.__class__ == binary_type:
7606                 blob = hexenc(pp.blob).upper()
7607                 for i in six_xrange(0, len(blob), 32):
7608                     lines.append([colonize_hex(blob[i:i + 32])])
7609             elif pp.blob.__class__ == tuple:
7610                 lines.append([", ".join(pp.blob)])
7611             self.state["info"]._set_body([urwid.Text(line) for line in lines])
7612             self.state["info_bar"].set_completion(0)
7613
7614         def selectable(self):
7615             if self.state["widget_current"] != self:
7616                 self.state["widget_current"] = self
7617                 self.scrolled["info"] = False
7618                 self.scrolled["hexdump"] = False
7619                 self._state_update()
7620             return super(TW, self).selectable()
7621
7622         def get_display_text(self):
7623             pp, constructed = self._get_pp()
7624             style = "constructed" if constructed else ""
7625             if len(pp.decode_path) == 0:
7626                 return (style, pp.obj_name)
7627             if pp.asn1_type_name == "EOC":
7628                 return ("eoc", "EOC")
7629             ent = pp.decode_path[-1]
7630             if isinstance(ent, DecodePathDefBy):
7631                 value = str(ent.defined_by)
7632                 oid_name = find_oid_name(
7633                     ent.defined_by.asn1_type_name, oid_maps, value,
7634                 )
7635                 return ("defby", "DEFBY:" + (
7636                     value if oid_name is None else oid_name
7637                 ))
7638             return (style, ent)
7639
7640         def _scroll(self, what, step):
7641             self.state[what]._invalidate()
7642             pos = self.state[what].focus_position
7643             if not self.scrolled[what]:
7644                 self.scrolled[what] = True
7645                 pos -= 2
7646             pos = max(0, pos + step)
7647             pos = min(pos, len(self.state[what]._body.positions()) - 1)
7648             self.state[what].set_focus(pos)
7649             self.state[what].set_focus_valign("top")
7650             self.state[what + "_bar"].set_completion(
7651                 (100 * pos) // len(self.state[what]._body.positions())
7652             )
7653
7654         def keypress(self, size, key):
7655             if key == "q":
7656                 raise urwid.ExitMainLoop()
7657
7658             if key == " ":
7659                 self.expanded = not self.expanded
7660                 self.update_expanded_icon()
7661                 return None
7662
7663             hexdump_steps = {"j": 1, "k": -1, "J": 5, "K": -5}
7664             if key in hexdump_steps:
7665                 self._scroll("hexdump", hexdump_steps[key])
7666                 return None
7667
7668             info_steps = {"h": 1, "l": -1, "H": 5, "L": -5}
7669             if key in info_steps:
7670                 self._scroll("info", info_steps[key])
7671                 return None
7672
7673             if key in ("d", "D"):
7674                 pp, _ = self._get_pp()
7675                 dp = ":".join(str(p) for p in pp.decode_path)
7676                 dp = dp.replace(" ", "_")
7677                 if dp == "":
7678                     dp = "root"
7679                 if key == "d" or pp.expl_offset is None:
7680                     data = self.state["raw"][pp.offset:(
7681                         pp.offset + pp.tlen + pp.llen + pp.vlen
7682                     )]
7683                 else:
7684                     data = self.state["raw"][pp.expl_offset:(
7685                         pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen
7686                     )]
7687                 ctr = 0
7688
7689                 def duplicate_path(dp, ctr):
7690                     if ctr == 0:
7691                         return dp
7692                     return "%s.%d" % (dp, ctr)
7693
7694                 while True:
7695                     if not path_exists(duplicate_path(dp, ctr)):
7696                         break
7697                     ctr += 1
7698                 dp = duplicate_path(dp, ctr)
7699                 with open(dp, "wb") as fd:
7700                     fd.write(data)
7701                 self.state["decode_path"].set_text(
7702                     ("warning", "Saved to: " + dp)
7703                 )
7704                 return None
7705             return super(TW, self).keypress(size, key)
7706
7707     class PN(urwid.ParentNode):
7708         def __init__(self, state, value, *args, **kwargs):
7709             self.state = state
7710             if not hasattr(value, "_fields"):
7711                 value = list(value)
7712             super(PN, self).__init__(value, *args, **kwargs)
7713
7714         def load_widget(self):
7715             return TW(self.state, self)
7716
7717         def load_child_keys(self):
7718             value = self.get_value()
7719             if hasattr(value, "_fields"):
7720                 return []
7721             return range(len(value[1:]))
7722
7723         def load_child_node(self, key):
7724             return PN(
7725                 self.state,
7726                 self.get_value()[key + 1],
7727                 parent=self,
7728                 key=key,
7729                 depth=self.get_depth() + 1,
7730             )
7731
7732     class LabeledPG(urwid.ProgressBar):
7733         def __init__(self, label, *args, **kwargs):
7734             self.label = label
7735             super(LabeledPG, self).__init__(*args, **kwargs)
7736
7737         def get_text(self):
7738             return "%s: %s" % (self.label, super(LabeledPG, self).get_text())
7739
7740     WinHexdump = urwid.ListBox([urwid.Text("")])
7741     WinInfo = urwid.ListBox([urwid.Text("")])
7742     WinDecodePath = urwid.Text("", "center")
7743     WinInfoBar = LabeledPG("info", "pg-normal", "pg-complete")
7744     WinHexdumpBar = LabeledPG("hexdump", "pg-normal", "pg-complete")
7745     WinTree = urwid.TreeListBox(urwid.TreeWalker(PN(
7746         {
7747             "raw": raw,
7748             "hexed": list(hexdump(raw)),
7749             "widget_current": None,
7750             "info": WinInfo,
7751             "info_bar": WinInfoBar,
7752             "hexdump": WinHexdump,
7753             "hexdump_bar": WinHexdumpBar,
7754             "decode_path": WinDecodePath,
7755         },
7756         list(obj.pps()),
7757     )))
7758     help_text = " ".join((
7759         "q:quit",
7760         "space:(un)collapse",
7761         "(pg)up/down/home/end:nav",
7762         "jkJK:hexdump hlHL:info",
7763         "dD:dump",
7764     ))
7765     urwid.MainLoop(
7766         urwid.Frame(
7767             urwid.Columns([
7768                 ("weight", 1, WinTree),
7769                 ("weight", 2, urwid.Pile([
7770                     urwid.LineBox(WinInfo),
7771                     urwid.LineBox(WinHexdump),
7772                 ])),
7773             ]),
7774             header=urwid.Columns([
7775                 ("weight", 2, urwid.AttrWrap(WinDecodePath, "header")),
7776                 ("weight", 1, WinInfoBar),
7777                 ("weight", 1, WinHexdumpBar),
7778             ]),
7779             footer=urwid.AttrWrap(urwid.Text(help_text), "help")
7780         ),
7781         [
7782             ("header", "bold", ""),
7783             ("constructed", "bold", ""),
7784             ("help", "light magenta", ""),
7785             ("warning", "light red", ""),
7786             ("defby", "light red", ""),
7787             ("eoc", "dark red", ""),
7788             ("select-value", "light green", ""),
7789             ("select-expl", "light red", ""),
7790             ("pg-normal", "", "light blue"),
7791             ("pg-complete", "black", "yellow"),
7792         ],
7793     ).run()
7794
7795
7796 def main():  # pragma: no cover
7797     import argparse
7798     parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7799     parser.add_argument(
7800         "--skip",
7801         type=int,
7802         default=0,
7803         help="Skip that number of bytes from the beginning",
7804     )
7805     parser.add_argument(
7806         "--oids",
7807         help="Python paths to dictionary with OIDs, comma separated",
7808     )
7809     parser.add_argument(
7810         "--schema",
7811         help="Python path to schema definition to use",
7812     )
7813     parser.add_argument(
7814         "--defines-by-path",
7815         help="Python path to decoder's defines_by_path",
7816     )
7817     parser.add_argument(
7818         "--nobered",
7819         action="store_true",
7820         help="Disallow BER encoding",
7821     )
7822     parser.add_argument(
7823         "--print-decode-path",
7824         action="store_true",
7825         help="Print decode paths",
7826     )
7827     parser.add_argument(
7828         "--decode-path-only",
7829         help="Print only specified decode path",
7830     )
7831     parser.add_argument(
7832         "--allow-expl-oob",
7833         action="store_true",
7834         help="Allow explicit tag out-of-bound",
7835     )
7836     parser.add_argument(
7837         "--evgen",
7838         action="store_true",
7839         help="Turn on event generation mode",
7840     )
7841     parser.add_argument(
7842         "--browse",
7843         action="store_true",
7844         help="Start ASN.1 browser",
7845     )
7846     parser.add_argument(
7847         "RAWFile",
7848         type=argparse.FileType("rb"),
7849         help="Path to BER/CER/DER file you want to decode",
7850     )
7851     args = parser.parse_args()
7852     if PY2:
7853         args.RAWFile.seek(args.skip)
7854         raw = memoryview(args.RAWFile.read())
7855         args.RAWFile.close()
7856     else:
7857         raw = file_mmaped(args.RAWFile)[args.skip:]
7858     oid_maps = (
7859         [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7860         if args.oids else ()
7861     )
7862     from functools import partial
7863     if args.schema:
7864         schema = obj_by_path(args.schema)
7865         pprinter = partial(pprint, big_blobs=True)
7866     else:
7867         schema, pprinter = generic_decoder()
7868     ctx = {
7869         "bered": not args.nobered,
7870         "allow_expl_oob": args.allow_expl_oob,
7871     }
7872     if args.defines_by_path is not None:
7873         ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7874     if args.browse:
7875         obj, _ = schema().decode(raw, ctx=ctx)
7876         browse(raw, obj, oid_maps)
7877         from sys import exit as sys_exit
7878         sys_exit(0)
7879     from os import environ
7880     pprinter = partial(
7881         pprinter,
7882         oid_maps=oid_maps,
7883         with_colours=environ.get("NO_COLOR") is None,
7884         with_decode_path=args.print_decode_path,
7885         decode_path_only=(
7886             () if args.decode_path_only is None else
7887             tuple(args.decode_path_only.split(":"))
7888         ),
7889     )
7890     if args.evgen:
7891         for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7892             print(pprinter(obj, decode_path=decode_path))
7893     else:
7894         obj, tail = schema().decode(raw, ctx=ctx)
7895         print(pprinter(obj))
7896     if tail != b"":
7897         print("\nTrailing data: %s" % hexenc(tail))
7898
7899
7900 if __name__ == "__main__":
7901     from pyderasn import *
7902     main()