]> Cypherpunks.ru repositories - pyderasn.git/blob - pyderasn.py
Hide mmap imports, failing on Windows
[pyderasn.git] / pyderasn.py
1 #!/usr/bin/env python
2 # coding: utf-8
3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU Lesser General Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
21
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal BER/CER/DER ones.
24
25     >>> i = Integer(123)
26     >>> raw = i.encode()
27     >>> Integer().decod(raw) == i
28     True
29
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
62
63 Common for most types
64 ---------------------
65
66 Tags
67 ____
68
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
75 tags simultaneously.
76
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
80 number.
81
82 .. note::
83
84    EXPLICIT tags always have **constructed** tag. PyDERASN does not
85    explicitly check correctness of schema input here.
86
87 .. note::
88
89    Implicit tags have **primitive** (``tag_ctxp``) encoding for
90    primitive values.
91
92 ::
93
94     >>> Integer(impl=tag_ctxp(1))
95     [1] INTEGER
96     >>> Integer(expl=tag_ctxc(2))
97     [2] EXPLICIT INTEGER
98
99 Implicit tag is not explicitly shown.
100
101 Two objects of the same type, but with different implicit/explicit tags
102 are **not** equal.
103
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
106 function::
107
108     >>> tag_decode(tag_ctxc(123))
109     (128, 32, 123)
110     >>> klass, form, num = tag_decode(tag_ctxc(123))
111     >>> klass == TagClassContext
112     True
113     >>> form == TagFormConstructed
114     True
115
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
118
119 Default/optional
120 ________________
121
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
125
126     >>> Integer(optional=True, default=123)
127     INTEGER 123 OPTIONAL DEFAULT
128
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
132 ``version`` field::
133
134     class Version(Integer):
135         schema = (
136             ("v1", 0),
137             ("v2", 1),
138             ("v3", 2),
139         )
140     class TBSCertificate(Sequence):
141         schema = (
142             ("version", Version(expl=tag_ctxc(0), default="v1")),
143         [...]
144
145 When default argument is used and value is not specified, then it equals
146 to default one.
147
148 .. _bounds:
149
150 Size constraints
151 ________________
152
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
156
157     class X(...):
158         bounds = (MIN, MAX)
159
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
161
162 For simplicity you can also set bounds the following way::
163
164     bounded_x = X(bounds=(MIN, MAX))
165
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
167 raised.
168
169 Common methods
170 ______________
171
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
175
176 All objects are friendly to ``copy.copy()`` and copied objects can be
177 safely mutated.
178
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
181
182 .. _decoding:
183
184 Decoding
185 --------
186
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
193
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
196
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
199
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
205
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
208
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
211 * ``expl_tlen``,
212 * ``expl_llen``
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
215   ``offset`` otherwise
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
217   ``tlvlen`` otherwise
218
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
220
221 .. _ctx:
222
223 Context
224 _______
225
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
229
230 Currently available context options:
231
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
238
239 .. _pprinting:
240
241 Pretty printing
242 ---------------
243
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
248
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
252
253     >>> from pyderasn import pprint
254     >>> encoded = Integer(-12345).encode()
255     >>> obj, tail = Integer().decode(encoded)
256     >>> print(pprint(obj))
257         0   [1,1,   2] INTEGER -12345
258
259 .. _pprint_example:
260
261 Example certificate::
262
263     >>> print(pprint(crt))
264         0   [1,3,1604] Certificate SEQUENCE
265         4   [1,3,1453]  . tbsCertificate: TBSCertificate SEQUENCE
266        10-2 [1,1,   1]  . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267        13   [1,1,   3]  . . serialNumber: CertificateSerialNumber INTEGER 61595
268        18   [1,1,  13]  . . signature: AlgorithmIdentifier SEQUENCE
269        20   [1,1,   9]  . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270        31   [0,0,   2]  . . . parameters: [UNIV 5] ANY OPTIONAL
271                         . . . . 05:00
272        33   [0,0, 278]  . . issuer: Name CHOICE rdnSequence
273        33   [1,3, 274]  . . . rdnSequence: RDNSequence SEQUENCE OF
274        37   [1,1,  11]  . . . . 0: RelativeDistinguishedName SET OF
275        39   [1,1,   9]  . . . . . 0: AttributeTypeAndValue SEQUENCE
276        41   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277        46   [0,0,   4]  . . . . . . value: [UNIV 19] AttributeValue ANY
278                         . . . . . . . 13:02:45:53
279     [...]
280      1461   [1,1,  13]  . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281      1463   [1,1,   9]  . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282      1474   [0,0,   2]  . . parameters: [UNIV 5] ANY OPTIONAL
283                         . . . 05:00
284      1476   [1,2, 129]  . signatureValue: BIT STRING 1024 bits
285                         . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286                         . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
287      [...]
288
289     Trailing data: 0a
290
291 Let's parse that output, human::
292
293        10-2 [1,1,   1]    . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294        ^  ^  ^ ^    ^     ^   ^        ^            ^       ^       ^  ^
295        0  1  2 3    4     5   6        7            8       9       10 11
296
297 ::
298
299        20   [1,1,   9]    . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
300        ^     ^ ^    ^     ^     ^          ^                 ^
301        0     2 3    4     5     6          9                 10
302
303 ::
304
305        33   [0,0, 278]    . . issuer: Name CHOICE rdnSequence
306        ^     ^ ^    ^     ^   ^       ^    ^      ^
307        0     2 3    4     5   6       8    9      10
308
309 ::
310
311        52-2∞ B [1,1,1054]∞  . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
312              ^           ^                                 ^   ^            ^
313             12          13                                14   9            10
314
315 :0:
316  Offset of the object, where its DER/BER encoding begins.
317  Pay attention that it does **not** include explicit tag.
318 :1:
319  If explicit tag exists, then this is its length (tag + encoded length).
320 :2:
321  Length of object's tag. For example CHOICE does not have its own tag,
322  so it is zero.
323 :3:
324  Length of encoded length.
325 :4:
326  Length of encoded value.
327 :5:
328  Visual indentation to show the depth of object in the hierarchy.
329 :6:
330  Object's name inside SEQUENCE/CHOICE.
331 :7:
332  If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333  here. "IMPLICIT" is omitted.
334 :8:
335  Object's class name, if set. Omitted if it is just an ordinary simple
336  value (like with ``algorithm`` in example above).
337 :9:
338  Object's ASN.1 type.
339 :10:
340  Object's value, if set. Can consist of multiple words (like OCTET/BIT
341  STRINGs above). We see ``v3`` value in Version, because it is named.
342  ``rdnSequence`` is the choice of CHOICE type.
343 :11:
344  Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345  default one, specified in the schema.
346 :12:
347  Shows does object contains any kind of BER encoded data (possibly
348  Sequence holding BER-encoded underlying value).
349 :13:
350  Only applicable to BER encoded data. Indefinite length encoding mark.
351 :14:
352  Only applicable to BER encoded data. If object has BER-specific
353  encoding, then ``BER`` will be shown. It does not depend on indefinite
354  length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355  (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
356  could be BERed.
357
358 .. _definedby:
359
360 DEFINED BY
361 ----------
362
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
367
368 .. _defines:
369
370 defines kwarg
371 _____________
372
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
376 container::
377
378     class ContentInfo(Sequence):
379         schema = (
380             ("contentType", ContentType(defines=((("content",), {
381                 id_digestedData: DigestedData(),
382                 id_signedData: SignedData(),
383             }),))),
384             ("content", Any(expl=tag_ctxc(0))),
385         )
386
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
391 done.
392
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
398
399         (
400             (("parameters",), {
401                 id_ecPublicKey: ECParameters(),
402                 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
403             }),
404             (("..", "subjectPublicKey"), {
405                 id_rsaEncryption: RSAPublicKey(),
406                 id_GostR3410_2001: OctetString(),
407             }),
408         ),
409
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
412 itself.
413
414 Following types can be automatically decoded (DEFINED BY):
415
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420   ``Any``/``BitString``/``OctetString``-s
421
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
426
427 .. _defines_by_path_ctx:
428
429 defines_by_path context option
430 ______________________________
431
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
435
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
438
439     (decode_path, defines)
440
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
445
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
449 of ``PKIResponse``::
450
451     content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
452         (
453             ("contentType",),
454             ((("content",), {id_signedData: SignedData()}),),
455         ),
456         (
457             (
458                 "content",
459                 DecodePathDefBy(id_signedData),
460                 "encapContentInfo",
461                 "eContentType",
462             ),
463             ((("eContent",), {
464                 id_cct_PKIData: PKIData(),
465                 id_cct_PKIResponse: PKIResponse(),
466             })),
467         ),
468         (
469             (
470                 "content",
471                 DecodePathDefBy(id_signedData),
472                 "encapContentInfo",
473                 "eContent",
474                 DecodePathDefBy(id_cct_PKIResponse),
475                 "controlSequence",
476                 any,
477                 "attrType",
478             ),
479             ((("attrValues",), {
480                 id_cmc_recipientNonce: RecipientNonce(),
481                 id_cmc_senderNonce: SenderNonce(),
482                 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483                 id_cmc_transactionId: TransactionId(),
484             })),
485         ),
486     )})
487
488 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 First function is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meet --
491 useful for SEQUENCE/SET OF-s.
492
493 .. _bered_ctx:
494
495 BER encoding
496 ------------
497
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
502
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504   attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505   STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506   ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508   attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509   ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
510   contain it.
511 * If object has an indefinite length encoded explicit tag, then
512   ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514   indefinite length, object's indefinite length, BER-encoding) or any
515   underlying component has that kind of encoding, then ``bered``
516   attribute is set to True. For example SignedData CMS can have
517   ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518   ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
519
520 EOC (end-of-contents) token's length is taken in advance in object's
521 value length.
522
523 .. _allow_expl_oob_ctx:
524
525 Allow explicit tag out-of-bound
526 -------------------------------
527
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
533
534 .. warning::
535
536    This option should be used only for skipping some decode errors, just
537    to see the decoded structure somehow.
538
539 .. _streaming:
540
541 Streaming and dealing with huge structures
542 ------------------------------------------
543
544 .. _evgen_mode:
545
546 evgen mode
547 __________
548
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
553 structure.
554
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
558
559     class TBSCertListFast(Sequence):
560         schema = (
561             [...]
562             ("revokedCertificates", OctetString(
563                 impl=SequenceOf.tag_default,
564                 optional=True,
565             )),
566             [...]
567         )
568
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
571
572 But how can you get all certificate's serial number from it, after you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
576
577     $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578          10     [1,1,   1]   . . version: Version INTEGER v2 (01) OPTIONAL
579          15     [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580          26     [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL
581          13     [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE
582          34     [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583          39     [0,0,   9]   . . . . . . value: [UNIV 19] AttributeValue ANY
584          32     [1,1,  14]   . . . . . 0: AttributeTypeAndValue SEQUENCE
585          30     [1,1,  16]   . . . . 0: RelativeDistinguishedName SET OF
586     [...]
587         188     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589         191     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
590         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591         186     [1,1,  18]   . . . 0: RevokedCertificate SEQUENCE
592         208     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594         211     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
595         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596         206     [1,1,  18]   . . . 1: RevokedCertificate SEQUENCE
597     [...]
598     9144992     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
599     9144992     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600     9144985     [1,1,  20]   . . . 415755: RevokedCertificate SEQUENCE
601       181     [1,4,9144821]   . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602         5     [1,4,9144997]   . tbsCertList: TBSCertList SEQUENCE
603     9145009     [1,1,   9]   . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604     9145020     [0,0,   2]   . . parameters: [UNIV 5] ANY OPTIONAL
605     9145007     [1,1,  13]   . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606     9145022     [1,3, 513]   . signatureValue: BIT STRING 4096 bits
607         0     [1,4,9145534]  CertificateList SEQUENCE
608
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as a successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
618 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
619
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
626
627     for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
628         if (
629             len(decode_path) == 4 and
630             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
631             decode_path[3] == "userCertificate"
632         ):
633             print("serial number:", int(obj))
634
635 Virtually it does not take any memory except at least needed for single
636 object storage. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
639 ``.tlvlen``.
640
641 .. _evgen_mode_upto_ctx:
642
643 evgen_mode_upto
644 _______________
645
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 do not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
655
656     for decode_path, obj, _ in CertificateList().decode_evgen(
657         crl_raw,
658         ctx={"evgen_mode_upto": (
659             (("tbsCertList", "revokedCertificates", any), True),
660         )},
661     ):
662         if (
663             len(decode_path) == 3 and
664             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
665         ):
666             print("serial number:", int(obj["userCertificate"]))
667
668 .. note::
669
670    SEQUENCE/SET values with DEFAULT specified are automatically decoded
671    without evgen mode.
672
673 .. _mmap:
674
675 mmap-ed file
676 ____________
677
678 POSIX compliant systems have ``mmap`` syscall, giving ability to work
679 the memory mapped file. You can deal with the file like it was an
680 ordinary binary string, allowing you not to load it to the memory first.
681 Also you can use them as an input for OCTET STRING, taking no Python
682 memory for their storage.
683
684 There is convenient :py:func:`pyderasn.file_mmaped` function that
685 creates read-only memoryview on the file contents::
686
687     with open("huge", "rb") as fd:
688         raw = file_mmaped(fd)
689         obj = Something.decode(raw)
690
691 .. warning::
692
693    mmap-ed files in Python2.7 does not implement buffer protocol, so
694    memoryview won't work on them.
695
696 .. warning::
697
698    mmap maps the **whole** file. So it plays no role if you seek-ed it
699    before. Take the slice of the resulting memoryview with required
700    offset instead.
701
702 .. note::
703
704    If you use ZFS as underlying storage, then pay attention that
705    currently most platforms does not deal good with ZFS ARC and ordinary
706    page cache used for mmaps. It can take twice the necessary size in
707    the memory: both in page cache and ZFS ARC.
708
709 CER encoding
710 ____________
711
712 We can parse any kind of data now, but how can we produce files
713 streamingly, without storing their encoded representation in memory?
714 SEQUENCE by default encodes in memory all its values, joins them in huge
715 binary string, just to know the exact size of SEQUENCE's value for
716 encoding it in TLV. DER requires you to know all exact sizes of the
717 objects.
718
719 You can use CER encoding mode, that slightly differs from the DER, but
720 does not require exact sizes knowledge, allowing streaming encoding
721 directly to some writer/buffer. Just use
722 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
723 encoded data will flow::
724
725     opener = io.open if PY2 else open
726     with opener("result", "wb") as fd:
727         obj.encode_cer(fd.write)
728
729 ::
730
731     buf = io.BytesIO()
732     obj.encode_cer(buf.write)
733
734 If you do not want to create in-memory buffer every time, then you can
735 use :py:func:`pyderasn.encode_cer` function::
736
737     data = encode_cer(obj)
738
739 Remember that CER is **not valid** DER in most cases, so you **have to**
740 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
741 decoding. Also currently there is **no** validation that provided CER is
742 valid one -- you are sure that it has only valid BER encoding.
743
744 .. warning::
745
746    SET OF values can not be streamingly encoded, because they are
747    required to be sorted byte-by-byte. Big SET OF values still will take
748    much memory. Use neither SET nor SET OF values, as modern ASN.1
749    also recommends too.
750
751 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
752 OCTET STRINGs! They will be streamingly copied from underlying file to
753 the buffer using 1 KB chunks.
754
755 Some structures require that some of the elements have to be forcefully
756 DER encoded. For example ``SignedData`` CMS requires you to encode
757 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
758 encode everything else in BER. You can tell any of the structures to be
759 forcefully encoded in DER during CER encoding, by specifying
760 ``der_forced=True`` attribute::
761
762     class Certificate(Sequence):
763         schema = (...)
764         der_forced = True
765
766     class SignedAttributes(SetOf):
767         schema = Attribute()
768         bounds = (1, 32)
769         der_forced = True
770
771 .. _agg_octet_string:
772
773 agg_octet_string
774 ________________
775
776 In most cases, huge quantity of binary data is stored as OCTET STRING.
777 CER encoding splits it on 1 KB chunks. BER allows splitting on various
778 levels of chunks inclusion::
779
780     SOME STRING[CONSTRUCTED]
781         OCTET STRING[CONSTRUCTED]
782             OCTET STRING[PRIMITIVE]
783                 DATA CHUNK
784             OCTET STRING[PRIMITIVE]
785                 DATA CHUNK
786             OCTET STRING[PRIMITIVE]
787                 DATA CHUNK
788         OCTET STRING[PRIMITIVE]
789             DATA CHUNK
790         OCTET STRING[CONSTRUCTED]
791             OCTET STRING[PRIMITIVE]
792                 DATA CHUNK
793             OCTET STRING[PRIMITIVE]
794                 DATA CHUNK
795         OCTET STRING[CONSTRUCTED]
796             OCTET STRING[CONSTRUCTED]
797                 OCTET STRING[PRIMITIVE]
798                     DATA CHUNK
799
800 You can not just take the offset and some ``.vlen`` of the STRING and
801 treat it as the payload. If you decode it without
802 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
803 and ``bytes()`` will give the whole payload contents.
804
805 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
806 small memory footprint. There is convenient
807 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
808 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
809 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
810 SHA512 digest of its ``eContent``::
811
812     fd = open("data.p7m", "rb")
813     raw = file_mmaped(fd)
814     ctx = {"bered": True}
815     for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
816         if decode_path == ("content",):
817             content = obj
818             break
819     else:
820         raise ValueError("no content found")
821     hasher_state = sha512()
822     def hasher(data):
823         hasher_state.update(data)
824         return len(data)
825     evgens = SignedData().decode_evgen(
826         raw[content.offset:],
827         offset=content.offset,
828         ctx=ctx,
829     )
830     agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
831     fd.close()
832     digest = hasher_state.digest()
833
834 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
835 copy the payload (without BER/CER encoding interleaved overhead) in it.
836 Virtually it won't take memory more than for keeping small structures
837 and 1 KB binary chunks.
838
839 .. _seqof-iterators:
840
841 SEQUENCE OF iterators
842 _____________________
843
844 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
845 classes. The only difference with providing the full list of objects, is
846 that type and bounds checking is done during encoding process. Also
847 sequence's value will be emptied after encoding, forcing you to set its
848 value again.
849
850 This is very useful when you have to create some huge objects, like
851 CRLs, with thousands and millions of entities inside. You can write the
852 generator taking necessary data from the database and giving the
853 ``RevokedCertificate`` objects. Only binary representation of that
854 objects will take memory during DER encoding.
855
856 2-pass DER encoding
857 -------------------
858
859 There is ability to do 2-pass encoding to DER, writing results directly
860 to specified writer (buffer, file, whatever). It could be 1.5+ times
861 slower than ordinary encoding, but it takes little memory for 1st pass
862 state storing. For example, 1st pass state for CACert.org's CRL with
863 ~416K of certificate entries takes nearly 3.5 MB of memory.
864 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
865 nearly 0.5 KB of memory.
866
867 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
868 iterators <seqof-iterators>` and write directly to opened file, then
869 there is very small memory footprint.
870
871 1st pass traverses through all the objects of the structure and returns
872 the size of DER encoded structure, together with 1st pass state object.
873 That state contains precalculated lengths for various objects inside the
874 structure.
875
876 ::
877
878     fulllen, state = obj.encode1st()
879
880 2nd pass takes the writer and 1st pass state. It traverses through all
881 the objects again, but writes their encoded representation to the writer.
882
883 ::
884
885     opener = io.open if PY2 else open
886     with opener("result", "wb") as fd:
887         obj.encode2nd(fd.write, iter(state))
888
889 .. warning::
890
891    You **MUST NOT** use 1st pass state if anything is changed in the
892    objects. It is intended to be used immediately after 1st pass is
893    done!
894
895 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
896 have to reinitialize the values after the 1st pass. And you **have to**
897 be sure that the iterator gives exactly the same values as previously.
898 Yes, you have to run your iterator twice -- because this is two pass
899 encoding mode.
900
901 If you want to encode to the memory, then you can use convenient
902 :py:func:`pyderasn.encode2pass` helper.
903
904 .. _browser:
905
906 ASN.1 browser
907 -------------
908 .. autofunction:: pyderasn.browse
909
910 Base Obj
911 --------
912 .. autoclass:: pyderasn.Obj
913    :members:
914
915 Primitive types
916 ---------------
917
918 Boolean
919 _______
920 .. autoclass:: pyderasn.Boolean
921    :members: __init__
922
923 Integer
924 _______
925 .. autoclass:: pyderasn.Integer
926    :members: __init__, named, tohex
927
928 BitString
929 _________
930 .. autoclass:: pyderasn.BitString
931    :members: __init__, bit_len, named
932
933 OctetString
934 ___________
935 .. autoclass:: pyderasn.OctetString
936    :members: __init__
937
938 Null
939 ____
940 .. autoclass:: pyderasn.Null
941    :members: __init__
942
943 ObjectIdentifier
944 ________________
945 .. autoclass:: pyderasn.ObjectIdentifier
946    :members: __init__
947
948 Enumerated
949 __________
950 .. autoclass:: pyderasn.Enumerated
951
952 CommonString
953 ____________
954 .. autoclass:: pyderasn.CommonString
955
956 NumericString
957 _____________
958 .. autoclass:: pyderasn.NumericString
959
960 PrintableString
961 _______________
962 .. autoclass:: pyderasn.PrintableString
963    :members: __init__, allow_asterisk, allow_ampersand
964
965 IA5String
966 _________
967 .. autoclass:: pyderasn.IA5String
968
969 VisibleString
970 _____________
971 .. autoclass:: pyderasn.VisibleString
972
973 UTCTime
974 _______
975 .. autoclass:: pyderasn.UTCTime
976    :members: __init__, todatetime
977
978 GeneralizedTime
979 _______________
980 .. autoclass:: pyderasn.GeneralizedTime
981    :members: __init__, todatetime
982
983 Special types
984 -------------
985
986 Choice
987 ______
988 .. autoclass:: pyderasn.Choice
989    :members: __init__, choice, value
990
991 PrimitiveTypes
992 ______________
993 .. autoclass:: PrimitiveTypes
994
995 Any
996 ___
997 .. autoclass:: pyderasn.Any
998    :members: __init__
999
1000 Constructed types
1001 -----------------
1002
1003 Sequence
1004 ________
1005 .. autoclass:: pyderasn.Sequence
1006    :members: __init__
1007
1008 Set
1009 ___
1010 .. autoclass:: pyderasn.Set
1011    :members: __init__
1012
1013 SequenceOf
1014 __________
1015 .. autoclass:: pyderasn.SequenceOf
1016    :members: __init__
1017
1018 SetOf
1019 _____
1020 .. autoclass:: pyderasn.SetOf
1021    :members: __init__
1022
1023 Various
1024 -------
1025
1026 .. autofunction:: pyderasn.abs_decode_path
1027 .. autofunction:: pyderasn.agg_octet_string
1028 .. autofunction:: pyderasn.ascii_visualize
1029 .. autofunction:: pyderasn.colonize_hex
1030 .. autofunction:: pyderasn.encode2pass
1031 .. autofunction:: pyderasn.encode_cer
1032 .. autofunction:: pyderasn.file_mmaped
1033 .. autofunction:: pyderasn.hexenc
1034 .. autofunction:: pyderasn.hexdec
1035 .. autofunction:: pyderasn.hexdump
1036 .. autofunction:: pyderasn.tag_encode
1037 .. autofunction:: pyderasn.tag_decode
1038 .. autofunction:: pyderasn.tag_ctxp
1039 .. autofunction:: pyderasn.tag_ctxc
1040 .. autoclass:: pyderasn.DecodeError
1041    :members: __init__
1042 .. autoclass:: pyderasn.NotEnoughData
1043 .. autoclass:: pyderasn.ExceedingData
1044 .. autoclass:: pyderasn.LenIndefForm
1045 .. autoclass:: pyderasn.TagMismatch
1046 .. autoclass:: pyderasn.InvalidLength
1047 .. autoclass:: pyderasn.InvalidOID
1048 .. autoclass:: pyderasn.ObjUnknown
1049 .. autoclass:: pyderasn.ObjNotReady
1050 .. autoclass:: pyderasn.InvalidValueType
1051 .. autoclass:: pyderasn.BoundsError
1052
1053 .. _cmdline:
1054
1055 Command-line usage
1056 ------------------
1057
1058 You can decode DER/BER files using command line abilities::
1059
1060     $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1061
1062 If there is no schema for your file, then you can try parsing it without,
1063 but of course IMPLICIT tags will often make it impossible. But result is
1064 good enough for the certificate above::
1065
1066     $ python -m pyderasn path/to/file
1067         0   [1,3,1604]  . >: SEQUENCE OF
1068         4   [1,3,1453]  . . >: SEQUENCE OF
1069         8   [0,0,   5]  . . . . >: [0] ANY
1070                         . . . . . A0:03:02:01:02
1071        13   [1,1,   3]  . . . . >: INTEGER 61595
1072        18   [1,1,  13]  . . . . >: SEQUENCE OF
1073        20   [1,1,   9]  . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1074        31   [1,1,   0]  . . . . . . >: NULL
1075        33   [1,3, 274]  . . . . >: SEQUENCE OF
1076        37   [1,1,  11]  . . . . . . >: SET OF
1077        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1078        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1079        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1080     [...]
1081      1409   [1,1,  50]  . . . . . . >: SEQUENCE OF
1082      1411   [1,1,   8]  . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1083      1421   [1,1,  38]  . . . . . . . . >: OCTET STRING 38 bytes
1084                         . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1085                         . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1086                         . . . . . . . . . 61:2E:63:6F:6D:2F
1087      1461   [1,1,  13]  . . >: SEQUENCE OF
1088      1463   [1,1,   9]  . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1089      1474   [1,1,   0]  . . . . >: NULL
1090      1476   [1,2, 129]  . . >: BIT STRING 1024 bits
1091                         . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1092                         . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1093     [...]
1094
1095 Human readable OIDs
1096 ___________________
1097
1098 If you have got dictionaries with ObjectIdentifiers, like example one
1099 from ``tests/test_crts.py``::
1100
1101     stroid2name = {
1102         "1.2.840.113549.1.1.1": "id-rsaEncryption",
1103         "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1104         [...]
1105         "2.5.4.10": "id-at-organizationName",
1106         "2.5.4.11": "id-at-organizationalUnitName",
1107     }
1108
1109 then you can pass it to pretty printer to see human readable OIDs::
1110
1111     $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1112     [...]
1113        37   [1,1,  11]  . . . . . . >: SET OF
1114        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1115        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1116        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1117        50   [1,1,  18]  . . . . . . >: SET OF
1118        52   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1119        54   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1120        59   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1121        70   [1,1,  18]  . . . . . . >: SET OF
1122        72   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1123        74   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1124        79   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1125     [...]
1126
1127 Decode paths
1128 ____________
1129
1130 Each decoded element has so-called decode path: sequence of structure
1131 names it is passing during the decode process. Each element has its own
1132 unique path inside the whole ASN.1 tree. You can print it out with
1133 ``--print-decode-path`` option::
1134
1135     $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1136        0    [1,3,1604]  Certificate SEQUENCE []
1137        4    [1,3,1453]   . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1138       10-2  [1,1,   1]   . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1139       13    [1,1,   3]   . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1140       18    [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1141       20    [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1142       31    [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1143                          . . . . 05:00
1144       33    [0,0, 278]   . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1145       33    [1,3, 274]   . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1146       37    [1,1,  11]   . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1147       39    [1,1,   9]   . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1148       41    [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1149       46    [0,0,   4]   . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1150                          . . . . . . . 13:02:45:53
1151       46    [1,1,   2]   . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1152     [...]
1153
1154 Now you can print only the specified tree, for example signature algorithm::
1155
1156     $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1157       18    [1,1,  13]  AlgorithmIdentifier SEQUENCE
1158       20    [1,1,   9]   . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1159       31    [0,0,   2]   . parameters: [UNIV 5] ANY OPTIONAL
1160                          . . 05:00
1161 """
1162
1163 from array import array
1164 from codecs import getdecoder
1165 from codecs import getencoder
1166 from collections import namedtuple
1167 from collections import OrderedDict
1168 from copy import copy
1169 from datetime import datetime
1170 from datetime import timedelta
1171 from io import BytesIO
1172 from math import ceil
1173 from operator import attrgetter
1174 from string import ascii_letters
1175 from string import digits
1176 from sys import maxsize as sys_maxsize
1177 from sys import version_info
1178 from unicodedata import category as unicat
1179
1180 from six import add_metaclass
1181 from six import binary_type
1182 from six import byte2int
1183 from six import indexbytes
1184 from six import int2byte
1185 from six import integer_types
1186 from six import iterbytes
1187 from six import iteritems
1188 from six import itervalues
1189 from six import PY2
1190 from six import string_types
1191 from six import text_type
1192 from six import unichr as six_unichr
1193 from six.moves import xrange as six_xrange
1194
1195
1196 try:
1197     from termcolor import colored
1198 except ImportError:  # pragma: no cover
1199     def colored(what, *args, **kwargs):
1200         return what
1201
1202 __version__ = "8.1"
1203
1204 __all__ = (
1205     "agg_octet_string",
1206     "Any",
1207     "BitString",
1208     "BMPString",
1209     "Boolean",
1210     "BoundsError",
1211     "Choice",
1212     "colonize_hex",
1213     "DecodeError",
1214     "DecodePathDefBy",
1215     "encode2pass",
1216     "encode_cer",
1217     "Enumerated",
1218     "ExceedingData",
1219     "file_mmaped",
1220     "GeneralizedTime",
1221     "GeneralString",
1222     "GraphicString",
1223     "hexdec",
1224     "hexenc",
1225     "IA5String",
1226     "Integer",
1227     "InvalidLength",
1228     "InvalidOID",
1229     "InvalidValueType",
1230     "ISO646String",
1231     "LenIndefForm",
1232     "NotEnoughData",
1233     "Null",
1234     "NumericString",
1235     "obj_by_path",
1236     "ObjectIdentifier",
1237     "ObjNotReady",
1238     "ObjUnknown",
1239     "OctetString",
1240     "PrimitiveTypes",
1241     "PrintableString",
1242     "Sequence",
1243     "SequenceOf",
1244     "Set",
1245     "SetOf",
1246     "T61String",
1247     "tag_ctxc",
1248     "tag_ctxp",
1249     "tag_decode",
1250     "TagClassApplication",
1251     "TagClassContext",
1252     "TagClassPrivate",
1253     "TagClassUniversal",
1254     "TagFormConstructed",
1255     "TagFormPrimitive",
1256     "TagMismatch",
1257     "TeletexString",
1258     "UniversalString",
1259     "UTCTime",
1260     "UTF8String",
1261     "VideotexString",
1262     "VisibleString",
1263 )
1264
1265 TagClassUniversal = 0
1266 TagClassApplication = 1 << 6
1267 TagClassContext = 1 << 7
1268 TagClassPrivate = 1 << 6 | 1 << 7
1269 TagFormPrimitive = 0
1270 TagFormConstructed = 1 << 5
1271 TagClassReprs = {
1272     TagClassContext: "",
1273     TagClassApplication: "APPLICATION ",
1274     TagClassPrivate: "PRIVATE ",
1275     TagClassUniversal: "UNIV ",
1276 }
1277 EOC = b"\x00\x00"
1278 EOC_LEN = len(EOC)
1279 LENINDEF = b"\x80"  # length indefinite mark
1280 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1281 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1282 SET01 = frozenset("01")
1283 DECIMALS = frozenset(digits)
1284 DECIMAL_SIGNS = ".,"
1285 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1286
1287
1288 def file_mmaped(fd):
1289     """Make mmap-ed memoryview for reading from file
1290
1291     :param fd: file object
1292     :returns: memoryview over read-only mmap-ing of the whole file
1293
1294     .. warning::
1295
1296        It is known to work under neither Python 2.x nor Windows.
1297     """
1298     import mmap
1299     return memoryview(mmap.mmap(fd.fileno(), length=0, prot=mmap.PROT_READ))
1300
1301
1302 def pureint(value):
1303     if not set(value) <= DECIMALS:
1304         raise ValueError("non-pure integer")
1305     return int(value)
1306
1307
1308 def fractions2float(fractions_raw):
1309     pureint(fractions_raw)
1310     return float("0." + fractions_raw)
1311
1312
1313 def get_def_by_path(defines_by_path, sub_decode_path):
1314     """Get define by decode path
1315     """
1316     for path, define in defines_by_path:
1317         if len(path) != len(sub_decode_path):
1318             continue
1319         for p1, p2 in zip(path, sub_decode_path):
1320             if (p1 is not any) and (p1 != p2):
1321                 break
1322         else:
1323             return define
1324
1325
1326 ########################################################################
1327 # Errors
1328 ########################################################################
1329
1330 class ASN1Error(ValueError):
1331     pass
1332
1333
1334 class DecodeError(ASN1Error):
1335     def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1336         """
1337         :param str msg: reason of decode failing
1338         :param klass: optional exact DecodeError inherited class (like
1339                       :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1340                       :py:exc:`InvalidLength`)
1341         :param decode_path: tuple of strings. It contains human
1342                             readable names of the fields through which
1343                             decoding process has passed
1344         :param int offset: binary offset where failure happened
1345         """
1346         super(DecodeError, self).__init__()
1347         self.msg = msg
1348         self.klass = klass
1349         self.decode_path = decode_path
1350         self.offset = offset
1351
1352     def __str__(self):
1353         return " ".join(
1354             c for c in (
1355                 "" if self.klass is None else self.klass.__name__,
1356                 (
1357                     ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1358                     if len(self.decode_path) > 0 else ""
1359                 ),
1360                 ("(at %d)" % self.offset) if self.offset > 0 else "",
1361                 self.msg,
1362             ) if c != ""
1363         )
1364
1365     def __repr__(self):
1366         return "%s(%s)" % (self.__class__.__name__, self)
1367
1368
1369 class NotEnoughData(DecodeError):
1370     pass
1371
1372
1373 class ExceedingData(ASN1Error):
1374     def __init__(self, nbytes):
1375         super(ExceedingData, self).__init__()
1376         self.nbytes = nbytes
1377
1378     def __str__(self):
1379         return "%d trailing bytes" % self.nbytes
1380
1381     def __repr__(self):
1382         return "%s(%s)" % (self.__class__.__name__, self)
1383
1384
1385 class LenIndefForm(DecodeError):
1386     pass
1387
1388
1389 class TagMismatch(DecodeError):
1390     pass
1391
1392
1393 class InvalidLength(DecodeError):
1394     pass
1395
1396
1397 class InvalidOID(DecodeError):
1398     pass
1399
1400
1401 class ObjUnknown(ASN1Error):
1402     def __init__(self, name):
1403         super(ObjUnknown, self).__init__()
1404         self.name = name
1405
1406     def __str__(self):
1407         return "object is unknown: %s" % self.name
1408
1409     def __repr__(self):
1410         return "%s(%s)" % (self.__class__.__name__, self)
1411
1412
1413 class ObjNotReady(ASN1Error):
1414     def __init__(self, name):
1415         super(ObjNotReady, self).__init__()
1416         self.name = name
1417
1418     def __str__(self):
1419         return "object is not ready: %s" % self.name
1420
1421     def __repr__(self):
1422         return "%s(%s)" % (self.__class__.__name__, self)
1423
1424
1425 class InvalidValueType(ASN1Error):
1426     def __init__(self, expected_types):
1427         super(InvalidValueType, self).__init__()
1428         self.expected_types = expected_types
1429
1430     def __str__(self):
1431         return "invalid value type, expected: %s" % ", ".join(
1432             [repr(t) for t in self.expected_types]
1433         )
1434
1435     def __repr__(self):
1436         return "%s(%s)" % (self.__class__.__name__, self)
1437
1438
1439 class BoundsError(ASN1Error):
1440     def __init__(self, bound_min, value, bound_max):
1441         super(BoundsError, self).__init__()
1442         self.bound_min = bound_min
1443         self.value = value
1444         self.bound_max = bound_max
1445
1446     def __str__(self):
1447         return "unsatisfied bounds: %s <= %s <= %s" % (
1448             self.bound_min,
1449             self.value,
1450             self.bound_max,
1451         )
1452
1453     def __repr__(self):
1454         return "%s(%s)" % (self.__class__.__name__, self)
1455
1456
1457 ########################################################################
1458 # Basic coders
1459 ########################################################################
1460
1461 _hexdecoder = getdecoder("hex")
1462 _hexencoder = getencoder("hex")
1463
1464
1465 def hexdec(data):
1466     """Binary data to hexadecimal string convert
1467     """
1468     return _hexdecoder(data)[0]
1469
1470
1471 def hexenc(data):
1472     """Hexadecimal string to binary data convert
1473     """
1474     return _hexencoder(data)[0].decode("ascii")
1475
1476
1477 def int_bytes_len(num, byte_len=8):
1478     if num == 0:
1479         return 1
1480     return int(ceil(float(num.bit_length()) / byte_len))
1481
1482
1483 def zero_ended_encode(num):
1484     octets = bytearray(int_bytes_len(num, 7))
1485     i = len(octets) - 1
1486     octets[i] = num & 0x7F
1487     num >>= 7
1488     i -= 1
1489     while num > 0:
1490         octets[i] = 0x80 | (num & 0x7F)
1491         num >>= 7
1492         i -= 1
1493     return bytes(octets)
1494
1495
1496 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1497     """Encode tag to binary form
1498
1499     :param int num: tag's number
1500     :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1501                       :py:data:`pyderasn.TagClassContext`,
1502                       :py:data:`pyderasn.TagClassApplication`,
1503                       :py:data:`pyderasn.TagClassPrivate`)
1504     :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1505                      :py:data:`pyderasn.TagFormConstructed`)
1506     """
1507     if num < 31:
1508         # [XX|X|.....]
1509         return int2byte(klass | form | num)
1510     # [XX|X|11111][1.......][1.......] ... [0.......]
1511     return int2byte(klass | form | 31) + zero_ended_encode(num)
1512
1513
1514 def tag_decode(tag):
1515     """Decode tag from binary form
1516
1517     .. warning::
1518
1519        No validation is performed, assuming that it has already passed.
1520
1521     It returns tuple with three integers, as
1522     :py:func:`pyderasn.tag_encode` accepts.
1523     """
1524     first_octet = byte2int(tag)
1525     klass = first_octet & 0xC0
1526     form = first_octet & 0x20
1527     if first_octet & 0x1F < 0x1F:
1528         return (klass, form, first_octet & 0x1F)
1529     num = 0
1530     for octet in iterbytes(tag[1:]):
1531         num <<= 7
1532         num |= octet & 0x7F
1533     return (klass, form, num)
1534
1535
1536 def tag_ctxp(num):
1537     """Create CONTEXT PRIMITIVE tag
1538     """
1539     return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1540
1541
1542 def tag_ctxc(num):
1543     """Create CONTEXT CONSTRUCTED tag
1544     """
1545     return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1546
1547
1548 def tag_strip(data):
1549     """Take off tag from the data
1550
1551     :returns: (encoded tag, tag length, remaining data)
1552     """
1553     if len(data) == 0:
1554         raise NotEnoughData("no data at all")
1555     if byte2int(data) & 0x1F < 31:
1556         return data[:1], 1, data[1:]
1557     i = 0
1558     while True:
1559         i += 1
1560         if i == len(data):
1561             raise DecodeError("unfinished tag")
1562         if indexbytes(data, i) & 0x80 == 0:
1563             break
1564     if i == 1 and indexbytes(data, 1) < 0x1F:
1565         raise DecodeError("unexpected long form")
1566     if i > 1 and indexbytes(data, 1) & 0x7F == 0:
1567         raise DecodeError("leading zero byte in tag value")
1568     i += 1
1569     return data[:i], i, data[i:]
1570
1571
1572 def len_encode(l):
1573     if l < 0x80:
1574         return int2byte(l)
1575     octets = bytearray(int_bytes_len(l) + 1)
1576     octets[0] = 0x80 | (len(octets) - 1)
1577     for i in six_xrange(len(octets) - 1, 0, -1):
1578         octets[i] = l & 0xFF
1579         l >>= 8
1580     return bytes(octets)
1581
1582
1583 def len_decode(data):
1584     """Decode length
1585
1586     :returns: (decoded length, length's length, remaining data)
1587     :raises LenIndefForm: if indefinite form encoding is met
1588     """
1589     if len(data) == 0:
1590         raise NotEnoughData("no data at all")
1591     first_octet = byte2int(data)
1592     if first_octet & 0x80 == 0:
1593         return first_octet, 1, data[1:]
1594     octets_num = first_octet & 0x7F
1595     if octets_num + 1 > len(data):
1596         raise NotEnoughData("encoded length is longer than data")
1597     if octets_num == 0:
1598         raise LenIndefForm()
1599     if byte2int(data[1:]) == 0:
1600         raise DecodeError("leading zeros")
1601     l = 0
1602     for v in iterbytes(data[1:1 + octets_num]):
1603         l = (l << 8) | v
1604     if l <= 127:
1605         raise DecodeError("long form instead of short one")
1606     return l, 1 + octets_num, data[1 + octets_num:]
1607
1608
1609 LEN0 = len_encode(0)
1610 LEN1 = len_encode(1)
1611 LEN1K = len_encode(1000)
1612
1613
1614 def len_size(l):
1615     """How many bytes length field will take
1616     """
1617     if l < 128:
1618         return 1
1619     if l < 256:  # 1 << 8
1620         return 2
1621     if l < 65536:  # 1 << 16
1622         return 3
1623     if l < 16777216:  # 1 << 24
1624         return 4
1625     if l < 4294967296:  # 1 << 32
1626         return 5
1627     if l < 1099511627776:  # 1 << 40
1628         return 6
1629     if l < 281474976710656:  # 1 << 48
1630         return 7
1631     if l < 72057594037927936:  # 1 << 56
1632         return 8
1633     raise OverflowError("too big length")
1634
1635
1636 def write_full(writer, data):
1637     """Fully write provided data
1638
1639     :param writer: must comply with ``io.RawIOBase.write`` behaviour
1640
1641     BytesIO does not guarantee that the whole data will be written at
1642     once. That function write everything provided, raising an error if
1643     ``writer`` returns None.
1644     """
1645     data = memoryview(data)
1646     written = 0
1647     while written != len(data):
1648         n = writer(data[written:])
1649         if n is None:
1650             raise ValueError("can not write to buf")
1651         written += n
1652
1653
1654 # If it is 64-bit system, then use compact 64-bit array of unsigned
1655 # longs. Use an ordinary list with universal integers otherwise, that
1656 # is slower.
1657 if sys_maxsize > 2 ** 32:
1658     def state_2pass_new():
1659         return array("L")
1660 else:
1661     def state_2pass_new():
1662         return []
1663
1664
1665 ########################################################################
1666 # Base class
1667 ########################################################################
1668
1669 class AutoAddSlots(type):
1670     def __new__(cls, name, bases, _dict):
1671         _dict["__slots__"] = _dict.get("__slots__", ())
1672         return type.__new__(cls, name, bases, _dict)
1673
1674
1675 BasicState = namedtuple("BasicState", (
1676     "version",
1677     "tag",
1678     "tag_order",
1679     "expl",
1680     "default",
1681     "optional",
1682     "offset",
1683     "llen",
1684     "vlen",
1685     "expl_lenindef",
1686     "lenindef",
1687     "ber_encoded",
1688 ), **NAMEDTUPLE_KWARGS)
1689
1690
1691 @add_metaclass(AutoAddSlots)
1692 class Obj(object):
1693     """Common ASN.1 object class
1694
1695     All ASN.1 types are inherited from it. It has metaclass that
1696     automatically adds ``__slots__`` to all inherited classes.
1697     """
1698     __slots__ = (
1699         "tag",
1700         "_tag_order",
1701         "_value",
1702         "_expl",
1703         "default",
1704         "optional",
1705         "offset",
1706         "llen",
1707         "vlen",
1708         "expl_lenindef",
1709         "lenindef",
1710         "ber_encoded",
1711     )
1712
1713     def __init__(
1714             self,
1715             impl=None,
1716             expl=None,
1717             default=None,
1718             optional=False,
1719             _decoded=(0, 0, 0),
1720     ):
1721         self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1722         self._expl = getattr(self, "expl", None) if expl is None else expl
1723         if self.tag != self.tag_default and self._expl is not None:
1724             raise ValueError("implicit and explicit tags can not be set simultaneously")
1725         if self.tag is None:
1726             self._tag_order = None
1727         else:
1728             tag_class, _, tag_num = tag_decode(
1729                 self.tag if self._expl is None else self._expl
1730             )
1731             self._tag_order = (tag_class, tag_num)
1732         if default is not None:
1733             optional = True
1734         self.optional = optional
1735         self.offset, self.llen, self.vlen = _decoded
1736         self.default = None
1737         self.expl_lenindef = False
1738         self.lenindef = False
1739         self.ber_encoded = False
1740
1741     @property
1742     def ready(self):  # pragma: no cover
1743         """Is object ready to be encoded?
1744         """
1745         raise NotImplementedError()
1746
1747     def _assert_ready(self):
1748         if not self.ready:
1749             raise ObjNotReady(self.__class__.__name__)
1750
1751     @property
1752     def bered(self):
1753         """Is either object or any elements inside is BER encoded?
1754         """
1755         return self.expl_lenindef or self.lenindef or self.ber_encoded
1756
1757     @property
1758     def decoded(self):
1759         """Is object decoded?
1760         """
1761         return (self.llen + self.vlen) > 0
1762
1763     def __getstate__(self):  # pragma: no cover
1764         """Used for making safe to be mutable pickleable copies
1765         """
1766         raise NotImplementedError()
1767
1768     def __setstate__(self, state):
1769         if state.version != __version__:
1770             raise ValueError("data is pickled by different PyDERASN version")
1771         self.tag = state.tag
1772         self._tag_order = state.tag_order
1773         self._expl = state.expl
1774         self.default = state.default
1775         self.optional = state.optional
1776         self.offset = state.offset
1777         self.llen = state.llen
1778         self.vlen = state.vlen
1779         self.expl_lenindef = state.expl_lenindef
1780         self.lenindef = state.lenindef
1781         self.ber_encoded = state.ber_encoded
1782
1783     @property
1784     def tag_order(self):
1785         """Tag's (class, number) used for DER/CER sorting
1786         """
1787         return self._tag_order
1788
1789     @property
1790     def tag_order_cer(self):
1791         return self.tag_order
1792
1793     @property
1794     def tlen(self):
1795         """.. seealso:: :ref:`decoding`
1796         """
1797         return len(self.tag)
1798
1799     @property
1800     def tlvlen(self):
1801         """.. seealso:: :ref:`decoding`
1802         """
1803         return self.tlen + self.llen + self.vlen
1804
1805     def __str__(self):  # pragma: no cover
1806         return self.__bytes__() if PY2 else self.__unicode__()
1807
1808     def __ne__(self, their):
1809         return not(self == their)
1810
1811     def __gt__(self, their):  # pragma: no cover
1812         return not(self < their)
1813
1814     def __le__(self, their):  # pragma: no cover
1815         return (self == their) or (self < their)
1816
1817     def __ge__(self, their):  # pragma: no cover
1818         return (self == their) or (self > their)
1819
1820     def _encode(self):  # pragma: no cover
1821         raise NotImplementedError()
1822
1823     def _encode_cer(self, writer):
1824         write_full(writer, self._encode())
1825
1826     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):  # pragma: no cover
1827         yield NotImplemented
1828
1829     def _encode1st(self, state):
1830         raise NotImplementedError()
1831
1832     def _encode2nd(self, writer, state_iter):
1833         raise NotImplementedError()
1834
1835     def encode(self):
1836         """DER encode the structure
1837
1838         :returns: DER representation
1839         """
1840         raw = self._encode()
1841         if self._expl is None:
1842             return raw
1843         return b"".join((self._expl, len_encode(len(raw)), raw))
1844
1845     def encode1st(self, state=None):
1846         """Do the 1st pass of 2-pass encoding
1847
1848         :rtype: (int, array("L"))
1849         :returns: full length of encoded data and precalculated various
1850                   objects lengths
1851         """
1852         if state is None:
1853             state = state_2pass_new()
1854         if self._expl is None:
1855             return self._encode1st(state)
1856         state.append(0)
1857         idx = len(state) - 1
1858         vlen, _ = self._encode1st(state)
1859         state[idx] = vlen
1860         fulllen = len(self._expl) + len_size(vlen) + vlen
1861         return fulllen, state
1862
1863     def encode2nd(self, writer, state_iter):
1864         """Do the 2nd pass of 2-pass encoding
1865
1866         :param writer: must comply with ``io.RawIOBase.write`` behaviour
1867         :param state_iter: iterator over the 1st pass state (``iter(state)``)
1868         """
1869         if self._expl is None:
1870             self._encode2nd(writer, state_iter)
1871         else:
1872             write_full(writer, self._expl + len_encode(next(state_iter)))
1873             self._encode2nd(writer, state_iter)
1874
1875     def encode_cer(self, writer):
1876         """CER encode the structure to specified writer
1877
1878         :param writer: must comply with ``io.RawIOBase.write``
1879                        behaviour. It takes slice to be written and
1880                        returns number of bytes processed. If it returns
1881                        None, then exception will be raised
1882         """
1883         if self._expl is not None:
1884             write_full(writer, self._expl + LENINDEF)
1885         if getattr(self, "der_forced", False):
1886             write_full(writer, self._encode())
1887         else:
1888             self._encode_cer(writer)
1889         if self._expl is not None:
1890             write_full(writer, EOC)
1891
1892     def hexencode(self):
1893         """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1894         """
1895         return hexenc(self.encode())
1896
1897     def decode(
1898             self,
1899             data,
1900             offset=0,
1901             leavemm=False,
1902             decode_path=(),
1903             ctx=None,
1904             tag_only=False,
1905             _ctx_immutable=True,
1906     ):
1907         """Decode the data
1908
1909         :param data: either binary or memoryview
1910         :param int offset: initial data's offset
1911         :param bool leavemm: do we need to leave memoryview of remaining
1912                     data as is, or convert it to bytes otherwise
1913         :param decode_path: current decode path (tuples of strings,
1914                             possibly with DecodePathDefBy) with will be
1915                             the root for all underlying objects
1916         :param ctx: optional :ref:`context <ctx>` governing decoding process
1917         :param bool tag_only: decode only the tag, without length and
1918                               contents (used only in Choice and Set
1919                               structures, trying to determine if tag satisfies
1920                               the schema)
1921         :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1922                                     before using it?
1923         :returns: (Obj, remaining data)
1924
1925         .. seealso:: :ref:`decoding`
1926         """
1927         result = next(self.decode_evgen(
1928             data,
1929             offset,
1930             leavemm,
1931             decode_path,
1932             ctx,
1933             tag_only,
1934             _ctx_immutable,
1935             _evgen_mode=False,
1936         ))
1937         if result is None:
1938             return None
1939         _, obj, tail = result
1940         return obj, tail
1941
1942     def decode_evgen(
1943             self,
1944             data,
1945             offset=0,
1946             leavemm=False,
1947             decode_path=(),
1948             ctx=None,
1949             tag_only=False,
1950             _ctx_immutable=True,
1951             _evgen_mode=True,
1952     ):
1953         """Decode with evgen mode on
1954
1955         That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1956         it returns the generator producing ``(decode_path, obj, tail)``
1957         values.
1958         .. seealso:: :ref:`evgen mode <evgen_mode>`.
1959         """
1960         if ctx is None:
1961             ctx = {}
1962         elif _ctx_immutable:
1963             ctx = copy(ctx)
1964         tlv = memoryview(data)
1965         if (
1966                 _evgen_mode and
1967                 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1968         ):
1969             _evgen_mode = False
1970         if self._expl is None:
1971             for result in self._decode(
1972                     tlv,
1973                     offset=offset,
1974                     decode_path=decode_path,
1975                     ctx=ctx,
1976                     tag_only=tag_only,
1977                     evgen_mode=_evgen_mode,
1978             ):
1979                 if tag_only:
1980                     yield None
1981                     return
1982                 _decode_path, obj, tail = result
1983                 if _decode_path is not decode_path:
1984                     yield result
1985         else:
1986             try:
1987                 t, tlen, lv = tag_strip(tlv)
1988             except DecodeError as err:
1989                 raise err.__class__(
1990                     msg=err.msg,
1991                     klass=self.__class__,
1992                     decode_path=decode_path,
1993                     offset=offset,
1994                 )
1995             if t != self._expl:
1996                 raise TagMismatch(
1997                     klass=self.__class__,
1998                     decode_path=decode_path,
1999                     offset=offset,
2000                 )
2001             try:
2002                 l, llen, v = len_decode(lv)
2003             except LenIndefForm as err:
2004                 if not ctx.get("bered", False):
2005                     raise err.__class__(
2006                         msg=err.msg,
2007                         klass=self.__class__,
2008                         decode_path=decode_path,
2009                         offset=offset,
2010                     )
2011                 llen, v = 1, lv[1:]
2012                 offset += tlen + llen
2013                 for result in self._decode(
2014                         v,
2015                         offset=offset,
2016                         decode_path=decode_path,
2017                         ctx=ctx,
2018                         tag_only=tag_only,
2019                         evgen_mode=_evgen_mode,
2020                 ):
2021                     if tag_only:  # pragma: no cover
2022                         yield None
2023                         return
2024                     _decode_path, obj, tail = result
2025                     if _decode_path is not decode_path:
2026                         yield result
2027                 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2028                 if eoc_expected.tobytes() != EOC:
2029                     raise DecodeError(
2030                         "no EOC",
2031                         klass=self.__class__,
2032                         decode_path=decode_path,
2033                         offset=offset,
2034                     )
2035                 obj.vlen += EOC_LEN
2036                 obj.expl_lenindef = True
2037             except DecodeError as err:
2038                 raise err.__class__(
2039                     msg=err.msg,
2040                     klass=self.__class__,
2041                     decode_path=decode_path,
2042                     offset=offset,
2043                 )
2044             else:
2045                 if l > len(v):
2046                     raise NotEnoughData(
2047                         "encoded length is longer than data",
2048                         klass=self.__class__,
2049                         decode_path=decode_path,
2050                         offset=offset,
2051                     )
2052                 for result in self._decode(
2053                         v,
2054                         offset=offset + tlen + llen,
2055                         decode_path=decode_path,
2056                         ctx=ctx,
2057                         tag_only=tag_only,
2058                         evgen_mode=_evgen_mode,
2059                 ):
2060                     if tag_only:  # pragma: no cover
2061                         yield None
2062                         return
2063                     _decode_path, obj, tail = result
2064                     if _decode_path is not decode_path:
2065                         yield result
2066                 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2067                     raise DecodeError(
2068                         "explicit tag out-of-bound, longer than data",
2069                         klass=self.__class__,
2070                         decode_path=decode_path,
2071                         offset=offset,
2072                     )
2073         yield decode_path, obj, (tail if leavemm else tail.tobytes())
2074
2075     def decod(self, data, offset=0, decode_path=(), ctx=None):
2076         """Decode the data, check that tail is empty
2077
2078         :raises ExceedingData: if tail is not empty
2079
2080         This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2081         (decode without tail) that also checks that there is no
2082         trailing data left.
2083         """
2084         obj, tail = self.decode(
2085             data,
2086             offset=offset,
2087             decode_path=decode_path,
2088             ctx=ctx,
2089             leavemm=True,
2090         )
2091         if len(tail) > 0:
2092             raise ExceedingData(len(tail))
2093         return obj
2094
2095     def hexdecode(self, data, *args, **kwargs):
2096         """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2097         """
2098         return self.decode(hexdec(data), *args, **kwargs)
2099
2100     def hexdecod(self, data, *args, **kwargs):
2101         """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2102         """
2103         return self.decod(hexdec(data), *args, **kwargs)
2104
2105     @property
2106     def expled(self):
2107         """.. seealso:: :ref:`decoding`
2108         """
2109         return self._expl is not None
2110
2111     @property
2112     def expl_tag(self):
2113         """.. seealso:: :ref:`decoding`
2114         """
2115         return self._expl
2116
2117     @property
2118     def expl_tlen(self):
2119         """.. seealso:: :ref:`decoding`
2120         """
2121         return len(self._expl)
2122
2123     @property
2124     def expl_llen(self):
2125         """.. seealso:: :ref:`decoding`
2126         """
2127         if self.expl_lenindef:
2128             return 1
2129         return len(len_encode(self.tlvlen))
2130
2131     @property
2132     def expl_offset(self):
2133         """.. seealso:: :ref:`decoding`
2134         """
2135         return self.offset - self.expl_tlen - self.expl_llen
2136
2137     @property
2138     def expl_vlen(self):
2139         """.. seealso:: :ref:`decoding`
2140         """
2141         return self.tlvlen
2142
2143     @property
2144     def expl_tlvlen(self):
2145         """.. seealso:: :ref:`decoding`
2146         """
2147         return self.expl_tlen + self.expl_llen + self.expl_vlen
2148
2149     @property
2150     def fulloffset(self):
2151         """.. seealso:: :ref:`decoding`
2152         """
2153         return self.expl_offset if self.expled else self.offset
2154
2155     @property
2156     def fulllen(self):
2157         """.. seealso:: :ref:`decoding`
2158         """
2159         return self.expl_tlvlen if self.expled else self.tlvlen
2160
2161     def pps_lenindef(self, decode_path):
2162         if self.lenindef and not (
2163                 getattr(self, "defined", None) is not None and
2164                 self.defined[1].lenindef
2165         ):
2166             yield _pp(
2167                 asn1_type_name="EOC",
2168                 obj_name="",
2169                 decode_path=decode_path,
2170                 offset=(
2171                     self.offset + self.tlvlen -
2172                     (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2173                 ),
2174                 tlen=1,
2175                 llen=1,
2176                 vlen=0,
2177                 ber_encoded=True,
2178                 bered=True,
2179             )
2180         if self.expl_lenindef:
2181             yield _pp(
2182                 asn1_type_name="EOC",
2183                 obj_name="EXPLICIT",
2184                 decode_path=decode_path,
2185                 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2186                 tlen=1,
2187                 llen=1,
2188                 vlen=0,
2189                 ber_encoded=True,
2190                 bered=True,
2191             )
2192
2193
2194 def encode_cer(obj):
2195     """Encode to CER in memory buffer
2196
2197     :returns bytes: memory buffer contents
2198     """
2199     buf = BytesIO()
2200     obj.encode_cer(buf.write)
2201     return buf.getvalue()
2202
2203
2204 def encode2pass(obj):
2205     """Encode (2-pass mode) to DER in memory buffer
2206
2207     :returns bytes: memory buffer contents
2208     """
2209     buf = BytesIO()
2210     _, state = obj.encode1st()
2211     obj.encode2nd(buf.write, iter(state))
2212     return buf.getvalue()
2213
2214
2215 class DecodePathDefBy(object):
2216     """DEFINED BY representation inside decode path
2217     """
2218     __slots__ = ("defined_by",)
2219
2220     def __init__(self, defined_by):
2221         self.defined_by = defined_by
2222
2223     def __ne__(self, their):
2224         return not(self == their)
2225
2226     def __eq__(self, their):
2227         if not isinstance(their, self.__class__):
2228             return False
2229         return self.defined_by == their.defined_by
2230
2231     def __str__(self):
2232         return "DEFINED BY " + str(self.defined_by)
2233
2234     def __repr__(self):
2235         return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2236
2237
2238 ########################################################################
2239 # Pretty printing
2240 ########################################################################
2241
2242 PP = namedtuple("PP", (
2243     "obj",
2244     "asn1_type_name",
2245     "obj_name",
2246     "decode_path",
2247     "value",
2248     "blob",
2249     "optional",
2250     "default",
2251     "impl",
2252     "expl",
2253     "offset",
2254     "tlen",
2255     "llen",
2256     "vlen",
2257     "expl_offset",
2258     "expl_tlen",
2259     "expl_llen",
2260     "expl_vlen",
2261     "expl_lenindef",
2262     "lenindef",
2263     "ber_encoded",
2264     "bered",
2265 ), **NAMEDTUPLE_KWARGS)
2266
2267
2268 def _pp(
2269         obj=None,
2270         asn1_type_name="unknown",
2271         obj_name="unknown",
2272         decode_path=(),
2273         value=None,
2274         blob=None,
2275         optional=False,
2276         default=False,
2277         impl=None,
2278         expl=None,
2279         offset=0,
2280         tlen=0,
2281         llen=0,
2282         vlen=0,
2283         expl_offset=None,
2284         expl_tlen=None,
2285         expl_llen=None,
2286         expl_vlen=None,
2287         expl_lenindef=False,
2288         lenindef=False,
2289         ber_encoded=False,
2290         bered=False,
2291 ):
2292     return PP(
2293         obj,
2294         asn1_type_name,
2295         obj_name,
2296         decode_path,
2297         value,
2298         blob,
2299         optional,
2300         default,
2301         impl,
2302         expl,
2303         offset,
2304         tlen,
2305         llen,
2306         vlen,
2307         expl_offset,
2308         expl_tlen,
2309         expl_llen,
2310         expl_vlen,
2311         expl_lenindef,
2312         lenindef,
2313         ber_encoded,
2314         bered,
2315     )
2316
2317
2318 def _colourize(what, colour, with_colours, attrs=("bold",)):
2319     return colored(what, colour, attrs=attrs) if with_colours else what
2320
2321
2322 def colonize_hex(hexed):
2323     """Separate hexadecimal string with colons
2324     """
2325     return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2326
2327
2328 def find_oid_name(asn1_type_name, oid_maps, value):
2329     if len(oid_maps) > 0 and asn1_type_name == ObjectIdentifier.asn1_type_name:
2330         for oid_map in oid_maps:
2331             oid_name = oid_map.get(value)
2332             if oid_name is not None:
2333                 return oid_name
2334     return None
2335
2336
2337 def pp_console_row(
2338         pp,
2339         oid_maps=(),
2340         with_offsets=False,
2341         with_blob=True,
2342         with_colours=False,
2343         with_decode_path=False,
2344         decode_path_len_decrease=0,
2345 ):
2346     cols = []
2347     if with_offsets:
2348         col = "%5d%s%s" % (
2349             pp.offset,
2350             (
2351                 "  " if pp.expl_offset is None else
2352                 ("-%d" % (pp.offset - pp.expl_offset))
2353             ),
2354             LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2355         )
2356         col = _colourize(col, "red", with_colours, ())
2357         col += _colourize("B", "red", with_colours) if pp.bered else " "
2358         cols.append(col)
2359         col = "[%d,%d,%4d]%s" % (
2360             pp.tlen, pp.llen, pp.vlen,
2361             LENINDEF_PP_CHAR if pp.lenindef else " "
2362         )
2363         col = _colourize(col, "green", with_colours, ())
2364         cols.append(col)
2365     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2366     if decode_path_len > 0:
2367         cols.append(" ." * decode_path_len)
2368         ent = pp.decode_path[-1]
2369         if isinstance(ent, DecodePathDefBy):
2370             cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2371             value = str(ent.defined_by)
2372             oid_name = find_oid_name(ent.defined_by.asn1_type_name, oid_maps, value)
2373             if oid_name is None:
2374                 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2375             else:
2376                 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2377         else:
2378             cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2379     if pp.expl is not None:
2380         klass, _, num = pp.expl
2381         col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2382         cols.append(_colourize(col, "blue", with_colours))
2383     if pp.impl is not None:
2384         klass, _, num = pp.impl
2385         col = "[%s%d]" % (TagClassReprs[klass], num)
2386         cols.append(_colourize(col, "blue", with_colours))
2387     if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2388         cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2389     if pp.ber_encoded:
2390         cols.append(_colourize("BER", "red", with_colours))
2391     cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2392     if pp.value is not None:
2393         value = pp.value
2394         cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2395         oid_name = find_oid_name(pp.asn1_type_name, oid_maps, pp.value)
2396         if oid_name is not None:
2397             cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2398         if pp.asn1_type_name == Integer.asn1_type_name:
2399             cols.append(_colourize(
2400                 "(%s)" % colonize_hex(pp.obj.tohex()), "green", with_colours,
2401             ))
2402     if with_blob:
2403         if pp.blob.__class__ == binary_type:
2404             cols.append(hexenc(pp.blob))
2405         elif pp.blob.__class__ == tuple:
2406             cols.append(", ".join(pp.blob))
2407     if pp.optional:
2408         cols.append(_colourize("OPTIONAL", "red", with_colours))
2409     if pp.default:
2410         cols.append(_colourize("DEFAULT", "red", with_colours))
2411     if with_decode_path:
2412         cols.append(_colourize(
2413             "[%s]" % ":".join(str(p) for p in pp.decode_path),
2414             "grey",
2415             with_colours,
2416         ))
2417     return " ".join(cols)
2418
2419
2420 def pp_console_blob(pp, decode_path_len_decrease=0):
2421     cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2422     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2423     if decode_path_len > 0:
2424         cols.append(" ." * (decode_path_len + 1))
2425     if pp.blob.__class__ == binary_type:
2426         blob = hexenc(pp.blob).upper()
2427         for i in six_xrange(0, len(blob), 32):
2428             chunk = blob[i:i + 32]
2429             yield " ".join(cols + [colonize_hex(chunk)])
2430     elif pp.blob.__class__ == tuple:
2431         yield " ".join(cols + [", ".join(pp.blob)])
2432
2433
2434 def pprint(
2435         obj,
2436         oid_maps=(),
2437         big_blobs=False,
2438         with_colours=False,
2439         with_decode_path=False,
2440         decode_path_only=(),
2441         decode_path=(),
2442 ):
2443     """Pretty print object
2444
2445     :param Obj obj: object you want to pretty print
2446     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
2447                      Its human readable form is printed when OID is met
2448     :param big_blobs: if large binary objects are met (like OctetString
2449                       values), do we need to print them too, on separate
2450                       lines
2451     :param with_colours: colourize output, if ``termcolor`` library
2452                          is available
2453     :param with_decode_path: print decode path
2454     :param decode_path_only: print only that specified decode path
2455     """
2456     def _pprint_pps(pps):
2457         for pp in pps:
2458             if hasattr(pp, "_fields"):
2459                 if (
2460                         decode_path_only != () and
2461                         tuple(
2462                             str(p) for p in pp.decode_path[:len(decode_path_only)]
2463                         ) != decode_path_only
2464                 ):
2465                     continue
2466                 if big_blobs:
2467                     yield pp_console_row(
2468                         pp,
2469                         oid_maps=oid_maps,
2470                         with_offsets=True,
2471                         with_blob=False,
2472                         with_colours=with_colours,
2473                         with_decode_path=with_decode_path,
2474                         decode_path_len_decrease=len(decode_path_only),
2475                     )
2476                     for row in pp_console_blob(
2477                             pp,
2478                             decode_path_len_decrease=len(decode_path_only),
2479                     ):
2480                         yield row
2481                 else:
2482                     yield pp_console_row(
2483                         pp,
2484                         oid_maps=oid_maps,
2485                         with_offsets=True,
2486                         with_blob=True,
2487                         with_colours=with_colours,
2488                         with_decode_path=with_decode_path,
2489                         decode_path_len_decrease=len(decode_path_only),
2490                     )
2491             else:
2492                 for row in _pprint_pps(pp):
2493                     yield row
2494     return "\n".join(_pprint_pps(obj.pps(decode_path)))
2495
2496
2497 ########################################################################
2498 # ASN.1 primitive types
2499 ########################################################################
2500
2501 BooleanState = namedtuple(
2502     "BooleanState",
2503     BasicState._fields + ("value",),
2504     **NAMEDTUPLE_KWARGS
2505 )
2506
2507
2508 class Boolean(Obj):
2509     """``BOOLEAN`` boolean type
2510
2511     >>> b = Boolean(True)
2512     BOOLEAN True
2513     >>> b == Boolean(True)
2514     True
2515     >>> bool(b)
2516     True
2517     """
2518     __slots__ = ()
2519     tag_default = tag_encode(1)
2520     asn1_type_name = "BOOLEAN"
2521
2522     def __init__(
2523             self,
2524             value=None,
2525             impl=None,
2526             expl=None,
2527             default=None,
2528             optional=False,
2529             _decoded=(0, 0, 0),
2530     ):
2531         """
2532         :param value: set the value. Either boolean type, or
2533                       :py:class:`pyderasn.Boolean` object
2534         :param bytes impl: override default tag with ``IMPLICIT`` one
2535         :param bytes expl: override default tag with ``EXPLICIT`` one
2536         :param default: set default value. Type same as in ``value``
2537         :param bool optional: is object ``OPTIONAL`` in sequence
2538         """
2539         super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2540         self._value = None if value is None else self._value_sanitize(value)
2541         if default is not None:
2542             default = self._value_sanitize(default)
2543             self.default = self.__class__(
2544                 value=default,
2545                 impl=self.tag,
2546                 expl=self._expl,
2547             )
2548             if value is None:
2549                 self._value = default
2550
2551     def _value_sanitize(self, value):
2552         if value.__class__ == bool:
2553             return value
2554         if issubclass(value.__class__, Boolean):
2555             return value._value
2556         raise InvalidValueType((self.__class__, bool))
2557
2558     @property
2559     def ready(self):
2560         return self._value is not None
2561
2562     def __getstate__(self):
2563         return BooleanState(
2564             __version__,
2565             self.tag,
2566             self._tag_order,
2567             self._expl,
2568             self.default,
2569             self.optional,
2570             self.offset,
2571             self.llen,
2572             self.vlen,
2573             self.expl_lenindef,
2574             self.lenindef,
2575             self.ber_encoded,
2576             self._value,
2577         )
2578
2579     def __setstate__(self, state):
2580         super(Boolean, self).__setstate__(state)
2581         self._value = state.value
2582
2583     def __nonzero__(self):
2584         self._assert_ready()
2585         return self._value
2586
2587     def __bool__(self):
2588         self._assert_ready()
2589         return self._value
2590
2591     def __eq__(self, their):
2592         if their.__class__ == bool:
2593             return self._value == their
2594         if not issubclass(their.__class__, Boolean):
2595             return False
2596         return (
2597             self._value == their._value and
2598             self.tag == their.tag and
2599             self._expl == their._expl
2600         )
2601
2602     def __call__(
2603             self,
2604             value=None,
2605             impl=None,
2606             expl=None,
2607             default=None,
2608             optional=None,
2609     ):
2610         return self.__class__(
2611             value=value,
2612             impl=self.tag if impl is None else impl,
2613             expl=self._expl if expl is None else expl,
2614             default=self.default if default is None else default,
2615             optional=self.optional if optional is None else optional,
2616         )
2617
2618     def _encode(self):
2619         self._assert_ready()
2620         return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2621
2622     def _encode1st(self, state):
2623         return len(self.tag) + 2, state
2624
2625     def _encode2nd(self, writer, state_iter):
2626         self._assert_ready()
2627         write_full(writer, self._encode())
2628
2629     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2630         try:
2631             t, _, lv = tag_strip(tlv)
2632         except DecodeError as err:
2633             raise err.__class__(
2634                 msg=err.msg,
2635                 klass=self.__class__,
2636                 decode_path=decode_path,
2637                 offset=offset,
2638             )
2639         if t != self.tag:
2640             raise TagMismatch(
2641                 klass=self.__class__,
2642                 decode_path=decode_path,
2643                 offset=offset,
2644             )
2645         if tag_only:
2646             yield None
2647             return
2648         try:
2649             l, _, v = len_decode(lv)
2650         except DecodeError as err:
2651             raise err.__class__(
2652                 msg=err.msg,
2653                 klass=self.__class__,
2654                 decode_path=decode_path,
2655                 offset=offset,
2656             )
2657         if l != 1:
2658             raise InvalidLength(
2659                 "Boolean's length must be equal to 1",
2660                 klass=self.__class__,
2661                 decode_path=decode_path,
2662                 offset=offset,
2663             )
2664         if l > len(v):
2665             raise NotEnoughData(
2666                 "encoded length is longer than data",
2667                 klass=self.__class__,
2668                 decode_path=decode_path,
2669                 offset=offset,
2670             )
2671         first_octet = byte2int(v)
2672         ber_encoded = False
2673         if first_octet == 0:
2674             value = False
2675         elif first_octet == 0xFF:
2676             value = True
2677         elif ctx.get("bered", False):
2678             value = True
2679             ber_encoded = True
2680         else:
2681             raise DecodeError(
2682                 "unacceptable Boolean value",
2683                 klass=self.__class__,
2684                 decode_path=decode_path,
2685                 offset=offset,
2686             )
2687         obj = self.__class__(
2688             value=value,
2689             impl=self.tag,
2690             expl=self._expl,
2691             default=self.default,
2692             optional=self.optional,
2693             _decoded=(offset, 1, 1),
2694         )
2695         obj.ber_encoded = ber_encoded
2696         yield decode_path, obj, v[1:]
2697
2698     def __repr__(self):
2699         return pp_console_row(next(self.pps()))
2700
2701     def pps(self, decode_path=()):
2702         yield _pp(
2703             obj=self,
2704             asn1_type_name=self.asn1_type_name,
2705             obj_name=self.__class__.__name__,
2706             decode_path=decode_path,
2707             value=str(self._value) if self.ready else None,
2708             optional=self.optional,
2709             default=self == self.default,
2710             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2711             expl=None if self._expl is None else tag_decode(self._expl),
2712             offset=self.offset,
2713             tlen=self.tlen,
2714             llen=self.llen,
2715             vlen=self.vlen,
2716             expl_offset=self.expl_offset if self.expled else None,
2717             expl_tlen=self.expl_tlen if self.expled else None,
2718             expl_llen=self.expl_llen if self.expled else None,
2719             expl_vlen=self.expl_vlen if self.expled else None,
2720             expl_lenindef=self.expl_lenindef,
2721             ber_encoded=self.ber_encoded,
2722             bered=self.bered,
2723         )
2724         for pp in self.pps_lenindef(decode_path):
2725             yield pp
2726
2727
2728 IntegerState = namedtuple(
2729     "IntegerState",
2730     BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2731     **NAMEDTUPLE_KWARGS
2732 )
2733
2734
2735 class Integer(Obj):
2736     """``INTEGER`` integer type
2737
2738     >>> b = Integer(-123)
2739     INTEGER -123
2740     >>> b == Integer(-123)
2741     True
2742     >>> int(b)
2743     -123
2744
2745     >>> Integer(2, bounds=(1, 3))
2746     INTEGER 2
2747     >>> Integer(5, bounds=(1, 3))
2748     Traceback (most recent call last):
2749     pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2750
2751     ::
2752
2753         class Version(Integer):
2754             schema = (
2755                 ("v1", 0),
2756                 ("v2", 1),
2757                 ("v3", 2),
2758             )
2759
2760     >>> v = Version("v1")
2761     Version INTEGER v1
2762     >>> int(v)
2763     0
2764     >>> v.named
2765     'v1'
2766     >>> v.specs
2767     {'v3': 2, 'v1': 0, 'v2': 1}
2768     """
2769     __slots__ = ("specs", "_bound_min", "_bound_max")
2770     tag_default = tag_encode(2)
2771     asn1_type_name = "INTEGER"
2772
2773     def __init__(
2774             self,
2775             value=None,
2776             bounds=None,
2777             impl=None,
2778             expl=None,
2779             default=None,
2780             optional=False,
2781             _specs=None,
2782             _decoded=(0, 0, 0),
2783     ):
2784         """
2785         :param value: set the value. Either integer type, named value
2786                       (if ``schema`` is specified in the class), or
2787                       :py:class:`pyderasn.Integer` object
2788         :param bounds: set ``(MIN, MAX)`` value constraint.
2789                        (-inf, +inf) by default
2790         :param bytes impl: override default tag with ``IMPLICIT`` one
2791         :param bytes expl: override default tag with ``EXPLICIT`` one
2792         :param default: set default value. Type same as in ``value``
2793         :param bool optional: is object ``OPTIONAL`` in sequence
2794         """
2795         super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2796         self._value = value
2797         specs = getattr(self, "schema", {}) if _specs is None else _specs
2798         self.specs = specs if specs.__class__ == dict else dict(specs)
2799         self._bound_min, self._bound_max = getattr(
2800             self,
2801             "bounds",
2802             (float("-inf"), float("+inf")),
2803         ) if bounds is None else bounds
2804         if value is not None:
2805             self._value = self._value_sanitize(value)
2806         if default is not None:
2807             default = self._value_sanitize(default)
2808             self.default = self.__class__(
2809                 value=default,
2810                 impl=self.tag,
2811                 expl=self._expl,
2812                 _specs=self.specs,
2813             )
2814             if self._value is None:
2815                 self._value = default
2816
2817     def _value_sanitize(self, value):
2818         if isinstance(value, integer_types):
2819             pass
2820         elif issubclass(value.__class__, Integer):
2821             value = value._value
2822         elif value.__class__ == str:
2823             value = self.specs.get(value)
2824             if value is None:
2825                 raise ObjUnknown("integer value: %s" % value)
2826         else:
2827             raise InvalidValueType((self.__class__, int, str))
2828         if not self._bound_min <= value <= self._bound_max:
2829             raise BoundsError(self._bound_min, value, self._bound_max)
2830         return value
2831
2832     @property
2833     def ready(self):
2834         return self._value is not None
2835
2836     def __getstate__(self):
2837         return IntegerState(
2838             __version__,
2839             self.tag,
2840             self._tag_order,
2841             self._expl,
2842             self.default,
2843             self.optional,
2844             self.offset,
2845             self.llen,
2846             self.vlen,
2847             self.expl_lenindef,
2848             self.lenindef,
2849             self.ber_encoded,
2850             self.specs,
2851             self._value,
2852             self._bound_min,
2853             self._bound_max,
2854         )
2855
2856     def __setstate__(self, state):
2857         super(Integer, self).__setstate__(state)
2858         self.specs = state.specs
2859         self._value = state.value
2860         self._bound_min = state.bound_min
2861         self._bound_max = state.bound_max
2862
2863     def __int__(self):
2864         self._assert_ready()
2865         return int(self._value)
2866
2867     def tohex(self):
2868         """Hexadecimal representation
2869
2870         Use :py:func:`pyderasn.colonize_hex` for colonizing it.
2871         """
2872         hex_repr = hex(int(self))[2:].upper()
2873         if len(hex_repr) % 2 != 0:
2874             hex_repr = "0" + hex_repr
2875         return hex_repr
2876
2877     def __hash__(self):
2878         self._assert_ready()
2879         return hash(b"".join((
2880             self.tag,
2881             bytes(self._expl or b""),
2882             str(self._value).encode("ascii"),
2883         )))
2884
2885     def __eq__(self, their):
2886         if isinstance(their, integer_types):
2887             return self._value == their
2888         if not issubclass(their.__class__, Integer):
2889             return False
2890         return (
2891             self._value == their._value and
2892             self.tag == their.tag and
2893             self._expl == their._expl
2894         )
2895
2896     def __lt__(self, their):
2897         return self._value < their._value
2898
2899     @property
2900     def named(self):
2901         """Return named representation (if exists) of the value
2902         """
2903         for name, value in iteritems(self.specs):
2904             if value == self._value:
2905                 return name
2906         return None
2907
2908     def __call__(
2909             self,
2910             value=None,
2911             bounds=None,
2912             impl=None,
2913             expl=None,
2914             default=None,
2915             optional=None,
2916     ):
2917         return self.__class__(
2918             value=value,
2919             bounds=(
2920                 (self._bound_min, self._bound_max)
2921                 if bounds is None else bounds
2922             ),
2923             impl=self.tag if impl is None else impl,
2924             expl=self._expl if expl is None else expl,
2925             default=self.default if default is None else default,
2926             optional=self.optional if optional is None else optional,
2927             _specs=self.specs,
2928         )
2929
2930     def _encode_payload(self):
2931         self._assert_ready()
2932         value = self._value
2933         if PY2:
2934             if value == 0:
2935                 octets = bytearray([0])
2936             elif value < 0:
2937                 value = -value
2938                 value -= 1
2939                 octets = bytearray()
2940                 while value > 0:
2941                     octets.append((value & 0xFF) ^ 0xFF)
2942                     value >>= 8
2943                 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2944                     octets.append(0xFF)
2945             else:
2946                 octets = bytearray()
2947                 while value > 0:
2948                     octets.append(value & 0xFF)
2949                     value >>= 8
2950                 if octets[-1] & 0x80 > 0:
2951                     octets.append(0x00)
2952             octets.reverse()
2953             octets = bytes(octets)
2954         else:
2955             bytes_len = ceil(value.bit_length() / 8) or 1
2956             while True:
2957                 try:
2958                     octets = value.to_bytes(
2959                         bytes_len,
2960                         byteorder="big",
2961                         signed=True,
2962                     )
2963                 except OverflowError:
2964                     bytes_len += 1
2965                 else:
2966                     break
2967         return octets
2968
2969     def _encode(self):
2970         octets = self._encode_payload()
2971         return b"".join((self.tag, len_encode(len(octets)), octets))
2972
2973     def _encode1st(self, state):
2974         l = len(self._encode_payload())
2975         return len(self.tag) + len_size(l) + l, state
2976
2977     def _encode2nd(self, writer, state_iter):
2978         write_full(writer, self._encode())
2979
2980     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2981         try:
2982             t, _, lv = tag_strip(tlv)
2983         except DecodeError as err:
2984             raise err.__class__(
2985                 msg=err.msg,
2986                 klass=self.__class__,
2987                 decode_path=decode_path,
2988                 offset=offset,
2989             )
2990         if t != self.tag:
2991             raise TagMismatch(
2992                 klass=self.__class__,
2993                 decode_path=decode_path,
2994                 offset=offset,
2995             )
2996         if tag_only:
2997             yield None
2998             return
2999         try:
3000             l, llen, v = len_decode(lv)
3001         except DecodeError as err:
3002             raise err.__class__(
3003                 msg=err.msg,
3004                 klass=self.__class__,
3005                 decode_path=decode_path,
3006                 offset=offset,
3007             )
3008         if l > len(v):
3009             raise NotEnoughData(
3010                 "encoded length is longer than data",
3011                 klass=self.__class__,
3012                 decode_path=decode_path,
3013                 offset=offset,
3014             )
3015         if l == 0:
3016             raise NotEnoughData(
3017                 "zero length",
3018                 klass=self.__class__,
3019                 decode_path=decode_path,
3020                 offset=offset,
3021             )
3022         v, tail = v[:l], v[l:]
3023         first_octet = byte2int(v)
3024         if l > 1:
3025             second_octet = byte2int(v[1:])
3026             if (
3027                     ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3028                     ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3029             ):
3030                 raise DecodeError(
3031                     "non normalized integer",
3032                     klass=self.__class__,
3033                     decode_path=decode_path,
3034                     offset=offset,
3035                 )
3036         if PY2:
3037             value = 0
3038             if first_octet & 0x80 > 0:
3039                 octets = bytearray()
3040                 for octet in bytearray(v):
3041                     octets.append(octet ^ 0xFF)
3042                 for octet in octets:
3043                     value = (value << 8) | octet
3044                 value += 1
3045                 value = -value
3046             else:
3047                 for octet in bytearray(v):
3048                     value = (value << 8) | octet
3049         else:
3050             value = int.from_bytes(v, byteorder="big", signed=True)
3051         try:
3052             obj = self.__class__(
3053                 value=value,
3054                 bounds=(self._bound_min, self._bound_max),
3055                 impl=self.tag,
3056                 expl=self._expl,
3057                 default=self.default,
3058                 optional=self.optional,
3059                 _specs=self.specs,
3060                 _decoded=(offset, llen, l),
3061             )
3062         except BoundsError as err:
3063             raise DecodeError(
3064                 msg=str(err),
3065                 klass=self.__class__,
3066                 decode_path=decode_path,
3067                 offset=offset,
3068             )
3069         yield decode_path, obj, tail
3070
3071     def __repr__(self):
3072         return pp_console_row(next(self.pps()))
3073
3074     def pps(self, decode_path=()):
3075         yield _pp(
3076             obj=self,
3077             asn1_type_name=self.asn1_type_name,
3078             obj_name=self.__class__.__name__,
3079             decode_path=decode_path,
3080             value=(self.named or str(self._value)) if self.ready else None,
3081             optional=self.optional,
3082             default=self == self.default,
3083             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3084             expl=None if self._expl is None else tag_decode(self._expl),
3085             offset=self.offset,
3086             tlen=self.tlen,
3087             llen=self.llen,
3088             vlen=self.vlen,
3089             expl_offset=self.expl_offset if self.expled else None,
3090             expl_tlen=self.expl_tlen if self.expled else None,
3091             expl_llen=self.expl_llen if self.expled else None,
3092             expl_vlen=self.expl_vlen if self.expled else None,
3093             expl_lenindef=self.expl_lenindef,
3094             bered=self.bered,
3095         )
3096         for pp in self.pps_lenindef(decode_path):
3097             yield pp
3098
3099
3100 BitStringState = namedtuple(
3101     "BitStringState",
3102     BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3103     **NAMEDTUPLE_KWARGS
3104 )
3105
3106
3107 class BitString(Obj):
3108     """``BIT STRING`` bit string type
3109
3110     >>> BitString(b"hello world")
3111     BIT STRING 88 bits 68656c6c6f20776f726c64
3112     >>> bytes(b)
3113     b'hello world'
3114     >>> b == b"hello world"
3115     True
3116     >>> b.bit_len
3117     88
3118
3119     >>> BitString("'0A3B5F291CD'H")
3120     BIT STRING 44 bits 0a3b5f291cd0
3121     >>> b = BitString("'010110000000'B")
3122     BIT STRING 12 bits 5800
3123     >>> b.bit_len
3124     12
3125     >>> b[0], b[1], b[2], b[3]
3126     (False, True, False, True)
3127     >>> b[1000]
3128     False
3129     >>> [v for v in b]
3130     [False, True, False, True, True, False, False, False, False, False, False, False]
3131
3132     ::
3133
3134         class KeyUsage(BitString):
3135             schema = (
3136                 ("digitalSignature", 0),
3137                 ("nonRepudiation", 1),
3138                 ("keyEncipherment", 2),
3139             )
3140
3141     >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3142     KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3143     >>> b.named
3144     ['nonRepudiation', 'keyEncipherment']
3145     >>> b.specs
3146     {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3147
3148     .. note::
3149
3150        Pay attention that BIT STRING can be encoded both in primitive
3151        and constructed forms. Decoder always checks constructed form tag
3152        additionally to specified primitive one. If BER decoding is
3153        :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3154        of DER restrictions.
3155     """
3156     __slots__ = ("tag_constructed", "specs", "defined")
3157     tag_default = tag_encode(3)
3158     asn1_type_name = "BIT STRING"
3159
3160     def __init__(
3161             self,
3162             value=None,
3163             impl=None,
3164             expl=None,
3165             default=None,
3166             optional=False,
3167             _specs=None,
3168             _decoded=(0, 0, 0),
3169     ):
3170         """
3171         :param value: set the value. Either binary type, tuple of named
3172                       values (if ``schema`` is specified in the class),
3173                       string in ``'XXX...'B`` form, or
3174                       :py:class:`pyderasn.BitString` object
3175         :param bytes impl: override default tag with ``IMPLICIT`` one
3176         :param bytes expl: override default tag with ``EXPLICIT`` one
3177         :param default: set default value. Type same as in ``value``
3178         :param bool optional: is object ``OPTIONAL`` in sequence
3179         """
3180         super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3181         specs = getattr(self, "schema", {}) if _specs is None else _specs
3182         self.specs = specs if specs.__class__ == dict else dict(specs)
3183         self._value = None if value is None else self._value_sanitize(value)
3184         if default is not None:
3185             default = self._value_sanitize(default)
3186             self.default = self.__class__(
3187                 value=default,
3188                 impl=self.tag,
3189                 expl=self._expl,
3190             )
3191             if value is None:
3192                 self._value = default
3193         self.defined = None
3194         tag_klass, _, tag_num = tag_decode(self.tag)
3195         self.tag_constructed = tag_encode(
3196             klass=tag_klass,
3197             form=TagFormConstructed,
3198             num=tag_num,
3199         )
3200
3201     def _bits2octets(self, bits):
3202         if len(self.specs) > 0:
3203             bits = bits.rstrip("0")
3204         bit_len = len(bits)
3205         bits += "0" * ((8 - (bit_len % 8)) % 8)
3206         octets = bytearray(len(bits) // 8)
3207         for i in six_xrange(len(octets)):
3208             octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3209         return bit_len, bytes(octets)
3210
3211     def _value_sanitize(self, value):
3212         if isinstance(value, (string_types, binary_type)):
3213             if (
3214                     isinstance(value, string_types) and
3215                     value.startswith("'")
3216             ):
3217                 if value.endswith("'B"):
3218                     value = value[1:-2]
3219                     if not frozenset(value) <= SET01:
3220                         raise ValueError("B's coding contains unacceptable chars")
3221                     return self._bits2octets(value)
3222                 if value.endswith("'H"):
3223                     value = value[1:-2]
3224                     return (
3225                         len(value) * 4,
3226                         hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3227                     )
3228             if value.__class__ == binary_type:
3229                 return (len(value) * 8, value)
3230             raise InvalidValueType((self.__class__, string_types, binary_type))
3231         if value.__class__ == tuple:
3232             if (
3233                     len(value) == 2 and
3234                     isinstance(value[0], integer_types) and
3235                     value[1].__class__ == binary_type
3236             ):
3237                 return value
3238             bits = []
3239             for name in value:
3240                 bit = self.specs.get(name)
3241                 if bit is None:
3242                     raise ObjUnknown("BitString value: %s" % name)
3243                 bits.append(bit)
3244             if len(bits) == 0:
3245                 return self._bits2octets("")
3246             bits = frozenset(bits)
3247             return self._bits2octets("".join(
3248                 ("1" if bit in bits else "0")
3249                 for bit in six_xrange(max(bits) + 1)
3250             ))
3251         if issubclass(value.__class__, BitString):
3252             return value._value
3253         raise InvalidValueType((self.__class__, binary_type, string_types))
3254
3255     @property
3256     def ready(self):
3257         return self._value is not None
3258
3259     def __getstate__(self):
3260         return BitStringState(
3261             __version__,
3262             self.tag,
3263             self._tag_order,
3264             self._expl,
3265             self.default,
3266             self.optional,
3267             self.offset,
3268             self.llen,
3269             self.vlen,
3270             self.expl_lenindef,
3271             self.lenindef,
3272             self.ber_encoded,
3273             self.specs,
3274             self._value,
3275             self.tag_constructed,
3276             self.defined,
3277         )
3278
3279     def __setstate__(self, state):
3280         super(BitString, self).__setstate__(state)
3281         self.specs = state.specs
3282         self._value = state.value
3283         self.tag_constructed = state.tag_constructed
3284         self.defined = state.defined
3285
3286     def __iter__(self):
3287         self._assert_ready()
3288         for i in six_xrange(self._value[0]):
3289             yield self[i]
3290
3291     @property
3292     def bit_len(self):
3293         """Returns number of bits in the string
3294         """
3295         self._assert_ready()
3296         return self._value[0]
3297
3298     def __bytes__(self):
3299         self._assert_ready()
3300         return self._value[1]
3301
3302     def __eq__(self, their):
3303         if their.__class__ == bytes:
3304             return self._value[1] == their
3305         if not issubclass(their.__class__, BitString):
3306             return False
3307         return (
3308             self._value == their._value and
3309             self.tag == their.tag and
3310             self._expl == their._expl
3311         )
3312
3313     @property
3314     def named(self):
3315         """Named representation (if exists) of the bits
3316
3317         :returns: [str(name), ...]
3318         """
3319         return [name for name, bit in iteritems(self.specs) if self[bit]]
3320
3321     def __call__(
3322             self,
3323             value=None,
3324             impl=None,
3325             expl=None,
3326             default=None,
3327             optional=None,
3328     ):
3329         return self.__class__(
3330             value=value,
3331             impl=self.tag if impl is None else impl,
3332             expl=self._expl if expl is None else expl,
3333             default=self.default if default is None else default,
3334             optional=self.optional if optional is None else optional,
3335             _specs=self.specs,
3336         )
3337
3338     def __getitem__(self, key):
3339         if key.__class__ == int:
3340             bit_len, octets = self._value
3341             if key >= bit_len:
3342                 return False
3343             return (
3344                 byte2int(memoryview(octets)[key // 8:]) >>
3345                 (7 - (key % 8))
3346             ) & 1 == 1
3347         if isinstance(key, string_types):
3348             value = self.specs.get(key)
3349             if value is None:
3350                 raise ObjUnknown("BitString value: %s" % key)
3351             return self[value]
3352         raise InvalidValueType((int, str))
3353
3354     def _encode(self):
3355         self._assert_ready()
3356         bit_len, octets = self._value
3357         return b"".join((
3358             self.tag,
3359             len_encode(len(octets) + 1),
3360             int2byte((8 - bit_len % 8) % 8),
3361             octets,
3362         ))
3363
3364     def _encode1st(self, state):
3365         self._assert_ready()
3366         _, octets = self._value
3367         l = len(octets) + 1
3368         return len(self.tag) + len_size(l) + l, state
3369
3370     def _encode2nd(self, writer, state_iter):
3371         bit_len, octets = self._value
3372         write_full(writer, b"".join((
3373             self.tag,
3374             len_encode(len(octets) + 1),
3375             int2byte((8 - bit_len % 8) % 8),
3376         )))
3377         write_full(writer, octets)
3378
3379     def _encode_cer(self, writer):
3380         bit_len, octets = self._value
3381         if len(octets) + 1 <= 1000:
3382             write_full(writer, self._encode())
3383             return
3384         write_full(writer, self.tag_constructed)
3385         write_full(writer, LENINDEF)
3386         for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3387             write_full(writer, b"".join((
3388                 BitString.tag_default,
3389                 LEN1K,
3390                 int2byte(0),
3391                 octets[offset:offset + 999],
3392             )))
3393         tail = octets[offset + 999:]
3394         if len(tail) > 0:
3395             tail = int2byte((8 - bit_len % 8) % 8) + tail
3396             write_full(writer, b"".join((
3397                 BitString.tag_default,
3398                 len_encode(len(tail)),
3399                 tail,
3400             )))
3401         write_full(writer, EOC)
3402
3403     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3404         try:
3405             t, tlen, lv = tag_strip(tlv)
3406         except DecodeError as err:
3407             raise err.__class__(
3408                 msg=err.msg,
3409                 klass=self.__class__,
3410                 decode_path=decode_path,
3411                 offset=offset,
3412             )
3413         if t == self.tag:
3414             if tag_only:  # pragma: no cover
3415                 yield None
3416                 return
3417             try:
3418                 l, llen, v = len_decode(lv)
3419             except DecodeError as err:
3420                 raise err.__class__(
3421                     msg=err.msg,
3422                     klass=self.__class__,
3423                     decode_path=decode_path,
3424                     offset=offset,
3425                 )
3426             if l > len(v):
3427                 raise NotEnoughData(
3428                     "encoded length is longer than data",
3429                     klass=self.__class__,
3430                     decode_path=decode_path,
3431                     offset=offset,
3432                 )
3433             if l == 0:
3434                 raise NotEnoughData(
3435                     "zero length",
3436                     klass=self.__class__,
3437                     decode_path=decode_path,
3438                     offset=offset,
3439                 )
3440             pad_size = byte2int(v)
3441             if l == 1 and pad_size != 0:
3442                 raise DecodeError(
3443                     "invalid empty value",
3444                     klass=self.__class__,
3445                     decode_path=decode_path,
3446                     offset=offset,
3447                 )
3448             if pad_size > 7:
3449                 raise DecodeError(
3450                     "too big pad",
3451                     klass=self.__class__,
3452                     decode_path=decode_path,
3453                     offset=offset,
3454                 )
3455             if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3456                 raise DecodeError(
3457                     "invalid pad",
3458                     klass=self.__class__,
3459                     decode_path=decode_path,
3460                     offset=offset,
3461                 )
3462             v, tail = v[:l], v[l:]
3463             bit_len = (len(v) - 1) * 8 - pad_size
3464             obj = self.__class__(
3465                 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3466                 impl=self.tag,
3467                 expl=self._expl,
3468                 default=self.default,
3469                 optional=self.optional,
3470                 _specs=self.specs,
3471                 _decoded=(offset, llen, l),
3472             )
3473             if evgen_mode:
3474                 obj._value = (bit_len, None)
3475             yield decode_path, obj, tail
3476             return
3477         if t != self.tag_constructed:
3478             raise TagMismatch(
3479                 klass=self.__class__,
3480                 decode_path=decode_path,
3481                 offset=offset,
3482             )
3483         if not ctx.get("bered", False):
3484             raise DecodeError(
3485                 "unallowed BER constructed encoding",
3486                 klass=self.__class__,
3487                 decode_path=decode_path,
3488                 offset=offset,
3489             )
3490         if tag_only:  # pragma: no cover
3491             yield None
3492             return
3493         lenindef = False
3494         try:
3495             l, llen, v = len_decode(lv)
3496         except LenIndefForm:
3497             llen, l, v = 1, 0, lv[1:]
3498             lenindef = True
3499         except DecodeError as err:
3500             raise err.__class__(
3501                 msg=err.msg,
3502                 klass=self.__class__,
3503                 decode_path=decode_path,
3504                 offset=offset,
3505             )
3506         if l > len(v):
3507             raise NotEnoughData(
3508                 "encoded length is longer than data",
3509                 klass=self.__class__,
3510                 decode_path=decode_path,
3511                 offset=offset,
3512             )
3513         if not lenindef and l == 0:
3514             raise NotEnoughData(
3515                 "zero length",
3516                 klass=self.__class__,
3517                 decode_path=decode_path,
3518                 offset=offset,
3519             )
3520         chunks = []
3521         sub_offset = offset + tlen + llen
3522         vlen = 0
3523         while True:
3524             if lenindef:
3525                 if v[:EOC_LEN].tobytes() == EOC:
3526                     break
3527             else:
3528                 if vlen == l:
3529                     break
3530                 if vlen > l:
3531                     raise DecodeError(
3532                         "chunk out of bounds",
3533                         klass=self.__class__,
3534                         decode_path=decode_path + (str(len(chunks) - 1),),
3535                         offset=chunks[-1].offset,
3536                     )
3537             sub_decode_path = decode_path + (str(len(chunks)),)
3538             try:
3539                 if evgen_mode:
3540                     for _decode_path, chunk, v_tail in BitString().decode_evgen(
3541                             v,
3542                             offset=sub_offset,
3543                             decode_path=sub_decode_path,
3544                             leavemm=True,
3545                             ctx=ctx,
3546                             _ctx_immutable=False,
3547                     ):
3548                         yield _decode_path, chunk, v_tail
3549                 else:
3550                     _, chunk, v_tail = next(BitString().decode_evgen(
3551                         v,
3552                         offset=sub_offset,
3553                         decode_path=sub_decode_path,
3554                         leavemm=True,
3555                         ctx=ctx,
3556                         _ctx_immutable=False,
3557                         _evgen_mode=False,
3558                     ))
3559             except TagMismatch:
3560                 raise DecodeError(
3561                     "expected BitString encoded chunk",
3562                     klass=self.__class__,
3563                     decode_path=sub_decode_path,
3564                     offset=sub_offset,
3565                 )
3566             chunks.append(chunk)
3567             sub_offset += chunk.tlvlen
3568             vlen += chunk.tlvlen
3569             v = v_tail
3570         if len(chunks) == 0:
3571             raise DecodeError(
3572                 "no chunks",
3573                 klass=self.__class__,
3574                 decode_path=decode_path,
3575                 offset=offset,
3576             )
3577         values = []
3578         bit_len = 0
3579         for chunk_i, chunk in enumerate(chunks[:-1]):
3580             if chunk.bit_len % 8 != 0:
3581                 raise DecodeError(
3582                     "BitString chunk is not multiple of 8 bits",
3583                     klass=self.__class__,
3584                     decode_path=decode_path + (str(chunk_i),),
3585                     offset=chunk.offset,
3586                 )
3587             if not evgen_mode:
3588                 values.append(bytes(chunk))
3589             bit_len += chunk.bit_len
3590         chunk_last = chunks[-1]
3591         if not evgen_mode:
3592             values.append(bytes(chunk_last))
3593         bit_len += chunk_last.bit_len
3594         obj = self.__class__(
3595             value=None if evgen_mode else (bit_len, b"".join(values)),
3596             impl=self.tag,
3597             expl=self._expl,
3598             default=self.default,
3599             optional=self.optional,
3600             _specs=self.specs,
3601             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3602         )
3603         if evgen_mode:
3604             obj._value = (bit_len, None)
3605         obj.lenindef = lenindef
3606         obj.ber_encoded = True
3607         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3608
3609     def __repr__(self):
3610         return pp_console_row(next(self.pps()))
3611
3612     def pps(self, decode_path=()):
3613         value = None
3614         blob = None
3615         if self.ready:
3616             bit_len, blob = self._value
3617             value = "%d bits" % bit_len
3618             if len(self.specs) > 0 and blob is not None:
3619                 blob = tuple(self.named)
3620         yield _pp(
3621             obj=self,
3622             asn1_type_name=self.asn1_type_name,
3623             obj_name=self.__class__.__name__,
3624             decode_path=decode_path,
3625             value=value,
3626             blob=blob,
3627             optional=self.optional,
3628             default=self == self.default,
3629             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3630             expl=None if self._expl is None else tag_decode(self._expl),
3631             offset=self.offset,
3632             tlen=self.tlen,
3633             llen=self.llen,
3634             vlen=self.vlen,
3635             expl_offset=self.expl_offset if self.expled else None,
3636             expl_tlen=self.expl_tlen if self.expled else None,
3637             expl_llen=self.expl_llen if self.expled else None,
3638             expl_vlen=self.expl_vlen if self.expled else None,
3639             expl_lenindef=self.expl_lenindef,
3640             lenindef=self.lenindef,
3641             ber_encoded=self.ber_encoded,
3642             bered=self.bered,
3643         )
3644         defined_by, defined = self.defined or (None, None)
3645         if defined_by is not None:
3646             yield defined.pps(
3647                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3648             )
3649         for pp in self.pps_lenindef(decode_path):
3650             yield pp
3651
3652
3653 OctetStringState = namedtuple(
3654     "OctetStringState",
3655     BasicState._fields + (
3656         "value",
3657         "bound_min",
3658         "bound_max",
3659         "tag_constructed",
3660         "defined",
3661     ),
3662     **NAMEDTUPLE_KWARGS
3663 )
3664
3665
3666 class OctetString(Obj):
3667     """``OCTET STRING`` binary string type
3668
3669     >>> s = OctetString(b"hello world")
3670     OCTET STRING 11 bytes 68656c6c6f20776f726c64
3671     >>> s == OctetString(b"hello world")
3672     True
3673     >>> bytes(s)
3674     b'hello world'
3675
3676     >>> OctetString(b"hello", bounds=(4, 4))
3677     Traceback (most recent call last):
3678     pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3679     >>> OctetString(b"hell", bounds=(4, 4))
3680     OCTET STRING 4 bytes 68656c6c
3681
3682     Memoryviews can be used as a values. If memoryview is made on
3683     mmap-ed file, then it does not take storage inside OctetString
3684     itself. In CER encoding mode it will be streamed to the specified
3685     writer, copying 1 KB chunks.
3686     """
3687     __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3688     tag_default = tag_encode(4)
3689     asn1_type_name = "OCTET STRING"
3690     evgen_mode_skip_value = True
3691
3692     def __init__(
3693             self,
3694             value=None,
3695             bounds=None,
3696             impl=None,
3697             expl=None,
3698             default=None,
3699             optional=False,
3700             _decoded=(0, 0, 0),
3701             ctx=None,
3702     ):
3703         """
3704         :param value: set the value. Either binary type, or
3705                       :py:class:`pyderasn.OctetString` object
3706         :param bounds: set ``(MIN, MAX)`` value size constraint.
3707                        (-inf, +inf) by default
3708         :param bytes impl: override default tag with ``IMPLICIT`` one
3709         :param bytes expl: override default tag with ``EXPLICIT`` one
3710         :param default: set default value. Type same as in ``value``
3711         :param bool optional: is object ``OPTIONAL`` in sequence
3712         """
3713         super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3714         self._value = value
3715         self._bound_min, self._bound_max = getattr(
3716             self,
3717             "bounds",
3718             (0, float("+inf")),
3719         ) if bounds is None else bounds
3720         if value is not None:
3721             self._value = self._value_sanitize(value)
3722         if default is not None:
3723             default = self._value_sanitize(default)
3724             self.default = self.__class__(
3725                 value=default,
3726                 impl=self.tag,
3727                 expl=self._expl,
3728             )
3729             if self._value is None:
3730                 self._value = default
3731         self.defined = None
3732         tag_klass, _, tag_num = tag_decode(self.tag)
3733         self.tag_constructed = tag_encode(
3734             klass=tag_klass,
3735             form=TagFormConstructed,
3736             num=tag_num,
3737         )
3738
3739     def _value_sanitize(self, value):
3740         if value.__class__ == binary_type or value.__class__ == memoryview:
3741             pass
3742         elif issubclass(value.__class__, OctetString):
3743             value = value._value
3744         else:
3745             raise InvalidValueType((self.__class__, bytes, memoryview))
3746         if not self._bound_min <= len(value) <= self._bound_max:
3747             raise BoundsError(self._bound_min, len(value), self._bound_max)
3748         return value
3749
3750     @property
3751     def ready(self):
3752         return self._value is not None
3753
3754     def __getstate__(self):
3755         return OctetStringState(
3756             __version__,
3757             self.tag,
3758             self._tag_order,
3759             self._expl,
3760             self.default,
3761             self.optional,
3762             self.offset,
3763             self.llen,
3764             self.vlen,
3765             self.expl_lenindef,
3766             self.lenindef,
3767             self.ber_encoded,
3768             self._value,
3769             self._bound_min,
3770             self._bound_max,
3771             self.tag_constructed,
3772             self.defined,
3773         )
3774
3775     def __setstate__(self, state):
3776         super(OctetString, self).__setstate__(state)
3777         self._value = state.value
3778         self._bound_min = state.bound_min
3779         self._bound_max = state.bound_max
3780         self.tag_constructed = state.tag_constructed
3781         self.defined = state.defined
3782
3783     def __bytes__(self):
3784         self._assert_ready()
3785         return bytes(self._value)
3786
3787     def __eq__(self, their):
3788         if their.__class__ == binary_type:
3789             return self._value == their
3790         if not issubclass(their.__class__, OctetString):
3791             return False
3792         return (
3793             self._value == their._value and
3794             self.tag == their.tag and
3795             self._expl == their._expl
3796         )
3797
3798     def __lt__(self, their):
3799         return self._value < their._value
3800
3801     def __call__(
3802             self,
3803             value=None,
3804             bounds=None,
3805             impl=None,
3806             expl=None,
3807             default=None,
3808             optional=None,
3809     ):
3810         return self.__class__(
3811             value=value,
3812             bounds=(
3813                 (self._bound_min, self._bound_max)
3814                 if bounds is None else bounds
3815             ),
3816             impl=self.tag if impl is None else impl,
3817             expl=self._expl if expl is None else expl,
3818             default=self.default if default is None else default,
3819             optional=self.optional if optional is None else optional,
3820         )
3821
3822     def _encode(self):
3823         self._assert_ready()
3824         return b"".join((
3825             self.tag,
3826             len_encode(len(self._value)),
3827             self._value,
3828         ))
3829
3830     def _encode1st(self, state):
3831         self._assert_ready()
3832         l = len(self._value)
3833         return len(self.tag) + len_size(l) + l, state
3834
3835     def _encode2nd(self, writer, state_iter):
3836         value = self._value
3837         write_full(writer, self.tag + len_encode(len(value)))
3838         write_full(writer, value)
3839
3840     def _encode_cer(self, writer):
3841         octets = self._value
3842         if len(octets) <= 1000:
3843             write_full(writer, self._encode())
3844             return
3845         write_full(writer, self.tag_constructed)
3846         write_full(writer, LENINDEF)
3847         for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3848             write_full(writer, b"".join((
3849                 OctetString.tag_default,
3850                 LEN1K,
3851                 octets[offset:offset + 1000],
3852             )))
3853         tail = octets[offset + 1000:]
3854         if len(tail) > 0:
3855             write_full(writer, b"".join((
3856                 OctetString.tag_default,
3857                 len_encode(len(tail)),
3858                 tail,
3859             )))
3860         write_full(writer, EOC)
3861
3862     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3863         try:
3864             t, tlen, lv = tag_strip(tlv)
3865         except DecodeError as err:
3866             raise err.__class__(
3867                 msg=err.msg,
3868                 klass=self.__class__,
3869                 decode_path=decode_path,
3870                 offset=offset,
3871             )
3872         if t == self.tag:
3873             if tag_only:
3874                 yield None
3875                 return
3876             try:
3877                 l, llen, v = len_decode(lv)
3878             except DecodeError as err:
3879                 raise err.__class__(
3880                     msg=err.msg,
3881                     klass=self.__class__,
3882                     decode_path=decode_path,
3883                     offset=offset,
3884                 )
3885             if l > len(v):
3886                 raise NotEnoughData(
3887                     "encoded length is longer than data",
3888                     klass=self.__class__,
3889                     decode_path=decode_path,
3890                     offset=offset,
3891                 )
3892             v, tail = v[:l], v[l:]
3893             if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3894                 raise DecodeError(
3895                     msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3896                     klass=self.__class__,
3897                     decode_path=decode_path,
3898                     offset=offset,
3899                 )
3900             try:
3901                 obj = self.__class__(
3902                     value=(
3903                         None if (evgen_mode and self.evgen_mode_skip_value)
3904                         else v.tobytes()
3905                     ),
3906                     bounds=(self._bound_min, self._bound_max),
3907                     impl=self.tag,
3908                     expl=self._expl,
3909                     default=self.default,
3910                     optional=self.optional,
3911                     _decoded=(offset, llen, l),
3912                     ctx=ctx,
3913                 )
3914             except DecodeError as err:
3915                 raise DecodeError(
3916                     msg=err.msg,
3917                     klass=self.__class__,
3918                     decode_path=decode_path,
3919                     offset=offset,
3920                 )
3921             except BoundsError as err:
3922                 raise DecodeError(
3923                     msg=str(err),
3924                     klass=self.__class__,
3925                     decode_path=decode_path,
3926                     offset=offset,
3927                 )
3928             yield decode_path, obj, tail
3929             return
3930         if t != self.tag_constructed:
3931             raise TagMismatch(
3932                 klass=self.__class__,
3933                 decode_path=decode_path,
3934                 offset=offset,
3935             )
3936         if not ctx.get("bered", False):
3937             raise DecodeError(
3938                 "unallowed BER constructed encoding",
3939                 klass=self.__class__,
3940                 decode_path=decode_path,
3941                 offset=offset,
3942             )
3943         if tag_only:
3944             yield None
3945             return
3946         lenindef = False
3947         try:
3948             l, llen, v = len_decode(lv)
3949         except LenIndefForm:
3950             llen, l, v = 1, 0, lv[1:]
3951             lenindef = True
3952         except DecodeError as err:
3953             raise err.__class__(
3954                 msg=err.msg,
3955                 klass=self.__class__,
3956                 decode_path=decode_path,
3957                 offset=offset,
3958             )
3959         if l > len(v):
3960             raise NotEnoughData(
3961                 "encoded length is longer than data",
3962                 klass=self.__class__,
3963                 decode_path=decode_path,
3964                 offset=offset,
3965             )
3966         chunks = []
3967         chunks_count = 0
3968         sub_offset = offset + tlen + llen
3969         vlen = 0
3970         payload_len = 0
3971         while True:
3972             if lenindef:
3973                 if v[:EOC_LEN].tobytes() == EOC:
3974                     break
3975             else:
3976                 if vlen == l:
3977                     break
3978                 if vlen > l:
3979                     raise DecodeError(
3980                         "chunk out of bounds",
3981                         klass=self.__class__,
3982                         decode_path=decode_path + (str(len(chunks) - 1),),
3983                         offset=chunks[-1].offset,
3984                     )
3985             try:
3986                 if evgen_mode:
3987                     sub_decode_path = decode_path + (str(chunks_count),)
3988                     for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3989                             v,
3990                             offset=sub_offset,
3991                             decode_path=sub_decode_path,
3992                             leavemm=True,
3993                             ctx=ctx,
3994                             _ctx_immutable=False,
3995                     ):
3996                         yield _decode_path, chunk, v_tail
3997                         if not chunk.ber_encoded:
3998                             payload_len += chunk.vlen
3999                     chunks_count += 1
4000                 else:
4001                     sub_decode_path = decode_path + (str(len(chunks)),)
4002                     _, chunk, v_tail = next(OctetString().decode_evgen(
4003                         v,
4004                         offset=sub_offset,
4005                         decode_path=sub_decode_path,
4006                         leavemm=True,
4007                         ctx=ctx,
4008                         _ctx_immutable=False,
4009                         _evgen_mode=False,
4010                     ))
4011                     chunks.append(chunk)
4012             except TagMismatch:
4013                 raise DecodeError(
4014                     "expected OctetString encoded chunk",
4015                     klass=self.__class__,
4016                     decode_path=sub_decode_path,
4017                     offset=sub_offset,
4018                 )
4019             sub_offset += chunk.tlvlen
4020             vlen += chunk.tlvlen
4021             v = v_tail
4022         if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
4023             raise DecodeError(
4024                 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
4025                 klass=self.__class__,
4026                 decode_path=decode_path,
4027                 offset=offset,
4028             )
4029         try:
4030             obj = self.__class__(
4031                 value=(
4032                     None if evgen_mode else
4033                     b"".join(bytes(chunk) for chunk in chunks)
4034                 ),
4035                 bounds=(self._bound_min, self._bound_max),
4036                 impl=self.tag,
4037                 expl=self._expl,
4038                 default=self.default,
4039                 optional=self.optional,
4040                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4041                 ctx=ctx,
4042             )
4043         except DecodeError as err:
4044             raise DecodeError(
4045                 msg=err.msg,
4046                 klass=self.__class__,
4047                 decode_path=decode_path,
4048                 offset=offset,
4049             )
4050         except BoundsError as err:
4051             raise DecodeError(
4052                 msg=str(err),
4053                 klass=self.__class__,
4054                 decode_path=decode_path,
4055                 offset=offset,
4056             )
4057         obj.lenindef = lenindef
4058         obj.ber_encoded = True
4059         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4060
4061     def __repr__(self):
4062         return pp_console_row(next(self.pps()))
4063
4064     def pps(self, decode_path=()):
4065         yield _pp(
4066             obj=self,
4067             asn1_type_name=self.asn1_type_name,
4068             obj_name=self.__class__.__name__,
4069             decode_path=decode_path,
4070             value=("%d bytes" % len(self._value)) if self.ready else None,
4071             blob=self._value if self.ready else None,
4072             optional=self.optional,
4073             default=self == self.default,
4074             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4075             expl=None if self._expl is None else tag_decode(self._expl),
4076             offset=self.offset,
4077             tlen=self.tlen,
4078             llen=self.llen,
4079             vlen=self.vlen,
4080             expl_offset=self.expl_offset if self.expled else None,
4081             expl_tlen=self.expl_tlen if self.expled else None,
4082             expl_llen=self.expl_llen if self.expled else None,
4083             expl_vlen=self.expl_vlen if self.expled else None,
4084             expl_lenindef=self.expl_lenindef,
4085             lenindef=self.lenindef,
4086             ber_encoded=self.ber_encoded,
4087             bered=self.bered,
4088         )
4089         defined_by, defined = self.defined or (None, None)
4090         if defined_by is not None:
4091             yield defined.pps(
4092                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4093             )
4094         for pp in self.pps_lenindef(decode_path):
4095             yield pp
4096
4097
4098 def agg_octet_string(evgens, decode_path, raw, writer):
4099     """Aggregate constructed string (OctetString and its derivatives)
4100
4101     :param evgens: iterator of generated events
4102     :param decode_path: points to the string we want to decode
4103     :param raw: slicebable (memoryview, bytearray, etc) with
4104                 the data evgens are generated on
4105     :param writer: buffer.write where string is going to be saved
4106     :param writer: where string is going to be saved. Must comply
4107                    with ``io.RawIOBase.write`` behaviour
4108
4109     .. seealso:: :ref:`agg_octet_string`
4110     """
4111     decode_path_len = len(decode_path)
4112     for dp, obj, _ in evgens:
4113         if dp[:decode_path_len] != decode_path:
4114             continue
4115         if not obj.ber_encoded:
4116             write_full(writer, raw[
4117                 obj.offset + obj.tlen + obj.llen:
4118                 obj.offset + obj.tlen + obj.llen + obj.vlen -
4119                 (EOC_LEN if obj.expl_lenindef else 0)
4120             ])
4121         if len(dp) == decode_path_len:
4122             break
4123
4124
4125 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4126
4127
4128 class Null(Obj):
4129     """``NULL`` null object
4130
4131     >>> n = Null()
4132     NULL
4133     >>> n.ready
4134     True
4135     """
4136     __slots__ = ()
4137     tag_default = tag_encode(5)
4138     asn1_type_name = "NULL"
4139
4140     def __init__(
4141             self,
4142             value=None,  # unused, but Sequence passes it
4143             impl=None,
4144             expl=None,
4145             optional=False,
4146             _decoded=(0, 0, 0),
4147     ):
4148         """
4149         :param bytes impl: override default tag with ``IMPLICIT`` one
4150         :param bytes expl: override default tag with ``EXPLICIT`` one
4151         :param bool optional: is object ``OPTIONAL`` in sequence
4152         """
4153         super(Null, self).__init__(impl, expl, None, optional, _decoded)
4154         self.default = None
4155
4156     @property
4157     def ready(self):
4158         return True
4159
4160     def __getstate__(self):
4161         return NullState(
4162             __version__,
4163             self.tag,
4164             self._tag_order,
4165             self._expl,
4166             self.default,
4167             self.optional,
4168             self.offset,
4169             self.llen,
4170             self.vlen,
4171             self.expl_lenindef,
4172             self.lenindef,
4173             self.ber_encoded,
4174         )
4175
4176     def __eq__(self, their):
4177         if not issubclass(their.__class__, Null):
4178             return False
4179         return (
4180             self.tag == their.tag and
4181             self._expl == their._expl
4182         )
4183
4184     def __call__(
4185             self,
4186             value=None,
4187             impl=None,
4188             expl=None,
4189             optional=None,
4190     ):
4191         return self.__class__(
4192             impl=self.tag if impl is None else impl,
4193             expl=self._expl if expl is None else expl,
4194             optional=self.optional if optional is None else optional,
4195         )
4196
4197     def _encode(self):
4198         return self.tag + LEN0
4199
4200     def _encode1st(self, state):
4201         return len(self.tag) + 1, state
4202
4203     def _encode2nd(self, writer, state_iter):
4204         write_full(writer, self.tag + LEN0)
4205
4206     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4207         try:
4208             t, _, lv = tag_strip(tlv)
4209         except DecodeError as err:
4210             raise err.__class__(
4211                 msg=err.msg,
4212                 klass=self.__class__,
4213                 decode_path=decode_path,
4214                 offset=offset,
4215             )
4216         if t != self.tag:
4217             raise TagMismatch(
4218                 klass=self.__class__,
4219                 decode_path=decode_path,
4220                 offset=offset,
4221             )
4222         if tag_only:  # pragma: no cover
4223             yield None
4224             return
4225         try:
4226             l, _, v = len_decode(lv)
4227         except DecodeError as err:
4228             raise err.__class__(
4229                 msg=err.msg,
4230                 klass=self.__class__,
4231                 decode_path=decode_path,
4232                 offset=offset,
4233             )
4234         if l != 0:
4235             raise InvalidLength(
4236                 "Null must have zero length",
4237                 klass=self.__class__,
4238                 decode_path=decode_path,
4239                 offset=offset,
4240             )
4241         obj = self.__class__(
4242             impl=self.tag,
4243             expl=self._expl,
4244             optional=self.optional,
4245             _decoded=(offset, 1, 0),
4246         )
4247         yield decode_path, obj, v
4248
4249     def __repr__(self):
4250         return pp_console_row(next(self.pps()))
4251
4252     def pps(self, decode_path=()):
4253         yield _pp(
4254             obj=self,
4255             asn1_type_name=self.asn1_type_name,
4256             obj_name=self.__class__.__name__,
4257             decode_path=decode_path,
4258             optional=self.optional,
4259             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4260             expl=None if self._expl is None else tag_decode(self._expl),
4261             offset=self.offset,
4262             tlen=self.tlen,
4263             llen=self.llen,
4264             vlen=self.vlen,
4265             expl_offset=self.expl_offset if self.expled else None,
4266             expl_tlen=self.expl_tlen if self.expled else None,
4267             expl_llen=self.expl_llen if self.expled else None,
4268             expl_vlen=self.expl_vlen if self.expled else None,
4269             expl_lenindef=self.expl_lenindef,
4270             bered=self.bered,
4271         )
4272         for pp in self.pps_lenindef(decode_path):
4273             yield pp
4274
4275
4276 ObjectIdentifierState = namedtuple(
4277     "ObjectIdentifierState",
4278     BasicState._fields + ("value", "defines"),
4279     **NAMEDTUPLE_KWARGS
4280 )
4281
4282
4283 class ObjectIdentifier(Obj):
4284     """``OBJECT IDENTIFIER`` OID type
4285
4286     >>> oid = ObjectIdentifier((1, 2, 3))
4287     OBJECT IDENTIFIER 1.2.3
4288     >>> oid == ObjectIdentifier("1.2.3")
4289     True
4290     >>> tuple(oid)
4291     (1, 2, 3)
4292     >>> str(oid)
4293     '1.2.3'
4294     >>> oid + (4, 5) + ObjectIdentifier("1.7")
4295     OBJECT IDENTIFIER 1.2.3.4.5.1.7
4296
4297     >>> str(ObjectIdentifier((3, 1)))
4298     Traceback (most recent call last):
4299     pyderasn.InvalidOID: unacceptable first arc value
4300     """
4301     __slots__ = ("defines",)
4302     tag_default = tag_encode(6)
4303     asn1_type_name = "OBJECT IDENTIFIER"
4304
4305     def __init__(
4306             self,
4307             value=None,
4308             defines=(),
4309             impl=None,
4310             expl=None,
4311             default=None,
4312             optional=False,
4313             _decoded=(0, 0, 0),
4314     ):
4315         """
4316         :param value: set the value. Either tuples of integers,
4317                       string of "."-concatenated integers, or
4318                       :py:class:`pyderasn.ObjectIdentifier` object
4319         :param defines: sequence of tuples. Each tuple has two elements.
4320                         First one is relative to current one decode
4321                         path, aiming to the field defined by that OID.
4322                         Read about relative path in
4323                         :py:func:`pyderasn.abs_decode_path`. Second
4324                         tuple element is ``{OID: pyderasn.Obj()}``
4325                         dictionary, mapping between current OID value
4326                         and structure applied to defined field.
4327
4328                         .. seealso:: :ref:`definedby`
4329
4330         :param bytes impl: override default tag with ``IMPLICIT`` one
4331         :param bytes expl: override default tag with ``EXPLICIT`` one
4332         :param default: set default value. Type same as in ``value``
4333         :param bool optional: is object ``OPTIONAL`` in sequence
4334         """
4335         super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4336         self._value = value
4337         if value is not None:
4338             self._value = self._value_sanitize(value)
4339         if default is not None:
4340             default = self._value_sanitize(default)
4341             self.default = self.__class__(
4342                 value=default,
4343                 impl=self.tag,
4344                 expl=self._expl,
4345             )
4346             if self._value is None:
4347                 self._value = default
4348         self.defines = defines
4349
4350     def __add__(self, their):
4351         if their.__class__ == tuple:
4352             return self.__class__(self._value + array("L", their))
4353         if isinstance(their, self.__class__):
4354             return self.__class__(self._value + their._value)
4355         raise InvalidValueType((self.__class__, tuple))
4356
4357     def _value_sanitize(self, value):
4358         if issubclass(value.__class__, ObjectIdentifier):
4359             return value._value
4360         if isinstance(value, string_types):
4361             try:
4362                 value = array("L", (pureint(arc) for arc in value.split(".")))
4363             except ValueError:
4364                 raise InvalidOID("unacceptable arcs values")
4365         if value.__class__ == tuple:
4366             try:
4367                 value = array("L", value)
4368             except OverflowError as err:
4369                 raise InvalidOID(repr(err))
4370         if value.__class__ is array:
4371             if len(value) < 2:
4372                 raise InvalidOID("less than 2 arcs")
4373             first_arc = value[0]
4374             if first_arc in (0, 1):
4375                 if not (0 <= value[1] <= 39):
4376                     raise InvalidOID("second arc is too wide")
4377             elif first_arc == 2:
4378                 pass
4379             else:
4380                 raise InvalidOID("unacceptable first arc value")
4381             if not all(arc >= 0 for arc in value):
4382                 raise InvalidOID("negative arc value")
4383             return value
4384         raise InvalidValueType((self.__class__, str, tuple))
4385
4386     @property
4387     def ready(self):
4388         return self._value is not None
4389
4390     def __getstate__(self):
4391         return ObjectIdentifierState(
4392             __version__,
4393             self.tag,
4394             self._tag_order,
4395             self._expl,
4396             self.default,
4397             self.optional,
4398             self.offset,
4399             self.llen,
4400             self.vlen,
4401             self.expl_lenindef,
4402             self.lenindef,
4403             self.ber_encoded,
4404             self._value,
4405             self.defines,
4406         )
4407
4408     def __setstate__(self, state):
4409         super(ObjectIdentifier, self).__setstate__(state)
4410         self._value = state.value
4411         self.defines = state.defines
4412
4413     def __iter__(self):
4414         self._assert_ready()
4415         return iter(self._value)
4416
4417     def __str__(self):
4418         return ".".join(str(arc) for arc in self._value or ())
4419
4420     def __hash__(self):
4421         self._assert_ready()
4422         return hash(b"".join((
4423             self.tag,
4424             bytes(self._expl or b""),
4425             str(self._value).encode("ascii"),
4426         )))
4427
4428     def __eq__(self, their):
4429         if their.__class__ == tuple:
4430             return self._value == array("L", their)
4431         if not issubclass(their.__class__, ObjectIdentifier):
4432             return False
4433         return (
4434             self.tag == their.tag and
4435             self._expl == their._expl and
4436             self._value == their._value
4437         )
4438
4439     def __lt__(self, their):
4440         return self._value < their._value
4441
4442     def __call__(
4443             self,
4444             value=None,
4445             defines=None,
4446             impl=None,
4447             expl=None,
4448             default=None,
4449             optional=None,
4450     ):
4451         return self.__class__(
4452             value=value,
4453             defines=self.defines if defines is None else defines,
4454             impl=self.tag if impl is None else impl,
4455             expl=self._expl if expl is None else expl,
4456             default=self.default if default is None else default,
4457             optional=self.optional if optional is None else optional,
4458         )
4459
4460     def _encode_octets(self):
4461         self._assert_ready()
4462         value = self._value
4463         first_value = value[1]
4464         first_arc = value[0]
4465         if first_arc == 0:
4466             pass
4467         elif first_arc == 1:
4468             first_value += 40
4469         elif first_arc == 2:
4470             first_value += 80
4471         else:  # pragma: no cover
4472             raise RuntimeError("invalid arc is stored")
4473         octets = [zero_ended_encode(first_value)]
4474         for arc in value[2:]:
4475             octets.append(zero_ended_encode(arc))
4476         return b"".join(octets)
4477
4478     def _encode(self):
4479         v = self._encode_octets()
4480         return b"".join((self.tag, len_encode(len(v)), v))
4481
4482     def _encode1st(self, state):
4483         l = len(self._encode_octets())
4484         return len(self.tag) + len_size(l) + l, state
4485
4486     def _encode2nd(self, writer, state_iter):
4487         write_full(writer, self._encode())
4488
4489     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4490         try:
4491             t, _, lv = tag_strip(tlv)
4492         except DecodeError as err:
4493             raise err.__class__(
4494                 msg=err.msg,
4495                 klass=self.__class__,
4496                 decode_path=decode_path,
4497                 offset=offset,
4498             )
4499         if t != self.tag:
4500             raise TagMismatch(
4501                 klass=self.__class__,
4502                 decode_path=decode_path,
4503                 offset=offset,
4504             )
4505         if tag_only:  # pragma: no cover
4506             yield None
4507             return
4508         try:
4509             l, llen, v = len_decode(lv)
4510         except DecodeError as err:
4511             raise err.__class__(
4512                 msg=err.msg,
4513                 klass=self.__class__,
4514                 decode_path=decode_path,
4515                 offset=offset,
4516             )
4517         if l > len(v):
4518             raise NotEnoughData(
4519                 "encoded length is longer than data",
4520                 klass=self.__class__,
4521                 decode_path=decode_path,
4522                 offset=offset,
4523             )
4524         if l == 0:
4525             raise NotEnoughData(
4526                 "zero length",
4527                 klass=self.__class__,
4528                 decode_path=decode_path,
4529                 offset=offset,
4530             )
4531         v, tail = v[:l], v[l:]
4532         arcs = array("L")
4533         ber_encoded = False
4534         while len(v) > 0:
4535             i = 0
4536             arc = 0
4537             while True:
4538                 octet = indexbytes(v, i)
4539                 if i == 0 and octet == 0x80:
4540                     if ctx.get("bered", False):
4541                         ber_encoded = True
4542                     else:
4543                         raise DecodeError(
4544                             "non normalized arc encoding",
4545                             klass=self.__class__,
4546                             decode_path=decode_path,
4547                             offset=offset,
4548                         )
4549                 arc = (arc << 7) | (octet & 0x7F)
4550                 if octet & 0x80 == 0:
4551                     try:
4552                         arcs.append(arc)
4553                     except OverflowError:
4554                         raise DecodeError(
4555                             "too huge value for local unsigned long",
4556                             klass=self.__class__,
4557                             decode_path=decode_path,
4558                             offset=offset,
4559                         )
4560                     v = v[i + 1:]
4561                     break
4562                 i += 1
4563                 if i == len(v):
4564                     raise DecodeError(
4565                         "unfinished OID",
4566                         klass=self.__class__,
4567                         decode_path=decode_path,
4568                         offset=offset,
4569                     )
4570         first_arc = 0
4571         second_arc = arcs[0]
4572         if 0 <= second_arc <= 39:
4573             first_arc = 0
4574         elif 40 <= second_arc <= 79:
4575             first_arc = 1
4576             second_arc -= 40
4577         else:
4578             first_arc = 2
4579             second_arc -= 80
4580         obj = self.__class__(
4581             value=array("L", (first_arc, second_arc)) + arcs[1:],
4582             impl=self.tag,
4583             expl=self._expl,
4584             default=self.default,
4585             optional=self.optional,
4586             _decoded=(offset, llen, l),
4587         )
4588         if ber_encoded:
4589             obj.ber_encoded = True
4590         yield decode_path, obj, tail
4591
4592     def __repr__(self):
4593         return pp_console_row(next(self.pps()))
4594
4595     def pps(self, decode_path=()):
4596         yield _pp(
4597             obj=self,
4598             asn1_type_name=self.asn1_type_name,
4599             obj_name=self.__class__.__name__,
4600             decode_path=decode_path,
4601             value=str(self) if self.ready else None,
4602             optional=self.optional,
4603             default=self == self.default,
4604             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4605             expl=None if self._expl is None else tag_decode(self._expl),
4606             offset=self.offset,
4607             tlen=self.tlen,
4608             llen=self.llen,
4609             vlen=self.vlen,
4610             expl_offset=self.expl_offset if self.expled else None,
4611             expl_tlen=self.expl_tlen if self.expled else None,
4612             expl_llen=self.expl_llen if self.expled else None,
4613             expl_vlen=self.expl_vlen if self.expled else None,
4614             expl_lenindef=self.expl_lenindef,
4615             ber_encoded=self.ber_encoded,
4616             bered=self.bered,
4617         )
4618         for pp in self.pps_lenindef(decode_path):
4619             yield pp
4620
4621
4622 class Enumerated(Integer):
4623     """``ENUMERATED`` integer type
4624
4625     This type is identical to :py:class:`pyderasn.Integer`, but requires
4626     schema to be specified and does not accept values missing from it.
4627     """
4628     __slots__ = ()
4629     tag_default = tag_encode(10)
4630     asn1_type_name = "ENUMERATED"
4631
4632     def __init__(
4633             self,
4634             value=None,
4635             impl=None,
4636             expl=None,
4637             default=None,
4638             optional=False,
4639             _specs=None,
4640             _decoded=(0, 0, 0),
4641             bounds=None,  # dummy argument, workability for Integer.decode
4642     ):
4643         super(Enumerated, self).__init__(
4644             value, bounds, impl, expl, default, optional, _specs, _decoded,
4645         )
4646         if len(self.specs) == 0:
4647             raise ValueError("schema must be specified")
4648
4649     def _value_sanitize(self, value):
4650         if isinstance(value, self.__class__):
4651             value = value._value
4652         elif isinstance(value, integer_types):
4653             for _value in itervalues(self.specs):
4654                 if _value == value:
4655                     break
4656             else:
4657                 raise DecodeError(
4658                     "unknown integer value: %s" % value,
4659                     klass=self.__class__,
4660                 )
4661         elif isinstance(value, string_types):
4662             value = self.specs.get(value)
4663             if value is None:
4664                 raise ObjUnknown("integer value: %s" % value)
4665         else:
4666             raise InvalidValueType((self.__class__, int, str))
4667         return value
4668
4669     def __call__(
4670             self,
4671             value=None,
4672             impl=None,
4673             expl=None,
4674             default=None,
4675             optional=None,
4676             _specs=None,
4677     ):
4678         return self.__class__(
4679             value=value,
4680             impl=self.tag if impl is None else impl,
4681             expl=self._expl if expl is None else expl,
4682             default=self.default if default is None else default,
4683             optional=self.optional if optional is None else optional,
4684             _specs=self.specs,
4685         )
4686
4687
4688 def escape_control_unicode(c):
4689     if unicat(c)[0] == "C":
4690         c = repr(c).lstrip("u").strip("'")
4691     return c
4692
4693
4694 class CommonString(OctetString):
4695     """Common class for all strings
4696
4697     Everything resembles :py:class:`pyderasn.OctetString`, except
4698     ability to deal with unicode text strings.
4699
4700     >>> hexenc("привет Ð¼Ð¸Ñ€".encode("utf-8"))
4701     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4702     >>> UTF8String("привет Ð¼Ð¸Ñ€") == UTF8String(hexdec("d0...80"))
4703     True
4704     >>> s = UTF8String("привет Ð¼Ð¸Ñ€")
4705     UTF8String UTF8String Ð¿Ñ€Ð¸Ð²ÐµÑ‚ Ð¼Ð¸Ñ€
4706     >>> str(s)
4707     'привет Ð¼Ð¸Ñ€'
4708     >>> hexenc(bytes(s))
4709     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4710
4711     >>> PrintableString("привет Ð¼Ð¸Ñ€")
4712     Traceback (most recent call last):
4713     pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4714
4715     >>> BMPString("ада", bounds=(2, 2))
4716     Traceback (most recent call last):
4717     pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4718     >>> s = BMPString("ад", bounds=(2, 2))
4719     >>> s.encoding
4720     'utf-16-be'
4721     >>> hexenc(bytes(s))
4722     '04300434'
4723
4724     .. list-table::
4725        :header-rows: 1
4726
4727        * - Class
4728          - Text Encoding, validation
4729        * - :py:class:`pyderasn.UTF8String`
4730          - utf-8
4731        * - :py:class:`pyderasn.NumericString`
4732          - proper alphabet validation
4733        * - :py:class:`pyderasn.PrintableString`
4734          - proper alphabet validation
4735        * - :py:class:`pyderasn.TeletexString`
4736          - iso-8859-1
4737        * - :py:class:`pyderasn.T61String`
4738          - iso-8859-1
4739        * - :py:class:`pyderasn.VideotexString`
4740          - iso-8859-1
4741        * - :py:class:`pyderasn.IA5String`
4742          - proper alphabet validation
4743        * - :py:class:`pyderasn.GraphicString`
4744          - iso-8859-1
4745        * - :py:class:`pyderasn.VisibleString`, :py:class:`pyderasn.ISO646String`
4746          - proper alphabet validation
4747        * - :py:class:`pyderasn.GeneralString`
4748          - iso-8859-1
4749        * - :py:class:`pyderasn.UniversalString`
4750          - utf-32-be
4751        * - :py:class:`pyderasn.BMPString`
4752          - utf-16-be
4753     """
4754     __slots__ = ()
4755
4756     def _value_sanitize(self, value):
4757         value_raw = None
4758         value_decoded = None
4759         if isinstance(value, self.__class__):
4760             value_raw = value._value
4761         elif value.__class__ == text_type:
4762             value_decoded = value
4763         elif value.__class__ == binary_type:
4764             value_raw = value
4765         else:
4766             raise InvalidValueType((self.__class__, text_type, binary_type))
4767         try:
4768             value_raw = (
4769                 value_decoded.encode(self.encoding)
4770                 if value_raw is None else value_raw
4771             )
4772             value_decoded = (
4773                 value_raw.decode(self.encoding)
4774                 if value_decoded is None else value_decoded
4775             )
4776         except (UnicodeEncodeError, UnicodeDecodeError) as err:
4777             raise DecodeError(str(err))
4778         if not self._bound_min <= len(value_decoded) <= self._bound_max:
4779             raise BoundsError(
4780                 self._bound_min,
4781                 len(value_decoded),
4782                 self._bound_max,
4783             )
4784         return value_raw
4785
4786     def __eq__(self, their):
4787         if their.__class__ == binary_type:
4788             return self._value == their
4789         if their.__class__ == text_type:
4790             return self._value == their.encode(self.encoding)
4791         if not isinstance(their, self.__class__):
4792             return False
4793         return (
4794             self._value == their._value and
4795             self.tag == their.tag and
4796             self._expl == their._expl
4797         )
4798
4799     def __unicode__(self):
4800         if self.ready:
4801             return self._value.decode(self.encoding)
4802         return text_type(self._value)
4803
4804     def __repr__(self):
4805         return pp_console_row(next(self.pps(no_unicode=PY2)))
4806
4807     def pps(self, decode_path=(), no_unicode=False):
4808         value = None
4809         if self.ready:
4810             value = (
4811                 hexenc(bytes(self)) if no_unicode else
4812                 "".join(escape_control_unicode(c) for c in self.__unicode__())
4813             )
4814         yield _pp(
4815             obj=self,
4816             asn1_type_name=self.asn1_type_name,
4817             obj_name=self.__class__.__name__,
4818             decode_path=decode_path,
4819             value=value,
4820             optional=self.optional,
4821             default=self == self.default,
4822             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4823             expl=None if self._expl is None else tag_decode(self._expl),
4824             offset=self.offset,
4825             tlen=self.tlen,
4826             llen=self.llen,
4827             vlen=self.vlen,
4828             expl_offset=self.expl_offset if self.expled else None,
4829             expl_tlen=self.expl_tlen if self.expled else None,
4830             expl_llen=self.expl_llen if self.expled else None,
4831             expl_vlen=self.expl_vlen if self.expled else None,
4832             expl_lenindef=self.expl_lenindef,
4833             ber_encoded=self.ber_encoded,
4834             bered=self.bered,
4835         )
4836         for pp in self.pps_lenindef(decode_path):
4837             yield pp
4838
4839
4840 class UTF8String(CommonString):
4841     __slots__ = ()
4842     tag_default = tag_encode(12)
4843     encoding = "utf-8"
4844     asn1_type_name = "UTF8String"
4845
4846
4847 class AllowableCharsMixin(object):
4848     @property
4849     def allowable_chars(self):
4850         if PY2:
4851             return self._allowable_chars
4852         return frozenset(six_unichr(c) for c in self._allowable_chars)
4853
4854     def _value_sanitize(self, value):
4855         value = super(AllowableCharsMixin, self)._value_sanitize(value)
4856         if not frozenset(value) <= self._allowable_chars:
4857             raise DecodeError("non satisfying alphabet value")
4858         return value
4859
4860
4861 class NumericString(AllowableCharsMixin, CommonString):
4862     """Numeric string
4863
4864     Its value is properly sanitized: only ASCII digits with spaces can
4865     be stored.
4866
4867     >>> NumericString().allowable_chars
4868     frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4869     """
4870     __slots__ = ()
4871     tag_default = tag_encode(18)
4872     encoding = "ascii"
4873     asn1_type_name = "NumericString"
4874     _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4875
4876
4877 PrintableStringState = namedtuple(
4878     "PrintableStringState",
4879     OctetStringState._fields + ("allowable_chars",),
4880     **NAMEDTUPLE_KWARGS
4881 )
4882
4883
4884 class PrintableString(AllowableCharsMixin, CommonString):
4885     """Printable string
4886
4887     Its value is properly sanitized: see X.680 41.4 table 10.
4888
4889     >>> PrintableString().allowable_chars
4890     frozenset([' ', "'", ..., 'z'])
4891     >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4892     PrintableString PrintableString foo*bar
4893     >>> obj.allow_asterisk, obj.allow_ampersand
4894     (True, False)
4895     """
4896     __slots__ = ()
4897     tag_default = tag_encode(19)
4898     encoding = "ascii"
4899     asn1_type_name = "PrintableString"
4900     _allowable_chars = frozenset(
4901         (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4902     )
4903     _asterisk = frozenset("*".encode("ascii"))
4904     _ampersand = frozenset("&".encode("ascii"))
4905
4906     def __init__(
4907             self,
4908             value=None,
4909             bounds=None,
4910             impl=None,
4911             expl=None,
4912             default=None,
4913             optional=False,
4914             _decoded=(0, 0, 0),
4915             ctx=None,
4916             allow_asterisk=False,
4917             allow_ampersand=False,
4918     ):
4919         """
4920         :param allow_asterisk: allow asterisk character
4921         :param allow_ampersand: allow ampersand character
4922         """
4923         if allow_asterisk:
4924             self._allowable_chars |= self._asterisk
4925         if allow_ampersand:
4926             self._allowable_chars |= self._ampersand
4927         super(PrintableString, self).__init__(
4928             value, bounds, impl, expl, default, optional, _decoded, ctx,
4929         )
4930
4931     @property
4932     def allow_asterisk(self):
4933         """Is asterisk character allowed?
4934         """
4935         return self._asterisk <= self._allowable_chars
4936
4937     @property
4938     def allow_ampersand(self):
4939         """Is ampersand character allowed?
4940         """
4941         return self._ampersand <= self._allowable_chars
4942
4943     def __getstate__(self):
4944         return PrintableStringState(
4945             *super(PrintableString, self).__getstate__(),
4946             **{"allowable_chars": self._allowable_chars}
4947         )
4948
4949     def __setstate__(self, state):
4950         super(PrintableString, self).__setstate__(state)
4951         self._allowable_chars = state.allowable_chars
4952
4953     def __call__(
4954             self,
4955             value=None,
4956             bounds=None,
4957             impl=None,
4958             expl=None,
4959             default=None,
4960             optional=None,
4961     ):
4962         return self.__class__(
4963             value=value,
4964             bounds=(
4965                 (self._bound_min, self._bound_max)
4966                 if bounds is None else bounds
4967             ),
4968             impl=self.tag if impl is None else impl,
4969             expl=self._expl if expl is None else expl,
4970             default=self.default if default is None else default,
4971             optional=self.optional if optional is None else optional,
4972             allow_asterisk=self.allow_asterisk,
4973             allow_ampersand=self.allow_ampersand,
4974         )
4975
4976
4977 class TeletexString(CommonString):
4978     __slots__ = ()
4979     tag_default = tag_encode(20)
4980     encoding = "iso-8859-1"
4981     asn1_type_name = "TeletexString"
4982
4983
4984 class T61String(TeletexString):
4985     __slots__ = ()
4986     asn1_type_name = "T61String"
4987
4988
4989 class VideotexString(CommonString):
4990     __slots__ = ()
4991     tag_default = tag_encode(21)
4992     encoding = "iso-8859-1"
4993     asn1_type_name = "VideotexString"
4994
4995
4996 class IA5String(AllowableCharsMixin, CommonString):
4997     """IA5 string
4998
4999     Its value is properly sanitized: it is a mix of
5000
5001     * http://www.itscj.ipsj.or.jp/iso-ir/006.pdf (G)
5002     * http://www.itscj.ipsj.or.jp/iso-ir/001.pdf (C0)
5003     * DEL character (0x7F)
5004
5005     It is just 7-bit ASCII.
5006
5007     >>> IA5String().allowable_chars
5008     frozenset(["NUL", ... "DEL"])
5009     """
5010     __slots__ = ()
5011     tag_default = tag_encode(22)
5012     encoding = "ascii"
5013     asn1_type_name = "IA5"
5014     _allowable_chars = frozenset(b"".join(
5015         six_unichr(c).encode("ascii") for c in six_xrange(128)
5016     ))
5017
5018
5019 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
5020 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
5021 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
5022 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
5023 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
5024 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
5025
5026
5027 class VisibleString(AllowableCharsMixin, CommonString):
5028     """Visible string
5029
5030     Its value is properly sanitized. ASCII subset from space to tilde is
5031     allowed: http://www.itscj.ipsj.or.jp/iso-ir/006.pdf
5032
5033     >>> VisibleString().allowable_chars
5034     frozenset([" ", ... "~"])
5035     """
5036     __slots__ = ()
5037     tag_default = tag_encode(26)
5038     encoding = "ascii"
5039     asn1_type_name = "VisibleString"
5040     _allowable_chars = frozenset(b"".join(
5041         six_unichr(c).encode("ascii") for c in six_xrange(ord(" "), ord("~") + 1)
5042     ))
5043
5044
5045 class ISO646String(VisibleString):
5046     __slots__ = ()
5047     asn1_type_name = "ISO646String"
5048
5049
5050 UTCTimeState = namedtuple(
5051     "UTCTimeState",
5052     OctetStringState._fields + ("ber_raw",),
5053     **NAMEDTUPLE_KWARGS
5054 )
5055
5056
5057 def str_to_time_fractions(value):
5058     v = pureint(value)
5059     year, v = (v // 10**10), (v % 10**10)
5060     month, v = (v // 10**8), (v % 10**8)
5061     day, v = (v // 10**6), (v % 10**6)
5062     hour, v = (v // 10**4), (v % 10**4)
5063     minute, second = (v // 100), (v % 100)
5064     return year, month, day, hour, minute, second
5065
5066
5067 class UTCTime(VisibleString):
5068     """``UTCTime`` datetime type
5069
5070     >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5071     UTCTime UTCTime 2017-09-30T22:07:50
5072     >>> str(t)
5073     '170930220750Z'
5074     >>> bytes(t)
5075     b'170930220750Z'
5076     >>> t.todatetime()
5077     datetime.datetime(2017, 9, 30, 22, 7, 50)
5078     >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5079     datetime.datetime(1957, 9, 30, 22, 7, 50)
5080
5081     If BER encoded value was met, then ``ber_raw`` attribute will hold
5082     its raw representation.
5083
5084     .. warning::
5085
5086        Only **naive** ``datetime`` objects are supported.
5087        Library assumes that all work is done in UTC.
5088
5089     .. warning::
5090
5091        Pay attention that ``UTCTime`` can not hold full year, so all years
5092        having < 50 years are treated as 20xx, 19xx otherwise, according to
5093        X.509 recommendation. Use ``GeneralizedTime`` instead for
5094        removing ambiguity.
5095
5096     .. warning::
5097
5098        No strict validation of UTC offsets are made (only applicable to
5099        **BER**), but very crude:
5100
5101        * minutes are not exceeding 60
5102        * offset value is not exceeding 14 hours
5103     """
5104     __slots__ = ("ber_raw",)
5105     tag_default = tag_encode(23)
5106     encoding = "ascii"
5107     asn1_type_name = "UTCTime"
5108     evgen_mode_skip_value = False
5109
5110     def __init__(
5111             self,
5112             value=None,
5113             impl=None,
5114             expl=None,
5115             default=None,
5116             optional=False,
5117             _decoded=(0, 0, 0),
5118             bounds=None,  # dummy argument, workability for OctetString.decode
5119             ctx=None,
5120     ):
5121         """
5122         :param value: set the value. Either datetime type, or
5123                       :py:class:`pyderasn.UTCTime` object
5124         :param bytes impl: override default tag with ``IMPLICIT`` one
5125         :param bytes expl: override default tag with ``EXPLICIT`` one
5126         :param default: set default value. Type same as in ``value``
5127         :param bool optional: is object ``OPTIONAL`` in sequence
5128         """
5129         super(UTCTime, self).__init__(
5130             None, None, impl, expl, None, optional, _decoded, ctx,
5131         )
5132         self._value = value
5133         self.ber_raw = None
5134         if value is not None:
5135             self._value, self.ber_raw = self._value_sanitize(value, ctx)
5136             self.ber_encoded = self.ber_raw is not None
5137         if default is not None:
5138             default, _ = self._value_sanitize(default)
5139             self.default = self.__class__(
5140                 value=default,
5141                 impl=self.tag,
5142                 expl=self._expl,
5143             )
5144             if self._value is None:
5145                 self._value = default
5146             optional = True
5147         self.optional = optional
5148
5149     def _strptime_bered(self, value):
5150         year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5151         value = value[10:]
5152         if len(value) == 0:
5153             raise ValueError("no timezone")
5154         year += 2000 if year < 50 else 1900
5155         decoded = datetime(year, month, day, hour, minute)
5156         offset = 0
5157         if value[-1] == "Z":
5158             value = value[:-1]
5159         else:
5160             if len(value) < 5:
5161                 raise ValueError("invalid UTC offset")
5162             if value[-5] == "-":
5163                 sign = -1
5164             elif value[-5] == "+":
5165                 sign = 1
5166             else:
5167                 raise ValueError("invalid UTC offset")
5168             v = pureint(value[-4:])
5169             offset, v = (60 * (v % 100)), v // 100
5170             if offset >= 3600:
5171                 raise ValueError("invalid UTC offset minutes")
5172             offset += 3600 * v
5173             if offset > 14 * 3600:
5174                 raise ValueError("too big UTC offset")
5175             offset *= sign
5176             value = value[:-5]
5177         if len(value) == 0:
5178             return offset, decoded
5179         if len(value) != 2:
5180             raise ValueError("invalid UTC offset seconds")
5181         seconds = pureint(value)
5182         if seconds >= 60:
5183             raise ValueError("invalid seconds value")
5184         return offset, decoded + timedelta(seconds=seconds)
5185
5186     def _strptime(self, value):
5187         # datetime.strptime's format: %y%m%d%H%M%SZ
5188         if len(value) != LEN_YYMMDDHHMMSSZ:
5189             raise ValueError("invalid UTCTime length")
5190         if value[-1] != "Z":
5191             raise ValueError("non UTC timezone")
5192         year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
5193         year += 2000 if year < 50 else 1900
5194         return datetime(year, month, day, hour, minute, second)
5195
5196     def _dt_sanitize(self, value):
5197         if value.year < 1950 or value.year > 2049:
5198             raise ValueError("UTCTime can hold only 1950-2049 years")
5199         return value.replace(microsecond=0)
5200
5201     def _value_sanitize(self, value, ctx=None):
5202         if value.__class__ == binary_type:
5203             try:
5204                 value_decoded = value.decode("ascii")
5205             except (UnicodeEncodeError, UnicodeDecodeError) as err:
5206                 raise DecodeError("invalid UTCTime encoding: %r" % err)
5207             err = None
5208             try:
5209                 return self._strptime(value_decoded), None
5210             except (TypeError, ValueError) as _err:
5211                 err = _err
5212                 if (ctx is not None) and ctx.get("bered", False):
5213                     try:
5214                         offset, _value = self._strptime_bered(value_decoded)
5215                         _value = _value - timedelta(seconds=offset)
5216                         return self._dt_sanitize(_value), value
5217                     except (TypeError, ValueError, OverflowError) as _err:
5218                         err = _err
5219             raise DecodeError(
5220                 "invalid %s format: %r" % (self.asn1_type_name, err),
5221                 klass=self.__class__,
5222             )
5223         if isinstance(value, self.__class__):
5224             return value._value, None
5225         if value.__class__ == datetime:
5226             if value.tzinfo is not None:
5227                 raise ValueError("only naive datetime supported")
5228             return self._dt_sanitize(value), None
5229         raise InvalidValueType((self.__class__, datetime))
5230
5231     def _pp_value(self):
5232         if self.ready:
5233             value = self._value.isoformat()
5234             if self.ber_encoded:
5235                 value += " (%s)" % self.ber_raw
5236             return value
5237         return None
5238
5239     def __unicode__(self):
5240         if self.ready:
5241             value = self._value.isoformat()
5242             if self.ber_encoded:
5243                 value += " (%s)" % self.ber_raw
5244             return value
5245         return text_type(self._pp_value())
5246
5247     def __getstate__(self):
5248         return UTCTimeState(
5249             *super(UTCTime, self).__getstate__(),
5250             **{"ber_raw": self.ber_raw}
5251         )
5252
5253     def __setstate__(self, state):
5254         super(UTCTime, self).__setstate__(state)
5255         self.ber_raw = state.ber_raw
5256
5257     def __bytes__(self):
5258         self._assert_ready()
5259         return self._encode_time()
5260
5261     def __eq__(self, their):
5262         if their.__class__ == binary_type:
5263             return self._encode_time() == their
5264         if their.__class__ == datetime:
5265             return self.todatetime() == their
5266         if not isinstance(their, self.__class__):
5267             return False
5268         return (
5269             self._value == their._value and
5270             self.tag == their.tag and
5271             self._expl == their._expl
5272         )
5273
5274     def _encode_time(self):
5275         return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5276
5277     def _encode(self):
5278         self._assert_ready()
5279         return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5280
5281     def _encode1st(self, state):
5282         return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
5283
5284     def _encode2nd(self, writer, state_iter):
5285         self._assert_ready()
5286         write_full(writer, self._encode())
5287
5288     def _encode_cer(self, writer):
5289         write_full(writer, self._encode())
5290
5291     def todatetime(self):
5292         return self._value
5293
5294     def __repr__(self):
5295         return pp_console_row(next(self.pps()))
5296
5297     def pps(self, decode_path=()):
5298         yield _pp(
5299             obj=self,
5300             asn1_type_name=self.asn1_type_name,
5301             obj_name=self.__class__.__name__,
5302             decode_path=decode_path,
5303             value=self._pp_value(),
5304             optional=self.optional,
5305             default=self == self.default,
5306             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5307             expl=None if self._expl is None else tag_decode(self._expl),
5308             offset=self.offset,
5309             tlen=self.tlen,
5310             llen=self.llen,
5311             vlen=self.vlen,
5312             expl_offset=self.expl_offset if self.expled else None,
5313             expl_tlen=self.expl_tlen if self.expled else None,
5314             expl_llen=self.expl_llen if self.expled else None,
5315             expl_vlen=self.expl_vlen if self.expled else None,
5316             expl_lenindef=self.expl_lenindef,
5317             ber_encoded=self.ber_encoded,
5318             bered=self.bered,
5319         )
5320         for pp in self.pps_lenindef(decode_path):
5321             yield pp
5322
5323
5324 class GeneralizedTime(UTCTime):
5325     """``GeneralizedTime`` datetime type
5326
5327     This type is similar to :py:class:`pyderasn.UTCTime`.
5328
5329     >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5330     GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5331     >>> str(t)
5332     '20170930220750.000123Z'
5333     >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5334     GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5335
5336     .. warning::
5337
5338        Only **naive** datetime objects are supported.
5339        Library assumes that all work is done in UTC.
5340
5341     .. warning::
5342
5343        Only **microsecond** fractions are supported in DER encoding.
5344        :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5345        higher precision values.
5346
5347     .. warning::
5348
5349        **BER** encoded data can loss information (accuracy) during
5350        decoding because of float transformations.
5351
5352     .. warning::
5353
5354        **Zero** year is unsupported.
5355     """
5356     __slots__ = ()
5357     tag_default = tag_encode(24)
5358     asn1_type_name = "GeneralizedTime"
5359
5360     def _dt_sanitize(self, value):
5361         return value
5362
5363     def _strptime_bered(self, value):
5364         if len(value) < 4 + 3 * 2:
5365             raise ValueError("invalid GeneralizedTime")
5366         year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5367         decoded = datetime(year, month, day, hour)
5368         offset, value = 0, value[10:]
5369         if len(value) == 0:
5370             return offset, decoded
5371         if value[-1] == "Z":
5372             value = value[:-1]
5373         else:
5374             for char, sign in (("-", -1), ("+", 1)):
5375                 idx = value.rfind(char)
5376                 if idx == -1:
5377                     continue
5378                 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5379                 v = pureint(offset_raw)
5380                 if len(offset_raw) == 4:
5381                     offset, v = (60 * (v % 100)), v // 100
5382                     if offset >= 3600:
5383                         raise ValueError("invalid UTC offset minutes")
5384                 elif len(offset_raw) == 2:
5385                     pass
5386                 else:
5387                     raise ValueError("invalid UTC offset")
5388                 offset += 3600 * v
5389                 if offset > 14 * 3600:
5390                     raise ValueError("too big UTC offset")
5391                 offset *= sign
5392                 break
5393         if len(value) == 0:
5394             return offset, decoded
5395         if value[0] in DECIMAL_SIGNS:
5396             return offset, (
5397                 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5398             )
5399         if len(value) < 2:
5400             raise ValueError("stripped minutes")
5401         decoded += timedelta(seconds=60 * pureint(value[:2]))
5402         value = value[2:]
5403         if len(value) == 0:
5404             return offset, decoded
5405         if value[0] in DECIMAL_SIGNS:
5406             return offset, (
5407                 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5408             )
5409         if len(value) < 2:
5410             raise ValueError("stripped seconds")
5411         decoded += timedelta(seconds=pureint(value[:2]))
5412         value = value[2:]
5413         if len(value) == 0:
5414             return offset, decoded
5415         if value[0] not in DECIMAL_SIGNS:
5416             raise ValueError("invalid format after seconds")
5417         return offset, (
5418             decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5419         )
5420
5421     def _strptime(self, value):
5422         l = len(value)
5423         if l == LEN_YYYYMMDDHHMMSSZ:
5424             # datetime.strptime's format: %Y%m%d%H%M%SZ
5425             if value[-1] != "Z":
5426                 raise ValueError("non UTC timezone")
5427             return datetime(*str_to_time_fractions(value[:-1]))
5428         if l >= LEN_YYYYMMDDHHMMSSDMZ:
5429             # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5430             if value[-1] != "Z":
5431                 raise ValueError("non UTC timezone")
5432             if value[14] != ".":
5433                 raise ValueError("no fractions separator")
5434             us = value[15:-1]
5435             if us[-1] == "0":
5436                 raise ValueError("trailing zero")
5437             us_len = len(us)
5438             if us_len > 6:
5439                 raise ValueError("only microsecond fractions are supported")
5440             us = pureint(us + ("0" * (6 - us_len)))
5441             year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5442             return datetime(year, month, day, hour, minute, second, us)
5443         raise ValueError("invalid GeneralizedTime length")
5444
5445     def _encode_time(self):
5446         value = self._value
5447         encoded = value.strftime("%Y%m%d%H%M%S")
5448         if value.microsecond > 0:
5449             encoded += (".%06d" % value.microsecond).rstrip("0")
5450         return (encoded + "Z").encode("ascii")
5451
5452     def _encode(self):
5453         self._assert_ready()
5454         value = self._value
5455         if value.microsecond > 0:
5456             encoded = self._encode_time()
5457             return b"".join((self.tag, len_encode(len(encoded)), encoded))
5458         return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5459
5460     def _encode1st(self, state):
5461         self._assert_ready()
5462         vlen = len(self._encode_time())
5463         return len(self.tag) + len_size(vlen) + vlen, state
5464
5465     def _encode2nd(self, writer, state_iter):
5466         write_full(writer, self._encode())
5467
5468
5469 class GraphicString(CommonString):
5470     __slots__ = ()
5471     tag_default = tag_encode(25)
5472     encoding = "iso-8859-1"
5473     asn1_type_name = "GraphicString"
5474
5475
5476 class GeneralString(CommonString):
5477     __slots__ = ()
5478     tag_default = tag_encode(27)
5479     encoding = "iso-8859-1"
5480     asn1_type_name = "GeneralString"
5481
5482
5483 class UniversalString(CommonString):
5484     __slots__ = ()
5485     tag_default = tag_encode(28)
5486     encoding = "utf-32-be"
5487     asn1_type_name = "UniversalString"
5488
5489
5490 class BMPString(CommonString):
5491     __slots__ = ()
5492     tag_default = tag_encode(30)
5493     encoding = "utf-16-be"
5494     asn1_type_name = "BMPString"
5495
5496
5497 ChoiceState = namedtuple(
5498     "ChoiceState",
5499     BasicState._fields + ("specs", "value",),
5500     **NAMEDTUPLE_KWARGS
5501 )
5502
5503
5504 class Choice(Obj):
5505     """``CHOICE`` special type
5506
5507     ::
5508
5509         class GeneralName(Choice):
5510             schema = (
5511                 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5512                 ("dNSName", IA5String(impl=tag_ctxp(2))),
5513             )
5514
5515     >>> gn = GeneralName()
5516     GeneralName CHOICE
5517     >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5518     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5519     >>> gn["dNSName"] = IA5String("bar.baz")
5520     GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5521     >>> gn["rfc822Name"]
5522     None
5523     >>> gn["dNSName"]
5524     [2] IA5String IA5 bar.baz
5525     >>> gn.choice
5526     'dNSName'
5527     >>> gn.value == gn["dNSName"]
5528     True
5529     >>> gn.specs
5530     OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5531
5532     >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5533     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5534     """
5535     __slots__ = ("specs",)
5536     tag_default = None
5537     asn1_type_name = "CHOICE"
5538
5539     def __init__(
5540             self,
5541             value=None,
5542             schema=None,
5543             impl=None,
5544             expl=None,
5545             default=None,
5546             optional=False,
5547             _decoded=(0, 0, 0),
5548     ):
5549         """
5550         :param value: set the value. Either ``(choice, value)`` tuple, or
5551                       :py:class:`pyderasn.Choice` object
5552         :param bytes impl: can not be set, do **not** use it
5553         :param bytes expl: override default tag with ``EXPLICIT`` one
5554         :param default: set default value. Type same as in ``value``
5555         :param bool optional: is object ``OPTIONAL`` in sequence
5556         """
5557         if impl is not None:
5558             raise ValueError("no implicit tag allowed for CHOICE")
5559         super(Choice, self).__init__(None, expl, default, optional, _decoded)
5560         if schema is None:
5561             schema = getattr(self, "schema", ())
5562         if len(schema) == 0:
5563             raise ValueError("schema must be specified")
5564         self.specs = (
5565             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5566         )
5567         self._value = None
5568         if value is not None:
5569             self._value = self._value_sanitize(value)
5570         if default is not None:
5571             default_value = self._value_sanitize(default)
5572             default_obj = self.__class__(impl=self.tag, expl=self._expl)
5573             default_obj.specs = self.specs
5574             default_obj._value = default_value
5575             self.default = default_obj
5576             if value is None:
5577                 self._value = copy(default_obj._value)
5578         if self._expl is not None:
5579             tag_class, _, tag_num = tag_decode(self._expl)
5580             self._tag_order = (tag_class, tag_num)
5581
5582     def _value_sanitize(self, value):
5583         if (value.__class__ == tuple) and len(value) == 2:
5584             choice, obj = value
5585             spec = self.specs.get(choice)
5586             if spec is None:
5587                 raise ObjUnknown(choice)
5588             if not isinstance(obj, spec.__class__):
5589                 raise InvalidValueType((spec,))
5590             return (choice, spec(obj))
5591         if isinstance(value, self.__class__):
5592             return value._value
5593         raise InvalidValueType((self.__class__, tuple))
5594
5595     @property
5596     def ready(self):
5597         return self._value is not None and self._value[1].ready
5598
5599     @property
5600     def bered(self):
5601         return self.expl_lenindef or (
5602             (self._value is not None) and
5603             self._value[1].bered
5604         )
5605
5606     def __getstate__(self):
5607         return ChoiceState(
5608             __version__,
5609             self.tag,
5610             self._tag_order,
5611             self._expl,
5612             self.default,
5613             self.optional,
5614             self.offset,
5615             self.llen,
5616             self.vlen,
5617             self.expl_lenindef,
5618             self.lenindef,
5619             self.ber_encoded,
5620             self.specs,
5621             copy(self._value),
5622         )
5623
5624     def __setstate__(self, state):
5625         super(Choice, self).__setstate__(state)
5626         self.specs = state.specs
5627         self._value = state.value
5628
5629     def __eq__(self, their):
5630         if (their.__class__ == tuple) and len(their) == 2:
5631             return self._value == their
5632         if not isinstance(their, self.__class__):
5633             return False
5634         return (
5635             self.specs == their.specs and
5636             self._value == their._value
5637         )
5638
5639     def __call__(
5640             self,
5641             value=None,
5642             expl=None,
5643             default=None,
5644             optional=None,
5645     ):
5646         return self.__class__(
5647             value=value,
5648             schema=self.specs,
5649             expl=self._expl if expl is None else expl,
5650             default=self.default if default is None else default,
5651             optional=self.optional if optional is None else optional,
5652         )
5653
5654     @property
5655     def choice(self):
5656         """Name of the choice
5657         """
5658         self._assert_ready()
5659         return self._value[0]
5660
5661     @property
5662     def value(self):
5663         """Value of underlying choice
5664         """
5665         self._assert_ready()
5666         return self._value[1]
5667
5668     @property
5669     def tag_order(self):
5670         self._assert_ready()
5671         return self._value[1].tag_order if self._tag_order is None else self._tag_order
5672
5673     @property
5674     def tag_order_cer(self):
5675         return min(v.tag_order_cer for v in itervalues(self.specs))
5676
5677     def __getitem__(self, key):
5678         if key not in self.specs:
5679             raise ObjUnknown(key)
5680         if self._value is None:
5681             return None
5682         choice, value = self._value
5683         if choice != key:
5684             return None
5685         return value
5686
5687     def __setitem__(self, key, value):
5688         spec = self.specs.get(key)
5689         if spec is None:
5690             raise ObjUnknown(key)
5691         if not isinstance(value, spec.__class__):
5692             raise InvalidValueType((spec.__class__,))
5693         self._value = (key, spec(value))
5694
5695     @property
5696     def tlen(self):
5697         return 0
5698
5699     @property
5700     def decoded(self):
5701         return self._value[1].decoded if self.ready else False
5702
5703     def _encode(self):
5704         self._assert_ready()
5705         return self._value[1].encode()
5706
5707     def _encode1st(self, state):
5708         self._assert_ready()
5709         return self._value[1].encode1st(state)
5710
5711     def _encode2nd(self, writer, state_iter):
5712         self._value[1].encode2nd(writer, state_iter)
5713
5714     def _encode_cer(self, writer):
5715         self._assert_ready()
5716         self._value[1].encode_cer(writer)
5717
5718     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5719         for choice, spec in iteritems(self.specs):
5720             sub_decode_path = decode_path + (choice,)
5721             try:
5722                 spec.decode(
5723                     tlv,
5724                     offset=offset,
5725                     leavemm=True,
5726                     decode_path=sub_decode_path,
5727                     ctx=ctx,
5728                     tag_only=True,
5729                     _ctx_immutable=False,
5730                 )
5731             except TagMismatch:
5732                 continue
5733             break
5734         else:
5735             raise TagMismatch(
5736                 klass=self.__class__,
5737                 decode_path=decode_path,
5738                 offset=offset,
5739             )
5740         if tag_only:  # pragma: no cover
5741             yield None
5742             return
5743         if evgen_mode:
5744             for _decode_path, value, tail in spec.decode_evgen(
5745                     tlv,
5746                     offset=offset,
5747                     leavemm=True,
5748                     decode_path=sub_decode_path,
5749                     ctx=ctx,
5750                     _ctx_immutable=False,
5751             ):
5752                 yield _decode_path, value, tail
5753         else:
5754             _, value, tail = next(spec.decode_evgen(
5755                 tlv,
5756                 offset=offset,
5757                 leavemm=True,
5758                 decode_path=sub_decode_path,
5759                 ctx=ctx,
5760                 _ctx_immutable=False,
5761                 _evgen_mode=False,
5762             ))
5763         obj = self.__class__(
5764             schema=self.specs,
5765             expl=self._expl,
5766             default=self.default,
5767             optional=self.optional,
5768             _decoded=(offset, 0, value.fulllen),
5769         )
5770         obj._value = (choice, value)
5771         yield decode_path, obj, tail
5772
5773     def __repr__(self):
5774         value = pp_console_row(next(self.pps()))
5775         if self.ready:
5776             value = "%s[%r]" % (value, self.value)
5777         return value
5778
5779     def pps(self, decode_path=()):
5780         yield _pp(
5781             obj=self,
5782             asn1_type_name=self.asn1_type_name,
5783             obj_name=self.__class__.__name__,
5784             decode_path=decode_path,
5785             value=self.choice if self.ready else None,
5786             optional=self.optional,
5787             default=self == self.default,
5788             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5789             expl=None if self._expl is None else tag_decode(self._expl),
5790             offset=self.offset,
5791             tlen=self.tlen,
5792             llen=self.llen,
5793             vlen=self.vlen,
5794             expl_lenindef=self.expl_lenindef,
5795             bered=self.bered,
5796         )
5797         if self.ready:
5798             yield self.value.pps(decode_path=decode_path + (self.choice,))
5799         for pp in self.pps_lenindef(decode_path):
5800             yield pp
5801
5802
5803 class PrimitiveTypes(Choice):
5804     """Predefined ``CHOICE`` for all generic primitive types
5805
5806     It could be useful for general decoding of some unspecified values:
5807
5808     >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5809     OCTET STRING 3 bytes 666f6f
5810     >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5811     INTEGER 1193046
5812     """
5813     __slots__ = ()
5814     schema = tuple((klass.__name__, klass()) for klass in (
5815         Boolean,
5816         Integer,
5817         BitString,
5818         OctetString,
5819         Null,
5820         ObjectIdentifier,
5821         UTF8String,
5822         NumericString,
5823         PrintableString,
5824         TeletexString,
5825         VideotexString,
5826         IA5String,
5827         UTCTime,
5828         GeneralizedTime,
5829         GraphicString,
5830         VisibleString,
5831         ISO646String,
5832         GeneralString,
5833         UniversalString,
5834         BMPString,
5835     ))
5836
5837
5838 AnyState = namedtuple(
5839     "AnyState",
5840     BasicState._fields + ("value", "defined"),
5841     **NAMEDTUPLE_KWARGS
5842 )
5843
5844
5845 class Any(Obj):
5846     """``ANY`` special type
5847
5848     >>> Any(Integer(-123))
5849     ANY INTEGER -123 (0X:7B)
5850     >>> a = Any(OctetString(b"hello world").encode())
5851     ANY 040b68656c6c6f20776f726c64
5852     >>> hexenc(bytes(a))
5853     b'0x040x0bhello world'
5854     """
5855     __slots__ = ("defined",)
5856     tag_default = tag_encode(0)
5857     asn1_type_name = "ANY"
5858
5859     def __init__(
5860             self,
5861             value=None,
5862             expl=None,
5863             optional=False,
5864             _decoded=(0, 0, 0),
5865     ):
5866         """
5867         :param value: set the value. Either any kind of pyderasn's
5868                       **ready** object, or bytes. Pay attention that
5869                       **no** validation is performed if raw binary value
5870                       is valid TLV, except just tag decoding
5871         :param bytes expl: override default tag with ``EXPLICIT`` one
5872         :param bool optional: is object ``OPTIONAL`` in sequence
5873         """
5874         super(Any, self).__init__(None, expl, None, optional, _decoded)
5875         if value is None:
5876             self._value = None
5877         else:
5878             value = self._value_sanitize(value)
5879             self._value = value
5880             if self._expl is None:
5881                 if value.__class__ == binary_type:
5882                     tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5883                 else:
5884                     tag_class, tag_num = value.tag_order
5885             else:
5886                 tag_class, _, tag_num = tag_decode(self._expl)
5887             self._tag_order = (tag_class, tag_num)
5888         self.defined = None
5889
5890     def _value_sanitize(self, value):
5891         if value.__class__ == binary_type:
5892             if len(value) == 0:
5893                 raise ValueError("%s value can not be empty" % self.__class__.__name__)
5894             return value
5895         if isinstance(value, self.__class__):
5896             return value._value
5897         if not isinstance(value, Obj):
5898             raise InvalidValueType((self.__class__, Obj, binary_type))
5899         return value
5900
5901     @property
5902     def ready(self):
5903         return self._value is not None
5904
5905     @property
5906     def tag_order(self):
5907         self._assert_ready()
5908         return self._tag_order
5909
5910     @property
5911     def bered(self):
5912         if self.expl_lenindef or self.lenindef:
5913             return True
5914         if self.defined is None:
5915             return False
5916         return self.defined[1].bered
5917
5918     def __getstate__(self):
5919         return AnyState(
5920             __version__,
5921             self.tag,
5922             self._tag_order,
5923             self._expl,
5924             None,
5925             self.optional,
5926             self.offset,
5927             self.llen,
5928             self.vlen,
5929             self.expl_lenindef,
5930             self.lenindef,
5931             self.ber_encoded,
5932             self._value,
5933             self.defined,
5934         )
5935
5936     def __setstate__(self, state):
5937         super(Any, self).__setstate__(state)
5938         self._value = state.value
5939         self.defined = state.defined
5940
5941     def __eq__(self, their):
5942         if their.__class__ == binary_type:
5943             if self._value.__class__ == binary_type:
5944                 return self._value == their
5945             return self._value.encode() == their
5946         if issubclass(their.__class__, Any):
5947             if self.ready and their.ready:
5948                 return bytes(self) == bytes(their)
5949             return self.ready == their.ready
5950         return False
5951
5952     def __call__(
5953             self,
5954             value=None,
5955             expl=None,
5956             optional=None,
5957     ):
5958         return self.__class__(
5959             value=value,
5960             expl=self._expl if expl is None else expl,
5961             optional=self.optional if optional is None else optional,
5962         )
5963
5964     def __bytes__(self):
5965         self._assert_ready()
5966         value = self._value
5967         if value.__class__ == binary_type:
5968             return value
5969         return self._value.encode()
5970
5971     @property
5972     def tlen(self):
5973         return 0
5974
5975     def _encode(self):
5976         self._assert_ready()
5977         value = self._value
5978         if value.__class__ == binary_type:
5979             return value
5980         return value.encode()
5981
5982     def _encode1st(self, state):
5983         self._assert_ready()
5984         value = self._value
5985         if value.__class__ == binary_type:
5986             return len(value), state
5987         return value.encode1st(state)
5988
5989     def _encode2nd(self, writer, state_iter):
5990         value = self._value
5991         if value.__class__ == binary_type:
5992             write_full(writer, value)
5993         else:
5994             value.encode2nd(writer, state_iter)
5995
5996     def _encode_cer(self, writer):
5997         self._assert_ready()
5998         value = self._value
5999         if value.__class__ == binary_type:
6000             write_full(writer, value)
6001         else:
6002             value.encode_cer(writer)
6003
6004     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6005         try:
6006             t, tlen, lv = tag_strip(tlv)
6007         except DecodeError as err:
6008             raise err.__class__(
6009                 msg=err.msg,
6010                 klass=self.__class__,
6011                 decode_path=decode_path,
6012                 offset=offset,
6013             )
6014         try:
6015             l, llen, v = len_decode(lv)
6016         except LenIndefForm as err:
6017             if not ctx.get("bered", False):
6018                 raise err.__class__(
6019                     msg=err.msg,
6020                     klass=self.__class__,
6021                     decode_path=decode_path,
6022                     offset=offset,
6023                 )
6024             llen, vlen, v = 1, 0, lv[1:]
6025             sub_offset = offset + tlen + llen
6026             chunk_i = 0
6027             while v[:EOC_LEN].tobytes() != EOC:
6028                 chunk, v = Any().decode(
6029                     v,
6030                     offset=sub_offset,
6031                     decode_path=decode_path + (str(chunk_i),),
6032                     leavemm=True,
6033                     ctx=ctx,
6034                     _ctx_immutable=False,
6035                 )
6036                 vlen += chunk.tlvlen
6037                 sub_offset += chunk.tlvlen
6038                 chunk_i += 1
6039             tlvlen = tlen + llen + vlen + EOC_LEN
6040             obj = self.__class__(
6041                 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
6042                 expl=self._expl,
6043                 optional=self.optional,
6044                 _decoded=(offset, 0, tlvlen),
6045             )
6046             obj.lenindef = True
6047             obj.tag = t.tobytes()
6048             yield decode_path, obj, v[EOC_LEN:]
6049             return
6050         except DecodeError as err:
6051             raise err.__class__(
6052                 msg=err.msg,
6053                 klass=self.__class__,
6054                 decode_path=decode_path,
6055                 offset=offset,
6056             )
6057         if l > len(v):
6058             raise NotEnoughData(
6059                 "encoded length is longer than data",
6060                 klass=self.__class__,
6061                 decode_path=decode_path,
6062                 offset=offset,
6063             )
6064         tlvlen = tlen + llen + l
6065         v, tail = tlv[:tlvlen], v[l:]
6066         obj = self.__class__(
6067             value=None if evgen_mode else v.tobytes(),
6068             expl=self._expl,
6069             optional=self.optional,
6070             _decoded=(offset, 0, tlvlen),
6071         )
6072         obj.tag = t.tobytes()
6073         yield decode_path, obj, tail
6074
6075     def __repr__(self):
6076         return pp_console_row(next(self.pps()))
6077
6078     def pps(self, decode_path=()):
6079         value = self._value
6080         if value is None:
6081             pass
6082         elif value.__class__ == binary_type:
6083             value = None
6084         else:
6085             value = repr(value)
6086         yield _pp(
6087             obj=self,
6088             asn1_type_name=self.asn1_type_name,
6089             obj_name=self.__class__.__name__,
6090             decode_path=decode_path,
6091             value=value,
6092             blob=self._value if self._value.__class__ == binary_type else None,
6093             optional=self.optional,
6094             default=self == self.default,
6095             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6096             expl=None if self._expl is None else tag_decode(self._expl),
6097             offset=self.offset,
6098             tlen=self.tlen,
6099             llen=self.llen,
6100             vlen=self.vlen,
6101             expl_offset=self.expl_offset if self.expled else None,
6102             expl_tlen=self.expl_tlen if self.expled else None,
6103             expl_llen=self.expl_llen if self.expled else None,
6104             expl_vlen=self.expl_vlen if self.expled else None,
6105             expl_lenindef=self.expl_lenindef,
6106             lenindef=self.lenindef,
6107             bered=self.bered,
6108         )
6109         defined_by, defined = self.defined or (None, None)
6110         if defined_by is not None:
6111             yield defined.pps(
6112                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6113             )
6114         for pp in self.pps_lenindef(decode_path):
6115             yield pp
6116
6117
6118 ########################################################################
6119 # ASN.1 constructed types
6120 ########################################################################
6121
6122 def abs_decode_path(decode_path, rel_path):
6123     """Create an absolute decode path from current and relative ones
6124
6125     :param decode_path: current decode path, starting point. Tuple of strings
6126     :param rel_path: relative path to ``decode_path``. Tuple of strings.
6127                      If first tuple's element is "/", then treat it as
6128                      an absolute path, ignoring ``decode_path`` as
6129                      starting point. Also this tuple can contain ".."
6130                      elements, stripping the leading element from
6131                      ``decode_path``
6132
6133     >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
6134     ("foo", "bar", "baz", "whatever")
6135     >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
6136     ("foo", "whatever")
6137     >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
6138     ("baz", "whatever")
6139     """
6140     if rel_path[0] == "/":
6141         return rel_path[1:]
6142     if rel_path[0] == "..":
6143         return abs_decode_path(decode_path[:-1], rel_path[1:])
6144     return decode_path + rel_path
6145
6146
6147 SequenceState = namedtuple(
6148     "SequenceState",
6149     BasicState._fields + ("specs", "value",),
6150     **NAMEDTUPLE_KWARGS
6151 )
6152
6153
6154 class SequenceEncode1stMixing(object):
6155     def _encode1st(self, state):
6156         state.append(0)
6157         idx = len(state) - 1
6158         vlen = 0
6159         for v in self._values_for_encoding():
6160             l, _ = v.encode1st(state)
6161             vlen += l
6162         state[idx] = vlen
6163         return len(self.tag) + len_size(vlen) + vlen, state
6164
6165
6166 class Sequence(SequenceEncode1stMixing, Obj):
6167     """``SEQUENCE`` structure type
6168
6169     You have to make specification of sequence::
6170
6171         class Extension(Sequence):
6172             schema = (
6173                 ("extnID", ObjectIdentifier()),
6174                 ("critical", Boolean(default=False)),
6175                 ("extnValue", OctetString()),
6176             )
6177
6178     Then, you can work with it as with dictionary.
6179
6180     >>> ext = Extension()
6181     >>> Extension().specs
6182     OrderedDict([
6183         ('extnID', OBJECT IDENTIFIER),
6184         ('critical', BOOLEAN False OPTIONAL DEFAULT),
6185         ('extnValue', OCTET STRING),
6186     ])
6187     >>> ext["extnID"] = "1.2.3"
6188     Traceback (most recent call last):
6189     pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6190     >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6191
6192     You can determine if sequence is ready to be encoded:
6193
6194     >>> ext.ready
6195     False
6196     >>> ext.encode()
6197     Traceback (most recent call last):
6198     pyderasn.ObjNotReady: object is not ready: extnValue
6199     >>> ext["extnValue"] = OctetString(b"foobar")
6200     >>> ext.ready
6201     True
6202
6203     Value you want to assign, must have the same **type** as in
6204     corresponding specification, but it can have different tags,
6205     optional/default attributes -- they will be taken from specification
6206     automatically::
6207
6208         class TBSCertificate(Sequence):
6209             schema = (
6210                 ("version", Version(expl=tag_ctxc(0), default="v1")),
6211             [...]
6212
6213     >>> tbs = TBSCertificate()
6214     >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6215
6216     Assign ``None`` to remove value from sequence.
6217
6218     You can set values in Sequence during its initialization:
6219
6220     >>> AlgorithmIdentifier((
6221         ("algorithm", ObjectIdentifier("1.2.3")),
6222         ("parameters", Any(Null()))
6223     ))
6224     AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6225
6226     You can determine if value exists/set in the sequence and take its value:
6227
6228     >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6229     (True, True, False)
6230     >>> ext["extnID"]
6231     OBJECT IDENTIFIER 1.2.3
6232
6233     But pay attention that if value has default, then it won't be (not
6234     in) in the sequence (because ``DEFAULT`` must not be encoded in
6235     DER), but you can read its value:
6236
6237     >>> "critical" in ext, ext["critical"]
6238     (False, BOOLEAN False)
6239     >>> ext["critical"] = Boolean(True)
6240     >>> "critical" in ext, ext["critical"]
6241     (True, BOOLEAN True)
6242
6243     All defaulted values are always optional.
6244
6245     .. _allow_default_values_ctx:
6246
6247     DER prohibits default value encoding and will raise an error if
6248     default value is unexpectedly met during decode.
6249     If :ref:`bered <bered_ctx>` context option is set, then no error
6250     will be raised, but ``bered`` attribute set. You can disable strict
6251     defaulted values existence validation by setting
6252     ``"allow_default_values": True`` :ref:`context <ctx>` option.
6253
6254     All values with DEFAULT specified are decoded atomically in
6255     :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6256     SEQUENCE, then it will be yielded as a single element, not
6257     disassembled. That is required for DEFAULT existence check.
6258
6259     Two sequences are equal if they have equal specification (schema),
6260     implicit/explicit tagging and the same values.
6261     """
6262     __slots__ = ("specs",)
6263     tag_default = tag_encode(form=TagFormConstructed, num=16)
6264     asn1_type_name = "SEQUENCE"
6265
6266     def __init__(
6267             self,
6268             value=None,
6269             schema=None,
6270             impl=None,
6271             expl=None,
6272             default=None,
6273             optional=False,
6274             _decoded=(0, 0, 0),
6275     ):
6276         super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6277         if schema is None:
6278             schema = getattr(self, "schema", ())
6279         self.specs = (
6280             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6281         )
6282         self._value = {}
6283         if value is not None:
6284             if issubclass(value.__class__, Sequence):
6285                 self._value = value._value
6286             elif hasattr(value, "__iter__"):
6287                 for seq_key, seq_value in value:
6288                     self[seq_key] = seq_value
6289             else:
6290                 raise InvalidValueType((Sequence,))
6291         if default is not None:
6292             if not issubclass(default.__class__, Sequence):
6293                 raise InvalidValueType((Sequence,))
6294             default_value = default._value
6295             default_obj = self.__class__(impl=self.tag, expl=self._expl)
6296             default_obj.specs = self.specs
6297             default_obj._value = default_value
6298             self.default = default_obj
6299             if value is None:
6300                 self._value = copy(default_obj._value)
6301
6302     @property
6303     def ready(self):
6304         for name, spec in iteritems(self.specs):
6305             value = self._value.get(name)
6306             if value is None:
6307                 if spec.optional:
6308                     continue
6309                 return False
6310             if not value.ready:
6311                 return False
6312         return True
6313
6314     @property
6315     def bered(self):
6316         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6317             return True
6318         return any(value.bered for value in itervalues(self._value))
6319
6320     def __getstate__(self):
6321         return SequenceState(
6322             __version__,
6323             self.tag,
6324             self._tag_order,
6325             self._expl,
6326             self.default,
6327             self.optional,
6328             self.offset,
6329             self.llen,
6330             self.vlen,
6331             self.expl_lenindef,
6332             self.lenindef,
6333             self.ber_encoded,
6334             self.specs,
6335             {k: copy(v) for k, v in iteritems(self._value)},
6336         )
6337
6338     def __setstate__(self, state):
6339         super(Sequence, self).__setstate__(state)
6340         self.specs = state.specs
6341         self._value = state.value
6342
6343     def __eq__(self, their):
6344         if not isinstance(their, self.__class__):
6345             return False
6346         return (
6347             self.specs == their.specs and
6348             self.tag == their.tag and
6349             self._expl == their._expl and
6350             self._value == their._value
6351         )
6352
6353     def __call__(
6354             self,
6355             value=None,
6356             impl=None,
6357             expl=None,
6358             default=None,
6359             optional=None,
6360     ):
6361         return self.__class__(
6362             value=value,
6363             schema=self.specs,
6364             impl=self.tag if impl is None else impl,
6365             expl=self._expl if expl is None else expl,
6366             default=self.default if default is None else default,
6367             optional=self.optional if optional is None else optional,
6368         )
6369
6370     def __contains__(self, key):
6371         return key in self._value
6372
6373     def __setitem__(self, key, value):
6374         spec = self.specs.get(key)
6375         if spec is None:
6376             raise ObjUnknown(key)
6377         if value is None:
6378             self._value.pop(key, None)
6379             return
6380         if not isinstance(value, spec.__class__):
6381             raise InvalidValueType((spec.__class__,))
6382         value = spec(value=value)
6383         if spec.default is not None and value == spec.default:
6384             self._value.pop(key, None)
6385             return
6386         self._value[key] = value
6387
6388     def __getitem__(self, key):
6389         value = self._value.get(key)
6390         if value is not None:
6391             return value
6392         spec = self.specs.get(key)
6393         if spec is None:
6394             raise ObjUnknown(key)
6395         if spec.default is not None:
6396             return spec.default
6397         return None
6398
6399     def _values_for_encoding(self):
6400         for name, spec in iteritems(self.specs):
6401             value = self._value.get(name)
6402             if value is None:
6403                 if spec.optional:
6404                     continue
6405                 raise ObjNotReady(name)
6406             yield value
6407
6408     def _encode(self):
6409         v = b"".join(v.encode() for v in self._values_for_encoding())
6410         return b"".join((self.tag, len_encode(len(v)), v))
6411
6412     def _encode2nd(self, writer, state_iter):
6413         write_full(writer, self.tag + len_encode(next(state_iter)))
6414         for v in self._values_for_encoding():
6415             v.encode2nd(writer, state_iter)
6416
6417     def _encode_cer(self, writer):
6418         write_full(writer, self.tag + LENINDEF)
6419         for v in self._values_for_encoding():
6420             v.encode_cer(writer)
6421         write_full(writer, EOC)
6422
6423     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6424         try:
6425             t, tlen, lv = tag_strip(tlv)
6426         except DecodeError as err:
6427             raise err.__class__(
6428                 msg=err.msg,
6429                 klass=self.__class__,
6430                 decode_path=decode_path,
6431                 offset=offset,
6432             )
6433         if t != self.tag:
6434             raise TagMismatch(
6435                 klass=self.__class__,
6436                 decode_path=decode_path,
6437                 offset=offset,
6438             )
6439         if tag_only:  # pragma: no cover
6440             yield None
6441             return
6442         lenindef = False
6443         ctx_bered = ctx.get("bered", False)
6444         try:
6445             l, llen, v = len_decode(lv)
6446         except LenIndefForm as err:
6447             if not ctx_bered:
6448                 raise err.__class__(
6449                     msg=err.msg,
6450                     klass=self.__class__,
6451                     decode_path=decode_path,
6452                     offset=offset,
6453                 )
6454             l, llen, v = 0, 1, lv[1:]
6455             lenindef = True
6456         except DecodeError as err:
6457             raise err.__class__(
6458                 msg=err.msg,
6459                 klass=self.__class__,
6460                 decode_path=decode_path,
6461                 offset=offset,
6462             )
6463         if l > len(v):
6464             raise NotEnoughData(
6465                 "encoded length is longer than data",
6466                 klass=self.__class__,
6467                 decode_path=decode_path,
6468                 offset=offset,
6469             )
6470         if not lenindef:
6471             v, tail = v[:l], v[l:]
6472         vlen = 0
6473         sub_offset = offset + tlen + llen
6474         values = {}
6475         ber_encoded = False
6476         ctx_allow_default_values = ctx.get("allow_default_values", False)
6477         for name, spec in iteritems(self.specs):
6478             if spec.optional and (
6479                     (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6480                     len(v) == 0
6481             ):
6482                 continue
6483             spec_defaulted = spec.default is not None
6484             sub_decode_path = decode_path + (name,)
6485             try:
6486                 if evgen_mode and not spec_defaulted:
6487                     for _decode_path, value, v_tail in spec.decode_evgen(
6488                             v,
6489                             sub_offset,
6490                             leavemm=True,
6491                             decode_path=sub_decode_path,
6492                             ctx=ctx,
6493                             _ctx_immutable=False,
6494                     ):
6495                         yield _decode_path, value, v_tail
6496                 else:
6497                     _, value, v_tail = next(spec.decode_evgen(
6498                         v,
6499                         sub_offset,
6500                         leavemm=True,
6501                         decode_path=sub_decode_path,
6502                         ctx=ctx,
6503                         _ctx_immutable=False,
6504                         _evgen_mode=False,
6505                     ))
6506             except TagMismatch as err:
6507                 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6508                     continue
6509                 raise
6510
6511             defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6512             if not evgen_mode and defined is not None:
6513                 defined_by, defined_spec = defined
6514                 if issubclass(value.__class__, SequenceOf):
6515                     for i, _value in enumerate(value):
6516                         sub_sub_decode_path = sub_decode_path + (
6517                             str(i),
6518                             DecodePathDefBy(defined_by),
6519                         )
6520                         defined_value, defined_tail = defined_spec.decode(
6521                             memoryview(bytes(_value)),
6522                             sub_offset + (
6523                                 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6524                                 if value.expled else (value.tlen + value.llen)
6525                             ),
6526                             leavemm=True,
6527                             decode_path=sub_sub_decode_path,
6528                             ctx=ctx,
6529                             _ctx_immutable=False,
6530                         )
6531                         if len(defined_tail) > 0:
6532                             raise DecodeError(
6533                                 "remaining data",
6534                                 klass=self.__class__,
6535                                 decode_path=sub_sub_decode_path,
6536                                 offset=offset,
6537                             )
6538                         _value.defined = (defined_by, defined_value)
6539                 else:
6540                     defined_value, defined_tail = defined_spec.decode(
6541                         memoryview(bytes(value)),
6542                         sub_offset + (
6543                             (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6544                             if value.expled else (value.tlen + value.llen)
6545                         ),
6546                         leavemm=True,
6547                         decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6548                         ctx=ctx,
6549                         _ctx_immutable=False,
6550                     )
6551                     if len(defined_tail) > 0:
6552                         raise DecodeError(
6553                             "remaining data",
6554                             klass=self.__class__,
6555                             decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6556                             offset=offset,
6557                         )
6558                     value.defined = (defined_by, defined_value)
6559
6560             value_len = value.fulllen
6561             vlen += value_len
6562             sub_offset += value_len
6563             v = v_tail
6564             if spec_defaulted:
6565                 if evgen_mode:
6566                     yield sub_decode_path, value, v_tail
6567                 if value == spec.default:
6568                     if ctx_bered or ctx_allow_default_values:
6569                         ber_encoded = True
6570                     else:
6571                         raise DecodeError(
6572                             "DEFAULT value met",
6573                             klass=self.__class__,
6574                             decode_path=sub_decode_path,
6575                             offset=sub_offset,
6576                         )
6577             if not evgen_mode:
6578                 values[name] = value
6579                 spec_defines = getattr(spec, "defines", ())
6580                 if len(spec_defines) == 0:
6581                     defines_by_path = ctx.get("defines_by_path", ())
6582                     if len(defines_by_path) > 0:
6583                         spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6584                 if spec_defines is not None and len(spec_defines) > 0:
6585                     for rel_path, schema in spec_defines:
6586                         defined = schema.get(value, None)
6587                         if defined is not None:
6588                             ctx.setdefault("_defines", []).append((
6589                                 abs_decode_path(sub_decode_path[:-1], rel_path),
6590                                 (value, defined),
6591                             ))
6592         if lenindef:
6593             if v[:EOC_LEN].tobytes() != EOC:
6594                 raise DecodeError(
6595                     "no EOC",
6596                     klass=self.__class__,
6597                     decode_path=decode_path,
6598                     offset=offset,
6599                 )
6600             tail = v[EOC_LEN:]
6601             vlen += EOC_LEN
6602         elif len(v) > 0:
6603             raise DecodeError(
6604                 "remaining data",
6605                 klass=self.__class__,
6606                 decode_path=decode_path,
6607                 offset=offset,
6608             )
6609         obj = self.__class__(
6610             schema=self.specs,
6611             impl=self.tag,
6612             expl=self._expl,
6613             default=self.default,
6614             optional=self.optional,
6615             _decoded=(offset, llen, vlen),
6616         )
6617         obj._value = values
6618         obj.lenindef = lenindef
6619         obj.ber_encoded = ber_encoded
6620         yield decode_path, obj, tail
6621
6622     def __repr__(self):
6623         value = pp_console_row(next(self.pps()))
6624         cols = []
6625         for name in self.specs:
6626             _value = self._value.get(name)
6627             if _value is None:
6628                 continue
6629             cols.append("%s: %s" % (name, repr(_value)))
6630         return "%s[%s]" % (value, "; ".join(cols))
6631
6632     def pps(self, decode_path=()):
6633         yield _pp(
6634             obj=self,
6635             asn1_type_name=self.asn1_type_name,
6636             obj_name=self.__class__.__name__,
6637             decode_path=decode_path,
6638             optional=self.optional,
6639             default=self == self.default,
6640             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6641             expl=None if self._expl is None else tag_decode(self._expl),
6642             offset=self.offset,
6643             tlen=self.tlen,
6644             llen=self.llen,
6645             vlen=self.vlen,
6646             expl_offset=self.expl_offset if self.expled else None,
6647             expl_tlen=self.expl_tlen if self.expled else None,
6648             expl_llen=self.expl_llen if self.expled else None,
6649             expl_vlen=self.expl_vlen if self.expled else None,
6650             expl_lenindef=self.expl_lenindef,
6651             lenindef=self.lenindef,
6652             ber_encoded=self.ber_encoded,
6653             bered=self.bered,
6654         )
6655         for name in self.specs:
6656             value = self._value.get(name)
6657             if value is None:
6658                 continue
6659             yield value.pps(decode_path=decode_path + (name,))
6660         for pp in self.pps_lenindef(decode_path):
6661             yield pp
6662
6663
6664 class Set(Sequence, SequenceEncode1stMixing):
6665     """``SET`` structure type
6666
6667     Its usage is identical to :py:class:`pyderasn.Sequence`.
6668
6669     .. _allow_unordered_set_ctx:
6670
6671     DER prohibits unordered values encoding and will raise an error
6672     during decode. If :ref:`bered <bered_ctx>` context option is set,
6673     then no error will occur. Also you can disable strict values
6674     ordering check by setting ``"allow_unordered_set": True``
6675     :ref:`context <ctx>` option.
6676     """
6677     __slots__ = ()
6678     tag_default = tag_encode(form=TagFormConstructed, num=17)
6679     asn1_type_name = "SET"
6680
6681     def _values_for_encoding(self):
6682         return sorted(
6683             super(Set, self)._values_for_encoding(),
6684             key=attrgetter("tag_order"),
6685         )
6686
6687     def _encode_cer(self, writer):
6688         write_full(writer, self.tag + LENINDEF)
6689         for v in sorted(
6690                 super(Set, self)._values_for_encoding(),
6691                 key=attrgetter("tag_order_cer"),
6692         ):
6693             v.encode_cer(writer)
6694         write_full(writer, EOC)
6695
6696     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6697         try:
6698             t, tlen, lv = tag_strip(tlv)
6699         except DecodeError as err:
6700             raise err.__class__(
6701                 msg=err.msg,
6702                 klass=self.__class__,
6703                 decode_path=decode_path,
6704                 offset=offset,
6705             )
6706         if t != self.tag:
6707             raise TagMismatch(
6708                 klass=self.__class__,
6709                 decode_path=decode_path,
6710                 offset=offset,
6711             )
6712         if tag_only:
6713             yield None
6714             return
6715         lenindef = False
6716         ctx_bered = ctx.get("bered", False)
6717         try:
6718             l, llen, v = len_decode(lv)
6719         except LenIndefForm as err:
6720             if not ctx_bered:
6721                 raise err.__class__(
6722                     msg=err.msg,
6723                     klass=self.__class__,
6724                     decode_path=decode_path,
6725                     offset=offset,
6726                 )
6727             l, llen, v = 0, 1, lv[1:]
6728             lenindef = True
6729         except DecodeError as err:
6730             raise err.__class__(
6731                 msg=err.msg,
6732                 klass=self.__class__,
6733                 decode_path=decode_path,
6734                 offset=offset,
6735             )
6736         if l > len(v):
6737             raise NotEnoughData(
6738                 "encoded length is longer than data",
6739                 klass=self.__class__,
6740                 offset=offset,
6741             )
6742         if not lenindef:
6743             v, tail = v[:l], v[l:]
6744         vlen = 0
6745         sub_offset = offset + tlen + llen
6746         values = {}
6747         ber_encoded = False
6748         ctx_allow_default_values = ctx.get("allow_default_values", False)
6749         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6750         tag_order_prev = (0, 0)
6751         _specs_items = copy(self.specs)
6752
6753         while len(v) > 0:
6754             if lenindef and v[:EOC_LEN].tobytes() == EOC:
6755                 break
6756             for name, spec in iteritems(_specs_items):
6757                 sub_decode_path = decode_path + (name,)
6758                 try:
6759                     spec.decode(
6760                         v,
6761                         sub_offset,
6762                         leavemm=True,
6763                         decode_path=sub_decode_path,
6764                         ctx=ctx,
6765                         tag_only=True,
6766                         _ctx_immutable=False,
6767                     )
6768                 except TagMismatch:
6769                     continue
6770                 break
6771             else:
6772                 raise TagMismatch(
6773                     klass=self.__class__,
6774                     decode_path=decode_path,
6775                     offset=offset,
6776                 )
6777             spec_defaulted = spec.default is not None
6778             if evgen_mode and not spec_defaulted:
6779                 for _decode_path, value, v_tail in spec.decode_evgen(
6780                         v,
6781                         sub_offset,
6782                         leavemm=True,
6783                         decode_path=sub_decode_path,
6784                         ctx=ctx,
6785                         _ctx_immutable=False,
6786                 ):
6787                     yield _decode_path, value, v_tail
6788             else:
6789                 _, value, v_tail = next(spec.decode_evgen(
6790                     v,
6791                     sub_offset,
6792                     leavemm=True,
6793                     decode_path=sub_decode_path,
6794                     ctx=ctx,
6795                     _ctx_immutable=False,
6796                     _evgen_mode=False,
6797                 ))
6798             value_tag_order = value.tag_order
6799             value_len = value.fulllen
6800             if tag_order_prev >= value_tag_order:
6801                 if ctx_bered or ctx_allow_unordered_set:
6802                     ber_encoded = True
6803                 else:
6804                     raise DecodeError(
6805                         "unordered " + self.asn1_type_name,
6806                         klass=self.__class__,
6807                         decode_path=sub_decode_path,
6808                         offset=sub_offset,
6809                     )
6810             if spec_defaulted:
6811                 if evgen_mode:
6812                     yield sub_decode_path, value, v_tail
6813                 if value != spec.default:
6814                     pass
6815                 elif ctx_bered or ctx_allow_default_values:
6816                     ber_encoded = True
6817                 else:
6818                     raise DecodeError(
6819                         "DEFAULT value met",
6820                         klass=self.__class__,
6821                         decode_path=sub_decode_path,
6822                         offset=sub_offset,
6823                     )
6824             values[name] = value
6825             del _specs_items[name]
6826             tag_order_prev = value_tag_order
6827             sub_offset += value_len
6828             vlen += value_len
6829             v = v_tail
6830
6831         obj = self.__class__(
6832             schema=self.specs,
6833             impl=self.tag,
6834             expl=self._expl,
6835             default=self.default,
6836             optional=self.optional,
6837             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6838         )
6839         if lenindef:
6840             if v[:EOC_LEN].tobytes() != EOC:
6841                 raise DecodeError(
6842                     "no EOC",
6843                     klass=self.__class__,
6844                     decode_path=decode_path,
6845                     offset=offset,
6846                 )
6847             tail = v[EOC_LEN:]
6848             obj.lenindef = True
6849         for name, spec in iteritems(self.specs):
6850             if name not in values and not spec.optional:
6851                 raise DecodeError(
6852                     "%s value is not ready" % name,
6853                     klass=self.__class__,
6854                     decode_path=decode_path,
6855                     offset=offset,
6856                 )
6857         if not evgen_mode:
6858             obj._value = values
6859         obj.ber_encoded = ber_encoded
6860         yield decode_path, obj, tail
6861
6862
6863 SequenceOfState = namedtuple(
6864     "SequenceOfState",
6865     BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6866     **NAMEDTUPLE_KWARGS
6867 )
6868
6869
6870 class SequenceOf(SequenceEncode1stMixing, Obj):
6871     """``SEQUENCE OF`` sequence type
6872
6873     For that kind of type you must specify the object it will carry on
6874     (bounds are for example here, not required)::
6875
6876         class Ints(SequenceOf):
6877             schema = Integer()
6878             bounds = (0, 2)
6879
6880     >>> ints = Ints()
6881     >>> ints.append(Integer(123))
6882     >>> ints.append(Integer(234))
6883     >>> ints
6884     Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6885     >>> [int(i) for i in ints]
6886     [123, 234]
6887     >>> ints.append(Integer(345))
6888     Traceback (most recent call last):
6889     pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6890     >>> ints[1]
6891     INTEGER 234
6892     >>> ints[1] = Integer(345)
6893     >>> ints
6894     Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6895
6896     You can initialize sequence with preinitialized values:
6897
6898     >>> ints = Ints([Integer(123), Integer(234)])
6899
6900     Also you can use iterator as a value:
6901
6902     >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6903
6904     And it won't be iterated until encoding process. Pay attention that
6905     bounds and required schema checks are done only during the encoding
6906     process in that case! After encode was called, then value is zeroed
6907     back to empty list and you have to set it again. That mode is useful
6908     mainly with CER encoding mode, where all objects from the iterable
6909     will be streamed to the buffer, without copying all of them to
6910     memory first.
6911     """
6912     __slots__ = ("spec", "_bound_min", "_bound_max")
6913     tag_default = tag_encode(form=TagFormConstructed, num=16)
6914     asn1_type_name = "SEQUENCE OF"
6915
6916     def __init__(
6917             self,
6918             value=None,
6919             schema=None,
6920             bounds=None,
6921             impl=None,
6922             expl=None,
6923             default=None,
6924             optional=False,
6925             _decoded=(0, 0, 0),
6926     ):
6927         super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6928         if schema is None:
6929             schema = getattr(self, "schema", None)
6930         if schema is None:
6931             raise ValueError("schema must be specified")
6932         self.spec = schema
6933         self._bound_min, self._bound_max = getattr(
6934             self,
6935             "bounds",
6936             (0, float("+inf")),
6937         ) if bounds is None else bounds
6938         self._value = []
6939         if value is not None:
6940             self._value = self._value_sanitize(value)
6941         if default is not None:
6942             default_value = self._value_sanitize(default)
6943             default_obj = self.__class__(
6944                 schema=schema,
6945                 impl=self.tag,
6946                 expl=self._expl,
6947             )
6948             default_obj._value = default_value
6949             self.default = default_obj
6950             if value is None:
6951                 self._value = copy(default_obj._value)
6952
6953     def _value_sanitize(self, value):
6954         iterator = False
6955         if issubclass(value.__class__, SequenceOf):
6956             value = value._value
6957         elif hasattr(value, NEXT_ATTR_NAME):
6958             iterator = True
6959         elif hasattr(value, "__iter__"):
6960             value = list(value)
6961         else:
6962             raise InvalidValueType((self.__class__, iter, "iterator"))
6963         if not iterator:
6964             if not self._bound_min <= len(value) <= self._bound_max:
6965                 raise BoundsError(self._bound_min, len(value), self._bound_max)
6966             class_expected = self.spec.__class__
6967             for v in value:
6968                 if not isinstance(v, class_expected):
6969                     raise InvalidValueType((class_expected,))
6970         return value
6971
6972     @property
6973     def ready(self):
6974         if hasattr(self._value, NEXT_ATTR_NAME):
6975             return True
6976         if self._bound_min > 0 and len(self._value) == 0:
6977             return False
6978         return all(v.ready for v in self._value)
6979
6980     @property
6981     def bered(self):
6982         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6983             return True
6984         return any(v.bered for v in self._value)
6985
6986     def __getstate__(self):
6987         if hasattr(self._value, NEXT_ATTR_NAME):
6988             raise ValueError("can not pickle SequenceOf with iterator")
6989         return SequenceOfState(
6990             __version__,
6991             self.tag,
6992             self._tag_order,
6993             self._expl,
6994             self.default,
6995             self.optional,
6996             self.offset,
6997             self.llen,
6998             self.vlen,
6999             self.expl_lenindef,
7000             self.lenindef,
7001             self.ber_encoded,
7002             self.spec,
7003             [copy(v) for v in self._value],
7004             self._bound_min,
7005             self._bound_max,
7006         )
7007
7008     def __setstate__(self, state):
7009         super(SequenceOf, self).__setstate__(state)
7010         self.spec = state.spec
7011         self._value = state.value
7012         self._bound_min = state.bound_min
7013         self._bound_max = state.bound_max
7014
7015     def __eq__(self, their):
7016         if isinstance(their, self.__class__):
7017             return (
7018                 self.spec == their.spec and
7019                 self.tag == their.tag and
7020                 self._expl == their._expl and
7021                 self._value == their._value
7022             )
7023         if hasattr(their, "__iter__"):
7024             return self._value == list(their)
7025         return False
7026
7027     def __call__(
7028             self,
7029             value=None,
7030             bounds=None,
7031             impl=None,
7032             expl=None,
7033             default=None,
7034             optional=None,
7035     ):
7036         return self.__class__(
7037             value=value,
7038             schema=self.spec,
7039             bounds=(
7040                 (self._bound_min, self._bound_max)
7041                 if bounds is None else bounds
7042             ),
7043             impl=self.tag if impl is None else impl,
7044             expl=self._expl if expl is None else expl,
7045             default=self.default if default is None else default,
7046             optional=self.optional if optional is None else optional,
7047         )
7048
7049     def __contains__(self, key):
7050         return key in self._value
7051
7052     def append(self, value):
7053         if not isinstance(value, self.spec.__class__):
7054             raise InvalidValueType((self.spec.__class__,))
7055         if len(self._value) + 1 > self._bound_max:
7056             raise BoundsError(
7057                 self._bound_min,
7058                 len(self._value) + 1,
7059                 self._bound_max,
7060             )
7061         self._value.append(value)
7062
7063     def __iter__(self):
7064         return iter(self._value)
7065
7066     def __len__(self):
7067         return len(self._value)
7068
7069     def __setitem__(self, key, value):
7070         if not isinstance(value, self.spec.__class__):
7071             raise InvalidValueType((self.spec.__class__,))
7072         self._value[key] = self.spec(value=value)
7073
7074     def __getitem__(self, key):
7075         return self._value[key]
7076
7077     def _values_for_encoding(self):
7078         return iter(self._value)
7079
7080     def _encode(self):
7081         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7082         if iterator:
7083             values = []
7084             values_append = values.append
7085             class_expected = self.spec.__class__
7086             values_for_encoding = self._values_for_encoding()
7087             self._value = []
7088             for v in values_for_encoding:
7089                 if not isinstance(v, class_expected):
7090                     raise InvalidValueType((class_expected,))
7091                 values_append(v.encode())
7092             if not self._bound_min <= len(values) <= self._bound_max:
7093                 raise BoundsError(self._bound_min, len(values), self._bound_max)
7094             value = b"".join(values)
7095         else:
7096             value = b"".join(v.encode() for v in self._values_for_encoding())
7097         return b"".join((self.tag, len_encode(len(value)), value))
7098
7099     def _encode1st(self, state):
7100         state = super(SequenceOf, self)._encode1st(state)
7101         if hasattr(self._value, NEXT_ATTR_NAME):
7102             self._value = []
7103         return state
7104
7105     def _encode2nd(self, writer, state_iter):
7106         write_full(writer, self.tag + len_encode(next(state_iter)))
7107         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7108         if iterator:
7109             values_count = 0
7110             class_expected = self.spec.__class__
7111             values_for_encoding = self._values_for_encoding()
7112             self._value = []
7113             for v in values_for_encoding:
7114                 if not isinstance(v, class_expected):
7115                     raise InvalidValueType((class_expected,))
7116                 v.encode2nd(writer, state_iter)
7117                 values_count += 1
7118             if not self._bound_min <= values_count <= self._bound_max:
7119                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7120         else:
7121             for v in self._values_for_encoding():
7122                 v.encode2nd(writer, state_iter)
7123
7124     def _encode_cer(self, writer):
7125         write_full(writer, self.tag + LENINDEF)
7126         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7127         if iterator:
7128             class_expected = self.spec.__class__
7129             values_count = 0
7130             values_for_encoding = self._values_for_encoding()
7131             self._value = []
7132             for v in values_for_encoding:
7133                 if not isinstance(v, class_expected):
7134                     raise InvalidValueType((class_expected,))
7135                 v.encode_cer(writer)
7136                 values_count += 1
7137             if not self._bound_min <= values_count <= self._bound_max:
7138                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7139         else:
7140             for v in self._values_for_encoding():
7141                 v.encode_cer(writer)
7142         write_full(writer, EOC)
7143
7144     def _decode(
7145             self,
7146             tlv,
7147             offset,
7148             decode_path,
7149             ctx,
7150             tag_only,
7151             evgen_mode,
7152             ordering_check=False,
7153     ):
7154         try:
7155             t, tlen, lv = tag_strip(tlv)
7156         except DecodeError as err:
7157             raise err.__class__(
7158                 msg=err.msg,
7159                 klass=self.__class__,
7160                 decode_path=decode_path,
7161                 offset=offset,
7162             )
7163         if t != self.tag:
7164             raise TagMismatch(
7165                 klass=self.__class__,
7166                 decode_path=decode_path,
7167                 offset=offset,
7168             )
7169         if tag_only:
7170             yield None
7171             return
7172         lenindef = False
7173         ctx_bered = ctx.get("bered", False)
7174         try:
7175             l, llen, v = len_decode(lv)
7176         except LenIndefForm as err:
7177             if not ctx_bered:
7178                 raise err.__class__(
7179                     msg=err.msg,
7180                     klass=self.__class__,
7181                     decode_path=decode_path,
7182                     offset=offset,
7183                 )
7184             l, llen, v = 0, 1, lv[1:]
7185             lenindef = True
7186         except DecodeError as err:
7187             raise err.__class__(
7188                 msg=err.msg,
7189                 klass=self.__class__,
7190                 decode_path=decode_path,
7191                 offset=offset,
7192             )
7193         if l > len(v):
7194             raise NotEnoughData(
7195                 "encoded length is longer than data",
7196                 klass=self.__class__,
7197                 decode_path=decode_path,
7198                 offset=offset,
7199             )
7200         if not lenindef:
7201             v, tail = v[:l], v[l:]
7202         vlen = 0
7203         sub_offset = offset + tlen + llen
7204         _value = []
7205         _value_count = 0
7206         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7207         value_prev = memoryview(v[:0])
7208         ber_encoded = False
7209         spec = self.spec
7210         while len(v) > 0:
7211             if lenindef and v[:EOC_LEN].tobytes() == EOC:
7212                 break
7213             sub_decode_path = decode_path + (str(_value_count),)
7214             if evgen_mode:
7215                 for _decode_path, value, v_tail in spec.decode_evgen(
7216                         v,
7217                         sub_offset,
7218                         leavemm=True,
7219                         decode_path=sub_decode_path,
7220                         ctx=ctx,
7221                         _ctx_immutable=False,
7222                 ):
7223                     yield _decode_path, value, v_tail
7224             else:
7225                 _, value, v_tail = next(spec.decode_evgen(
7226                     v,
7227                     sub_offset,
7228                     leavemm=True,
7229                     decode_path=sub_decode_path,
7230                     ctx=ctx,
7231                     _ctx_immutable=False,
7232                     _evgen_mode=False,
7233                 ))
7234             value_len = value.fulllen
7235             if ordering_check:
7236                 if value_prev.tobytes() > v[:value_len].tobytes():
7237                     if ctx_bered or ctx_allow_unordered_set:
7238                         ber_encoded = True
7239                     else:
7240                         raise DecodeError(
7241                             "unordered " + self.asn1_type_name,
7242                             klass=self.__class__,
7243                             decode_path=sub_decode_path,
7244                             offset=sub_offset,
7245                         )
7246                 value_prev = v[:value_len]
7247             _value_count += 1
7248             if not evgen_mode:
7249                 _value.append(value)
7250             sub_offset += value_len
7251             vlen += value_len
7252             v = v_tail
7253         if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7254             raise DecodeError(
7255                 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7256                 klass=self.__class__,
7257                 decode_path=decode_path,
7258                 offset=offset,
7259             )
7260         try:
7261             obj = self.__class__(
7262                 value=None if evgen_mode else _value,
7263                 schema=spec,
7264                 bounds=(self._bound_min, self._bound_max),
7265                 impl=self.tag,
7266                 expl=self._expl,
7267                 default=self.default,
7268                 optional=self.optional,
7269                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7270             )
7271         except BoundsError as err:
7272             raise DecodeError(
7273                 msg=str(err),
7274                 klass=self.__class__,
7275                 decode_path=decode_path,
7276                 offset=offset,
7277             )
7278         if lenindef:
7279             if v[:EOC_LEN].tobytes() != EOC:
7280                 raise DecodeError(
7281                     "no EOC",
7282                     klass=self.__class__,
7283                     decode_path=decode_path,
7284                     offset=offset,
7285                 )
7286             obj.lenindef = True
7287             tail = v[EOC_LEN:]
7288         obj.ber_encoded = ber_encoded
7289         yield decode_path, obj, tail
7290
7291     def __repr__(self):
7292         return "%s[%s]" % (
7293             pp_console_row(next(self.pps())),
7294             ", ".join(repr(v) for v in self._value),
7295         )
7296
7297     def pps(self, decode_path=()):
7298         yield _pp(
7299             obj=self,
7300             asn1_type_name=self.asn1_type_name,
7301             obj_name=self.__class__.__name__,
7302             decode_path=decode_path,
7303             optional=self.optional,
7304             default=self == self.default,
7305             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7306             expl=None if self._expl is None else tag_decode(self._expl),
7307             offset=self.offset,
7308             tlen=self.tlen,
7309             llen=self.llen,
7310             vlen=self.vlen,
7311             expl_offset=self.expl_offset if self.expled else None,
7312             expl_tlen=self.expl_tlen if self.expled else None,
7313             expl_llen=self.expl_llen if self.expled else None,
7314             expl_vlen=self.expl_vlen if self.expled else None,
7315             expl_lenindef=self.expl_lenindef,
7316             lenindef=self.lenindef,
7317             ber_encoded=self.ber_encoded,
7318             bered=self.bered,
7319         )
7320         for i, value in enumerate(self._value):
7321             yield value.pps(decode_path=decode_path + (str(i),))
7322         for pp in self.pps_lenindef(decode_path):
7323             yield pp
7324
7325
7326 class SetOf(SequenceOf):
7327     """``SET OF`` sequence type
7328
7329     Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7330     """
7331     __slots__ = ()
7332     tag_default = tag_encode(form=TagFormConstructed, num=17)
7333     asn1_type_name = "SET OF"
7334
7335     def _value_sanitize(self, value):
7336         value = super(SetOf, self)._value_sanitize(value)
7337         if hasattr(value, NEXT_ATTR_NAME):
7338             raise ValueError(
7339                 "SetOf does not support iterator values, as no sense in them"
7340             )
7341         return value
7342
7343     def _encode(self):
7344         v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7345         return b"".join((self.tag, len_encode(len(v)), v))
7346
7347     def _encode2nd(self, writer, state_iter):
7348         write_full(writer, self.tag + len_encode(next(state_iter)))
7349         values = []
7350         for v in self._values_for_encoding():
7351             buf = BytesIO()
7352             v.encode2nd(buf.write, state_iter)
7353             values.append(buf.getvalue())
7354         values.sort()
7355         for v in values:
7356             write_full(writer, v)
7357
7358     def _encode_cer(self, writer):
7359         write_full(writer, self.tag + LENINDEF)
7360         for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7361             write_full(writer, v)
7362         write_full(writer, EOC)
7363
7364     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7365         return super(SetOf, self)._decode(
7366             tlv,
7367             offset,
7368             decode_path,
7369             ctx,
7370             tag_only,
7371             evgen_mode,
7372             ordering_check=True,
7373         )
7374
7375
7376 def obj_by_path(pypath):  # pragma: no cover
7377     """Import object specified as string Python path
7378
7379     Modules must be separated from classes/functions with ``:``.
7380
7381     >>> obj_by_path("foo.bar:Baz")
7382     <class 'foo.bar.Baz'>
7383     >>> obj_by_path("foo.bar:Baz.boo")
7384     <classmethod 'foo.bar.Baz.boo'>
7385     """
7386     mod, objs = pypath.rsplit(":", 1)
7387     from importlib import import_module
7388     obj = import_module(mod)
7389     for obj_name in objs.split("."):
7390         obj = getattr(obj, obj_name)
7391     return obj
7392
7393
7394 def generic_decoder():  # pragma: no cover
7395     # All of this below is a big hack with self references
7396     choice = PrimitiveTypes()
7397     choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7398     choice.specs["SetOf"] = SetOf(schema=choice)
7399     for i in six_xrange(31):
7400         choice.specs["SequenceOf%d" % i] = SequenceOf(
7401             schema=choice,
7402             expl=tag_ctxc(i),
7403         )
7404     choice.specs["Any"] = Any()
7405
7406     # Class name equals to type name, to omit it from output
7407     class SEQUENCEOF(SequenceOf):
7408         __slots__ = ()
7409         schema = choice
7410
7411     def pprint_any(
7412             obj,
7413             oid_maps=(),
7414             with_colours=False,
7415             with_decode_path=False,
7416             decode_path_only=(),
7417             decode_path=(),
7418     ):
7419         def _pprint_pps(pps):
7420             for pp in pps:
7421                 if hasattr(pp, "_fields"):
7422                     if (
7423                             decode_path_only != () and
7424                             pp.decode_path[:len(decode_path_only)] != decode_path_only
7425                     ):
7426                         continue
7427                     if pp.asn1_type_name == Choice.asn1_type_name:
7428                         continue
7429                     pp_kwargs = pp._asdict()
7430                     pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7431                     pp = _pp(**pp_kwargs)
7432                     yield pp_console_row(
7433                         pp,
7434                         oid_maps=oid_maps,
7435                         with_offsets=True,
7436                         with_blob=False,
7437                         with_colours=with_colours,
7438                         with_decode_path=with_decode_path,
7439                         decode_path_len_decrease=len(decode_path_only),
7440                     )
7441                     for row in pp_console_blob(
7442                             pp,
7443                             decode_path_len_decrease=len(decode_path_only),
7444                     ):
7445                         yield row
7446                 else:
7447                     for row in _pprint_pps(pp):
7448                         yield row
7449         return "\n".join(_pprint_pps(obj.pps(decode_path)))
7450     return SEQUENCEOF(), pprint_any
7451
7452
7453 def ascii_visualize(ba):
7454     """Output only ASCII printable characters, like in hexdump -C
7455
7456     Example output for given binary string (right part)::
7457
7458         92 2b 39 20 65 91 e6 8e  95 93 1a 58 df 02 78 ea  |.+9 e......X..x.|
7459                                                            ^^^^^^^^^^^^^^^^
7460     """
7461     return "".join((six_unichr(b) if 0x20 <= b <= 0x7E else ".") for b in ba)
7462
7463
7464 def hexdump(raw):
7465     """Generate ``hexdump -C`` like output
7466
7467     Rendered example::
7468
7469         00000000  30 80 30 80 a0 80 02 01  02 00 00 02 14 54 a5 18  |0.0..........T..|
7470         00000010  69 ef 8b 3f 15 fd ea ad  bd 47 e0 94 81 6b 06 6a  |i..?.....G...k.j|
7471
7472     Result of that function is a generator of lines, where each line is
7473     a list of columns::
7474
7475         [
7476             [...],
7477             ["00000010 ", " 69", " ef", " 8b", " 3f", " 15", " fd", " ea", " ad ",
7478                           " bd", " 47", " e0", " 94", " 81", " 6b", " 06", " 6a ",
7479                           " |i..?.....G...k.j|"]
7480             [...],
7481         ]
7482     """
7483     hexed = hexenc(raw).upper()
7484     addr, cols = 0, ["%08x " % 0]
7485     for i in six_xrange(0, len(hexed), 2):
7486         if i != 0 and i // 2 % 8 == 0:
7487             cols[-1] += " "
7488         if i != 0 and i // 2 % 16 == 0:
7489             cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:addr + 16])))
7490             yield cols
7491             addr += 16
7492             cols = ["%08x " % addr]
7493         cols.append(" " + hexed[i:i + 2])
7494     if len(cols) > 0:
7495         cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:])))
7496         yield cols
7497
7498
7499 def browse(raw, obj, oid_maps=()):
7500     """Interactive browser
7501
7502     :param bytes raw: binary data you decoded
7503     :param obj: decoded :py:class:`pyderasn.Obj`
7504     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
7505                      Its human readable form is printed when OID is met
7506
7507     .. note:: `urwid <http://urwid.org/>`__ dependency required
7508
7509     This browser is an interactive terminal application for browsing
7510     structures of your decoded ASN.1 objects. You can quit it with **q**
7511     key. It consists of three windows:
7512
7513     :tree:
7514      View of ASN.1 elements hierarchy. You can navigate it using **Up**,
7515      **Down**, **PageUp**, **PageDown**, **Home**, **End** keys.
7516      **Left** key goes to constructed element above. **Plus**/**Minus**
7517      keys collapse/uncollapse constructed elements. **Space** toggles it
7518     :info:
7519      window with various information about element. You can scroll it
7520      with **h**/**l** (down, up) (**H**/**L** for triple speed) keys
7521     :hexdump:
7522      window with raw data hexdump and highlighted current element's
7523      contents. It automatically focuses on element's data. You can
7524      scroll it with **j**/**k** (down, up) (**J**/**K** for triple
7525      speed) keys. If element has explicit tag, then it also will be
7526      highlighted with different colour
7527
7528     Window's header contains current decode path and progress bars with
7529     position in *info* and *hexdump* windows.
7530
7531     If you press **d**, then current element will be saved in the
7532     current directory under its decode path name (adding ".0", ".1", etc
7533     suffix if such file already exists). **D** will save it with explicit tag.
7534
7535     You can also invoke it with ``--browse`` command line argument.
7536     """
7537     from copy import deepcopy
7538     from os.path import exists as path_exists
7539     import urwid
7540
7541     class TW(urwid.TreeWidget):
7542         def __init__(self, state, *args, **kwargs):
7543             self.state = state
7544             self.scrolled = {"info": False, "hexdump": False}
7545             super(TW, self).__init__(*args, **kwargs)
7546
7547         def _get_pp(self):
7548             pp = self.get_node().get_value()
7549             constructed = len(pp) > 1
7550             return (pp if hasattr(pp, "_fields") else pp[0]), constructed
7551
7552         def _state_update(self):
7553             pp, _ = self._get_pp()
7554             self.state["decode_path"].set_text(
7555                 ":".join(str(p) for p in pp.decode_path)
7556             )
7557             lines = deepcopy(self.state["hexed"])
7558
7559             def attr_set(i, attr):
7560                 line = lines[i // 16]
7561                 idx = 1 + (i - 16 * (i // 16))
7562                 line[idx] = (attr, line[idx])
7563
7564             if pp.expl_offset is not None:
7565                 for i in six_xrange(
7566                         pp.expl_offset,
7567                         pp.expl_offset + pp.expl_tlen + pp.expl_llen,
7568                 ):
7569                     attr_set(i, "select-expl")
7570             for i in six_xrange(pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen):
7571                 attr_set(i, "select-value")
7572             self.state["hexdump"]._set_body([urwid.Text(line) for line in lines])
7573             self.state["hexdump"].set_focus(pp.offset // 16)
7574             self.state["hexdump"].set_focus_valign("middle")
7575             self.state["hexdump_bar"].set_completion(
7576                 (100 * pp.offset // 16) //
7577                 len(self.state["hexdump"]._body.positions())
7578             )
7579
7580             lines = [
7581                 [("header", "Name: "), pp.obj_name],
7582                 [("header", "Type: "), pp.asn1_type_name],
7583                 [("header", "Offset: "), "%d (0x%x)" % (pp.offset, pp.offset)],
7584                 [("header", "[TLV]len: "), "%d/%d/%d" % (
7585                     pp.tlen, pp.llen, pp.vlen,
7586                 )],
7587                 [("header", "TLVlen: "), "%d" % sum((
7588                     pp.tlen, pp.llen, pp.vlen,
7589                 ))],
7590                 [("header", "Slice: "), "[%d:%d]" % (
7591                     pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen,
7592                 )],
7593             ]
7594             if pp.lenindef:
7595                 lines.append([("warning", "LENINDEF")])
7596             if pp.ber_encoded:
7597                 lines.append([("warning", "BER encoded")])
7598             if pp.bered:
7599                 lines.append([("warning", "BERed")])
7600             if pp.expl is not None:
7601                 lines.append([("header", "EXPLICIT")])
7602                 klass, _, num = pp.expl
7603                 lines.append(["  Tag: %s%d" % (TagClassReprs[klass], num)])
7604                 if pp.expl_offset is not None:
7605                     lines.append(["  Offset: %d" % pp.expl_offset])
7606                     lines.append(["  [TLV]len: %d/%d/%d" % (
7607                         pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7608                     )])
7609                     lines.append(["  TLVlen: %d" % sum((
7610                         pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7611                     ))])
7612                     lines.append(["  Slice: [%d:%d]" % (
7613                         pp.expl_offset,
7614                         pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen,
7615                     )])
7616             if pp.impl is not None:
7617                 klass, _, num = pp.impl
7618                 lines.append([
7619                     ("header", "IMPLICIT: "), "%s%d" % (TagClassReprs[klass], num),
7620                 ])
7621             if pp.optional:
7622                 lines.append(["OPTIONAL"])
7623             if pp.default:
7624                 lines.append(["DEFAULT"])
7625             if len(pp.decode_path) > 0:
7626                 ent = pp.decode_path[-1]
7627                 if isinstance(ent, DecodePathDefBy):
7628                     lines.append([""])
7629                     value = str(ent.defined_by)
7630                     oid_name = find_oid_name(
7631                         ent.defined_by.asn1_type_name, oid_maps, value,
7632                     )
7633                     lines.append([("header", "DEFINED BY: "), "%s" % (
7634                         value if oid_name is None
7635                         else "%s (%s)" % (oid_name, value)
7636                     )])
7637             lines.append([""])
7638             if pp.value is not None:
7639                 lines.append([("header", "Value: "), pp.value])
7640                 if (
7641                         len(oid_maps) > 0 and
7642                         pp.asn1_type_name == ObjectIdentifier.asn1_type_name
7643                 ):
7644                     for oid_map in oid_maps:
7645                         oid_name = oid_map.get(pp.value)
7646                         if oid_name is not None:
7647                             lines.append([("header", "Human: "), oid_name])
7648                             break
7649                 if pp.asn1_type_name == Integer.asn1_type_name:
7650                     lines.append([
7651                         ("header", "Decimal: "), "%d" % int(pp.obj),
7652                     ])
7653                     lines.append([
7654                         ("header", "Hexadecimal: "), colonize_hex(pp.obj.tohex()),
7655                     ])
7656             if pp.blob.__class__ == binary_type:
7657                 blob = hexenc(pp.blob).upper()
7658                 for i in six_xrange(0, len(blob), 32):
7659                     lines.append([colonize_hex(blob[i:i + 32])])
7660             elif pp.blob.__class__ == tuple:
7661                 lines.append([", ".join(pp.blob)])
7662             self.state["info"]._set_body([urwid.Text(line) for line in lines])
7663             self.state["info_bar"].set_completion(0)
7664
7665         def selectable(self):
7666             if self.state["widget_current"] != self:
7667                 self.state["widget_current"] = self
7668                 self.scrolled["info"] = False
7669                 self.scrolled["hexdump"] = False
7670                 self._state_update()
7671             return super(TW, self).selectable()
7672
7673         def get_display_text(self):
7674             pp, constructed = self._get_pp()
7675             style = "constructed" if constructed else ""
7676             if len(pp.decode_path) == 0:
7677                 return (style, pp.obj_name)
7678             if pp.asn1_type_name == "EOC":
7679                 return ("eoc", "EOC")
7680             ent = pp.decode_path[-1]
7681             if isinstance(ent, DecodePathDefBy):
7682                 value = str(ent.defined_by)
7683                 oid_name = find_oid_name(
7684                     ent.defined_by.asn1_type_name, oid_maps, value,
7685                 )
7686                 return ("defby", "DEFBY:" + (
7687                     value if oid_name is None else oid_name
7688                 ))
7689             return (style, ent)
7690
7691         def _scroll(self, what, step):
7692             self.state[what]._invalidate()
7693             pos = self.state[what].focus_position
7694             if not self.scrolled[what]:
7695                 self.scrolled[what] = True
7696                 pos -= 2
7697             pos = max(0, pos + step)
7698             pos = min(pos, len(self.state[what]._body.positions()) - 1)
7699             self.state[what].set_focus(pos)
7700             self.state[what].set_focus_valign("top")
7701             self.state[what + "_bar"].set_completion(
7702                 (100 * pos) // len(self.state[what]._body.positions())
7703             )
7704
7705         def keypress(self, size, key):
7706             if key == "q":
7707                 raise urwid.ExitMainLoop()
7708
7709             if key == " ":
7710                 self.expanded = not self.expanded
7711                 self.update_expanded_icon()
7712                 return None
7713
7714             hexdump_steps = {"j": 1, "k": -1, "J": 5, "K": -5}
7715             if key in hexdump_steps:
7716                 self._scroll("hexdump", hexdump_steps[key])
7717                 return None
7718
7719             info_steps = {"h": 1, "l": -1, "H": 5, "L": -5}
7720             if key in info_steps:
7721                 self._scroll("info", info_steps[key])
7722                 return None
7723
7724             if key in ("d", "D"):
7725                 pp, _ = self._get_pp()
7726                 dp = ":".join(str(p) for p in pp.decode_path)
7727                 dp = dp.replace(" ", "_")
7728                 if dp == "":
7729                     dp = "root"
7730                 if key == "d" or pp.expl_offset is None:
7731                     data = self.state["raw"][pp.offset:(
7732                         pp.offset + pp.tlen + pp.llen + pp.vlen
7733                     )]
7734                 else:
7735                     data = self.state["raw"][pp.expl_offset:(
7736                         pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen
7737                     )]
7738                 ctr = 0
7739
7740                 def duplicate_path(dp, ctr):
7741                     if ctr == 0:
7742                         return dp
7743                     return "%s.%d" % (dp, ctr)
7744
7745                 while True:
7746                     if not path_exists(duplicate_path(dp, ctr)):
7747                         break
7748                     ctr += 1
7749                 dp = duplicate_path(dp, ctr)
7750                 with open(dp, "wb") as fd:
7751                     fd.write(data)
7752                 self.state["decode_path"].set_text(
7753                     ("warning", "Saved to: " + dp)
7754                 )
7755                 return None
7756             return super(TW, self).keypress(size, key)
7757
7758     class PN(urwid.ParentNode):
7759         def __init__(self, state, value, *args, **kwargs):
7760             self.state = state
7761             if not hasattr(value, "_fields"):
7762                 value = list(value)
7763             super(PN, self).__init__(value, *args, **kwargs)
7764
7765         def load_widget(self):
7766             return TW(self.state, self)
7767
7768         def load_child_keys(self):
7769             value = self.get_value()
7770             if hasattr(value, "_fields"):
7771                 return []
7772             return range(len(value[1:]))
7773
7774         def load_child_node(self, key):
7775             return PN(
7776                 self.state,
7777                 self.get_value()[key + 1],
7778                 parent=self,
7779                 key=key,
7780                 depth=self.get_depth() + 1,
7781             )
7782
7783     class LabeledPG(urwid.ProgressBar):
7784         def __init__(self, label, *args, **kwargs):
7785             self.label = label
7786             super(LabeledPG, self).__init__(*args, **kwargs)
7787
7788         def get_text(self):
7789             return "%s: %s" % (self.label, super(LabeledPG, self).get_text())
7790
7791     WinHexdump = urwid.ListBox([urwid.Text("")])
7792     WinInfo = urwid.ListBox([urwid.Text("")])
7793     WinDecodePath = urwid.Text("", "center")
7794     WinInfoBar = LabeledPG("info", "pg-normal", "pg-complete")
7795     WinHexdumpBar = LabeledPG("hexdump", "pg-normal", "pg-complete")
7796     WinTree = urwid.TreeListBox(urwid.TreeWalker(PN(
7797         {
7798             "raw": raw,
7799             "hexed": list(hexdump(raw)),
7800             "widget_current": None,
7801             "info": WinInfo,
7802             "info_bar": WinInfoBar,
7803             "hexdump": WinHexdump,
7804             "hexdump_bar": WinHexdumpBar,
7805             "decode_path": WinDecodePath,
7806         },
7807         list(obj.pps()),
7808     )))
7809     help_text = " ".join((
7810         "q:quit",
7811         "space:(un)collapse",
7812         "(pg)up/down/home/end:nav",
7813         "jkJK:hexdump hlHL:info",
7814         "dD:dump",
7815     ))
7816     urwid.MainLoop(
7817         urwid.Frame(
7818             urwid.Columns([
7819                 ("weight", 1, WinTree),
7820                 ("weight", 2, urwid.Pile([
7821                     urwid.LineBox(WinInfo),
7822                     urwid.LineBox(WinHexdump),
7823                 ])),
7824             ]),
7825             header=urwid.Columns([
7826                 ("weight", 2, urwid.AttrWrap(WinDecodePath, "header")),
7827                 ("weight", 1, WinInfoBar),
7828                 ("weight", 1, WinHexdumpBar),
7829             ]),
7830             footer=urwid.AttrWrap(urwid.Text(help_text), "help")
7831         ),
7832         [
7833             ("header", "bold", ""),
7834             ("constructed", "bold", ""),
7835             ("help", "light magenta", ""),
7836             ("warning", "light red", ""),
7837             ("defby", "light red", ""),
7838             ("eoc", "dark red", ""),
7839             ("select-value", "light green", ""),
7840             ("select-expl", "light red", ""),
7841             ("pg-normal", "", "light blue"),
7842             ("pg-complete", "black", "yellow"),
7843         ],
7844     ).run()
7845
7846
7847 def main():  # pragma: no cover
7848     import argparse
7849     parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7850     parser.add_argument(
7851         "--skip",
7852         type=int,
7853         default=0,
7854         help="Skip that number of bytes from the beginning",
7855     )
7856     parser.add_argument(
7857         "--oids",
7858         help="Python paths to dictionary with OIDs, comma separated",
7859     )
7860     parser.add_argument(
7861         "--schema",
7862         help="Python path to schema definition to use",
7863     )
7864     parser.add_argument(
7865         "--defines-by-path",
7866         help="Python path to decoder's defines_by_path",
7867     )
7868     parser.add_argument(
7869         "--nobered",
7870         action="store_true",
7871         help="Disallow BER encoding",
7872     )
7873     parser.add_argument(
7874         "--print-decode-path",
7875         action="store_true",
7876         help="Print decode paths",
7877     )
7878     parser.add_argument(
7879         "--decode-path-only",
7880         help="Print only specified decode path",
7881     )
7882     parser.add_argument(
7883         "--allow-expl-oob",
7884         action="store_true",
7885         help="Allow explicit tag out-of-bound",
7886     )
7887     parser.add_argument(
7888         "--evgen",
7889         action="store_true",
7890         help="Turn on event generation mode",
7891     )
7892     parser.add_argument(
7893         "--browse",
7894         action="store_true",
7895         help="Start ASN.1 browser",
7896     )
7897     parser.add_argument(
7898         "RAWFile",
7899         type=argparse.FileType("rb"),
7900         help="Path to BER/CER/DER file you want to decode",
7901     )
7902     args = parser.parse_args()
7903     try:
7904         raw = file_mmaped(args.RAWFile)[args.skip:]
7905     except:
7906         args.RAWFile.seek(args.skip)
7907         raw = memoryview(args.RAWFile.read())
7908         args.RAWFile.close()
7909     oid_maps = (
7910         [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7911         if args.oids else ()
7912     )
7913     from functools import partial
7914     if args.schema:
7915         schema = obj_by_path(args.schema)
7916         pprinter = partial(pprint, big_blobs=True)
7917     else:
7918         schema, pprinter = generic_decoder()
7919     ctx = {
7920         "bered": not args.nobered,
7921         "allow_expl_oob": args.allow_expl_oob,
7922     }
7923     if args.defines_by_path is not None:
7924         ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7925     if args.browse:
7926         obj, _ = schema().decode(raw, ctx=ctx)
7927         browse(raw, obj, oid_maps)
7928         from sys import exit as sys_exit
7929         sys_exit(0)
7930     from os import environ
7931     pprinter = partial(
7932         pprinter,
7933         oid_maps=oid_maps,
7934         with_colours=environ.get("NO_COLOR") is None,
7935         with_decode_path=args.print_decode_path,
7936         decode_path_only=(
7937             () if args.decode_path_only is None else
7938             tuple(args.decode_path_only.split(":"))
7939         ),
7940     )
7941     if args.evgen:
7942         for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7943             print(pprinter(obj, decode_path=decode_path))
7944     else:
7945         obj, tail = schema().decode(raw, ctx=ctx)
7946         print(pprinter(obj))
7947     if tail != b"":
7948         print("\nTrailing data: %s" % hexenc(tail))
7949
7950
7951 if __name__ == "__main__":
7952     from pyderasn import *
7953     main()