]> Cypherpunks.ru repositories - pyderasn.git/blob - pyderasn.py
Long tag form must not contain zero byte
[pyderasn.git] / pyderasn.py
1 #!/usr/bin/env python
2 # coding: utf-8
3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU Lesser General Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
21
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal BER/CER/DER ones.
24
25     >>> i = Integer(123)
26     >>> raw = i.encode()
27     >>> Integer().decod(raw) == i
28     True
29
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
62
63 Common for most types
64 ---------------------
65
66 Tags
67 ____
68
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
75 tags simultaneously.
76
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
80 number.
81
82 .. note::
83
84    EXPLICIT tags always have **constructed** tag. PyDERASN does not
85    explicitly check correctness of schema input here.
86
87 .. note::
88
89    Implicit tags have **primitive** (``tag_ctxp``) encoding for
90    primitive values.
91
92 ::
93
94     >>> Integer(impl=tag_ctxp(1))
95     [1] INTEGER
96     >>> Integer(expl=tag_ctxc(2))
97     [2] EXPLICIT INTEGER
98
99 Implicit tag is not explicitly shown.
100
101 Two objects of the same type, but with different implicit/explicit tags
102 are **not** equal.
103
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
106 function::
107
108     >>> tag_decode(tag_ctxc(123))
109     (128, 32, 123)
110     >>> klass, form, num = tag_decode(tag_ctxc(123))
111     >>> klass == TagClassContext
112     True
113     >>> form == TagFormConstructed
114     True
115
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
118
119 Default/optional
120 ________________
121
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
125
126     >>> Integer(optional=True, default=123)
127     INTEGER 123 OPTIONAL DEFAULT
128
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
132 ``version`` field::
133
134     class Version(Integer):
135         schema = (
136             ("v1", 0),
137             ("v2", 1),
138             ("v3", 2),
139         )
140     class TBSCertificate(Sequence):
141         schema = (
142             ("version", Version(expl=tag_ctxc(0), default="v1")),
143         [...]
144
145 When default argument is used and value is not specified, then it equals
146 to default one.
147
148 .. _bounds:
149
150 Size constraints
151 ________________
152
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
156
157     class X(...):
158         bounds = (MIN, MAX)
159
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
161
162 For simplicity you can also set bounds the following way::
163
164     bounded_x = X(bounds=(MIN, MAX))
165
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
167 raised.
168
169 Common methods
170 ______________
171
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
175
176 All objects are friendly to ``copy.copy()`` and copied objects can be
177 safely mutated.
178
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
181
182 .. _decoding:
183
184 Decoding
185 --------
186
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
193
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
196
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
199
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
205
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
208
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
211 * ``expl_tlen``,
212 * ``expl_llen``
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
215   ``offset`` otherwise
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
217   ``tlvlen`` otherwise
218
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
220
221 .. _ctx:
222
223 Context
224 _______
225
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
229
230 Currently available context options:
231
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
238
239 .. _pprinting:
240
241 Pretty printing
242 ---------------
243
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
248
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
252
253     >>> from pyderasn import pprint
254     >>> encoded = Integer(-12345).encode()
255     >>> obj, tail = Integer().decode(encoded)
256     >>> print(pprint(obj))
257         0   [1,1,   2] INTEGER -12345
258
259 .. _pprint_example:
260
261 Example certificate::
262
263     >>> print(pprint(crt))
264         0   [1,3,1604] Certificate SEQUENCE
265         4   [1,3,1453]  . tbsCertificate: TBSCertificate SEQUENCE
266        10-2 [1,1,   1]  . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267        13   [1,1,   3]  . . serialNumber: CertificateSerialNumber INTEGER 61595
268        18   [1,1,  13]  . . signature: AlgorithmIdentifier SEQUENCE
269        20   [1,1,   9]  . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270        31   [0,0,   2]  . . . parameters: [UNIV 5] ANY OPTIONAL
271                         . . . . 05:00
272        33   [0,0, 278]  . . issuer: Name CHOICE rdnSequence
273        33   [1,3, 274]  . . . rdnSequence: RDNSequence SEQUENCE OF
274        37   [1,1,  11]  . . . . 0: RelativeDistinguishedName SET OF
275        39   [1,1,   9]  . . . . . 0: AttributeTypeAndValue SEQUENCE
276        41   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277        46   [0,0,   4]  . . . . . . value: [UNIV 19] AttributeValue ANY
278                         . . . . . . . 13:02:45:53
279     [...]
280      1461   [1,1,  13]  . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281      1463   [1,1,   9]  . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282      1474   [0,0,   2]  . . parameters: [UNIV 5] ANY OPTIONAL
283                         . . . 05:00
284      1476   [1,2, 129]  . signatureValue: BIT STRING 1024 bits
285                         . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286                         . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
287      [...]
288
289     Trailing data: 0a
290
291 Let's parse that output, human::
292
293        10-2 [1,1,   1]    . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294        ^  ^  ^ ^    ^     ^   ^        ^            ^       ^       ^  ^
295        0  1  2 3    4     5   6        7            8       9       10 11
296
297 ::
298
299        20   [1,1,   9]    . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
300        ^     ^ ^    ^     ^     ^          ^                 ^
301        0     2 3    4     5     6          9                 10
302
303 ::
304
305        33   [0,0, 278]    . . issuer: Name CHOICE rdnSequence
306        ^     ^ ^    ^     ^   ^       ^    ^      ^
307        0     2 3    4     5   6       8    9      10
308
309 ::
310
311        52-2∞ B [1,1,1054]∞  . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
312              ^           ^                                 ^   ^            ^
313             12          13                                14   9            10
314
315 :0:
316  Offset of the object, where its DER/BER encoding begins.
317  Pay attention that it does **not** include explicit tag.
318 :1:
319  If explicit tag exists, then this is its length (tag + encoded length).
320 :2:
321  Length of object's tag. For example CHOICE does not have its own tag,
322  so it is zero.
323 :3:
324  Length of encoded length.
325 :4:
326  Length of encoded value.
327 :5:
328  Visual indentation to show the depth of object in the hierarchy.
329 :6:
330  Object's name inside SEQUENCE/CHOICE.
331 :7:
332  If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333  here. "IMPLICIT" is omitted.
334 :8:
335  Object's class name, if set. Omitted if it is just an ordinary simple
336  value (like with ``algorithm`` in example above).
337 :9:
338  Object's ASN.1 type.
339 :10:
340  Object's value, if set. Can consist of multiple words (like OCTET/BIT
341  STRINGs above). We see ``v3`` value in Version, because it is named.
342  ``rdnSequence`` is the choice of CHOICE type.
343 :11:
344  Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345  default one, specified in the schema.
346 :12:
347  Shows does object contains any kind of BER encoded data (possibly
348  Sequence holding BER-encoded underlying value).
349 :13:
350  Only applicable to BER encoded data. Indefinite length encoding mark.
351 :14:
352  Only applicable to BER encoded data. If object has BER-specific
353  encoding, then ``BER`` will be shown. It does not depend on indefinite
354  length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355  (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
356  could be BERed.
357
358 .. _definedby:
359
360 DEFINED BY
361 ----------
362
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
367
368 .. _defines:
369
370 defines kwarg
371 _____________
372
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
376 container::
377
378     class ContentInfo(Sequence):
379         schema = (
380             ("contentType", ContentType(defines=((("content",), {
381                 id_digestedData: DigestedData(),
382                 id_signedData: SignedData(),
383             }),))),
384             ("content", Any(expl=tag_ctxc(0))),
385         )
386
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
391 done.
392
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
398
399         (
400             (("parameters",), {
401                 id_ecPublicKey: ECParameters(),
402                 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
403             }),
404             (("..", "subjectPublicKey"), {
405                 id_rsaEncryption: RSAPublicKey(),
406                 id_GostR3410_2001: OctetString(),
407             }),
408         ),
409
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
412 itself.
413
414 Following types can be automatically decoded (DEFINED BY):
415
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420   ``Any``/``BitString``/``OctetString``-s
421
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
426
427 .. _defines_by_path_ctx:
428
429 defines_by_path context option
430 ______________________________
431
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
435
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
438
439     (decode_path, defines)
440
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
445
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
449 of ``PKIResponse``::
450
451     content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
452         (
453             ("contentType",),
454             ((("content",), {id_signedData: SignedData()}),),
455         ),
456         (
457             (
458                 "content",
459                 DecodePathDefBy(id_signedData),
460                 "encapContentInfo",
461                 "eContentType",
462             ),
463             ((("eContent",), {
464                 id_cct_PKIData: PKIData(),
465                 id_cct_PKIResponse: PKIResponse(),
466             })),
467         ),
468         (
469             (
470                 "content",
471                 DecodePathDefBy(id_signedData),
472                 "encapContentInfo",
473                 "eContent",
474                 DecodePathDefBy(id_cct_PKIResponse),
475                 "controlSequence",
476                 any,
477                 "attrType",
478             ),
479             ((("attrValues",), {
480                 id_cmc_recipientNonce: RecipientNonce(),
481                 id_cmc_senderNonce: SenderNonce(),
482                 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483                 id_cmc_transactionId: TransactionId(),
484             })),
485         ),
486     )})
487
488 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 First function is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meet --
491 useful for SEQUENCE/SET OF-s.
492
493 .. _bered_ctx:
494
495 BER encoding
496 ------------
497
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
502
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504   attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505   STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506   ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508   attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509   ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
510   contain it.
511 * If object has an indefinite length encoded explicit tag, then
512   ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514   indefinite length, object's indefinite length, BER-encoding) or any
515   underlying component has that kind of encoding, then ``bered``
516   attribute is set to True. For example SignedData CMS can have
517   ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518   ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
519
520 EOC (end-of-contents) token's length is taken in advance in object's
521 value length.
522
523 .. _allow_expl_oob_ctx:
524
525 Allow explicit tag out-of-bound
526 -------------------------------
527
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
533
534 .. warning::
535
536    This option should be used only for skipping some decode errors, just
537    to see the decoded structure somehow.
538
539 .. _streaming:
540
541 Streaming and dealing with huge structures
542 ------------------------------------------
543
544 .. _evgen_mode:
545
546 evgen mode
547 __________
548
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
553 structure.
554
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
558
559     class TBSCertListFast(Sequence):
560         schema = (
561             [...]
562             ("revokedCertificates", OctetString(
563                 impl=SequenceOf.tag_default,
564                 optional=True,
565             )),
566             [...]
567         )
568
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
571
572 But how can you get all certificate's serial number from it, after you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
576
577     $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578          10     [1,1,   1]   . . version: Version INTEGER v2 (01) OPTIONAL
579          15     [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580          26     [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL
581          13     [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE
582          34     [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583          39     [0,0,   9]   . . . . . . value: [UNIV 19] AttributeValue ANY
584          32     [1,1,  14]   . . . . . 0: AttributeTypeAndValue SEQUENCE
585          30     [1,1,  16]   . . . . 0: RelativeDistinguishedName SET OF
586     [...]
587         188     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589         191     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
590         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591         186     [1,1,  18]   . . . 0: RevokedCertificate SEQUENCE
592         208     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594         211     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
595         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596         206     [1,1,  18]   . . . 1: RevokedCertificate SEQUENCE
597     [...]
598     9144992     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
599     9144992     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600     9144985     [1,1,  20]   . . . 415755: RevokedCertificate SEQUENCE
601       181     [1,4,9144821]   . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602         5     [1,4,9144997]   . tbsCertList: TBSCertList SEQUENCE
603     9145009     [1,1,   9]   . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604     9145020     [0,0,   2]   . . parameters: [UNIV 5] ANY OPTIONAL
605     9145007     [1,1,  13]   . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606     9145022     [1,3, 513]   . signatureValue: BIT STRING 4096 bits
607         0     [1,4,9145534]  CertificateList SEQUENCE
608
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as a successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
618 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
619
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
626
627     for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
628         if (
629             len(decode_path) == 4 and
630             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
631             decode_path[3] == "userCertificate"
632         ):
633             print("serial number:", int(obj))
634
635 Virtually it does not take any memory except at least needed for single
636 object storage. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
639 ``.tlvlen``.
640
641 .. _evgen_mode_upto_ctx:
642
643 evgen_mode_upto
644 _______________
645
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 do not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
655
656     for decode_path, obj, _ in CertificateList().decode_evgen(
657         crl_raw,
658         ctx={"evgen_mode_upto": (
659             (("tbsCertList", "revokedCertificates", any), True),
660         )},
661     ):
662         if (
663             len(decode_path) == 3 and
664             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
665         ):
666             print("serial number:", int(obj["userCertificate"]))
667
668 .. note::
669
670    SEQUENCE/SET values with DEFAULT specified are automatically decoded
671    without evgen mode.
672
673 .. _mmap:
674
675 mmap-ed file
676 ____________
677
678 POSIX compliant systems have ``mmap`` syscall, giving ability to work
679 the memory mapped file. You can deal with the file like it was an
680 ordinary binary string, allowing you not to load it to the memory first.
681 Also you can use them as an input for OCTET STRING, taking no Python
682 memory for their storage.
683
684 There is convenient :py:func:`pyderasn.file_mmaped` function that
685 creates read-only memoryview on the file contents::
686
687     with open("huge", "rb") as fd:
688         raw = file_mmaped(fd)
689         obj = Something.decode(raw)
690
691 .. warning::
692
693    mmap-ed files in Python2.7 does not implement buffer protocol, so
694    memoryview won't work on them.
695
696 .. warning::
697
698    mmap maps the **whole** file. So it plays no role if you seek-ed it
699    before. Take the slice of the resulting memoryview with required
700    offset instead.
701
702 .. note::
703
704    If you use ZFS as underlying storage, then pay attention that
705    currently most platforms does not deal good with ZFS ARC and ordinary
706    page cache used for mmaps. It can take twice the necessary size in
707    the memory: both in page cache and ZFS ARC.
708
709 CER encoding
710 ____________
711
712 We can parse any kind of data now, but how can we produce files
713 streamingly, without storing their encoded representation in memory?
714 SEQUENCE by default encodes in memory all its values, joins them in huge
715 binary string, just to know the exact size of SEQUENCE's value for
716 encoding it in TLV. DER requires you to know all exact sizes of the
717 objects.
718
719 You can use CER encoding mode, that slightly differs from the DER, but
720 does not require exact sizes knowledge, allowing streaming encoding
721 directly to some writer/buffer. Just use
722 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
723 encoded data will flow::
724
725     opener = io.open if PY2 else open
726     with opener("result", "wb") as fd:
727         obj.encode_cer(fd.write)
728
729 ::
730
731     buf = io.BytesIO()
732     obj.encode_cer(buf.write)
733
734 If you do not want to create in-memory buffer every time, then you can
735 use :py:func:`pyderasn.encode_cer` function::
736
737     data = encode_cer(obj)
738
739 Remember that CER is **not valid** DER in most cases, so you **have to**
740 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
741 decoding. Also currently there is **no** validation that provided CER is
742 valid one -- you are sure that it has only valid BER encoding.
743
744 .. warning::
745
746    SET OF values can not be streamingly encoded, because they are
747    required to be sorted byte-by-byte. Big SET OF values still will take
748    much memory. Use neither SET nor SET OF values, as modern ASN.1
749    also recommends too.
750
751 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
752 OCTET STRINGs! They will be streamingly copied from underlying file to
753 the buffer using 1 KB chunks.
754
755 Some structures require that some of the elements have to be forcefully
756 DER encoded. For example ``SignedData`` CMS requires you to encode
757 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
758 encode everything else in BER. You can tell any of the structures to be
759 forcefully encoded in DER during CER encoding, by specifying
760 ``der_forced=True`` attribute::
761
762     class Certificate(Sequence):
763         schema = (...)
764         der_forced = True
765
766     class SignedAttributes(SetOf):
767         schema = Attribute()
768         bounds = (1, 32)
769         der_forced = True
770
771 .. _agg_octet_string:
772
773 agg_octet_string
774 ________________
775
776 In most cases, huge quantity of binary data is stored as OCTET STRING.
777 CER encoding splits it on 1 KB chunks. BER allows splitting on various
778 levels of chunks inclusion::
779
780     SOME STRING[CONSTRUCTED]
781         OCTET STRING[CONSTRUCTED]
782             OCTET STRING[PRIMITIVE]
783                 DATA CHUNK
784             OCTET STRING[PRIMITIVE]
785                 DATA CHUNK
786             OCTET STRING[PRIMITIVE]
787                 DATA CHUNK
788         OCTET STRING[PRIMITIVE]
789             DATA CHUNK
790         OCTET STRING[CONSTRUCTED]
791             OCTET STRING[PRIMITIVE]
792                 DATA CHUNK
793             OCTET STRING[PRIMITIVE]
794                 DATA CHUNK
795         OCTET STRING[CONSTRUCTED]
796             OCTET STRING[CONSTRUCTED]
797                 OCTET STRING[PRIMITIVE]
798                     DATA CHUNK
799
800 You can not just take the offset and some ``.vlen`` of the STRING and
801 treat it as the payload. If you decode it without
802 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
803 and ``bytes()`` will give the whole payload contents.
804
805 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
806 small memory footprint. There is convenient
807 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
808 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
809 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
810 SHA512 digest of its ``eContent``::
811
812     fd = open("data.p7m", "rb")
813     raw = file_mmaped(fd)
814     ctx = {"bered": True}
815     for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
816         if decode_path == ("content",):
817             content = obj
818             break
819     else:
820         raise ValueError("no content found")
821     hasher_state = sha512()
822     def hasher(data):
823         hasher_state.update(data)
824         return len(data)
825     evgens = SignedData().decode_evgen(
826         raw[content.offset:],
827         offset=content.offset,
828         ctx=ctx,
829     )
830     agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
831     fd.close()
832     digest = hasher_state.digest()
833
834 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
835 copy the payload (without BER/CER encoding interleaved overhead) in it.
836 Virtually it won't take memory more than for keeping small structures
837 and 1 KB binary chunks.
838
839 .. _seqof-iterators:
840
841 SEQUENCE OF iterators
842 _____________________
843
844 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
845 classes. The only difference with providing the full list of objects, is
846 that type and bounds checking is done during encoding process. Also
847 sequence's value will be emptied after encoding, forcing you to set its
848 value again.
849
850 This is very useful when you have to create some huge objects, like
851 CRLs, with thousands and millions of entities inside. You can write the
852 generator taking necessary data from the database and giving the
853 ``RevokedCertificate`` objects. Only binary representation of that
854 objects will take memory during DER encoding.
855
856 2-pass DER encoding
857 -------------------
858
859 There is ability to do 2-pass encoding to DER, writing results directly
860 to specified writer (buffer, file, whatever). It could be 1.5+ times
861 slower than ordinary encoding, but it takes little memory for 1st pass
862 state storing. For example, 1st pass state for CACert.org's CRL with
863 ~416K of certificate entries takes nearly 3.5 MB of memory.
864 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
865 nearly 0.5 KB of memory.
866
867 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
868 iterators <seqof-iterators>` and write directly to opened file, then
869 there is very small memory footprint.
870
871 1st pass traverses through all the objects of the structure and returns
872 the size of DER encoded structure, together with 1st pass state object.
873 That state contains precalculated lengths for various objects inside the
874 structure.
875
876 ::
877
878     fulllen, state = obj.encode1st()
879
880 2nd pass takes the writer and 1st pass state. It traverses through all
881 the objects again, but writes their encoded representation to the writer.
882
883 ::
884
885     opener = io.open if PY2 else open
886     with opener("result", "wb") as fd:
887         obj.encode2nd(fd.write, iter(state))
888
889 .. warning::
890
891    You **MUST NOT** use 1st pass state if anything is changed in the
892    objects. It is intended to be used immediately after 1st pass is
893    done!
894
895 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
896 have to reinitialize the values after the 1st pass. And you **have to**
897 be sure that the iterator gives exactly the same values as previously.
898 Yes, you have to run your iterator twice -- because this is two pass
899 encoding mode.
900
901 If you want to encode to the memory, then you can use convenient
902 :py:func:`pyderasn.encode2pass` helper.
903
904 .. _browser:
905
906 ASN.1 browser
907 -------------
908 .. autofunction:: pyderasn.browse
909
910 Base Obj
911 --------
912 .. autoclass:: pyderasn.Obj
913    :members:
914
915 Primitive types
916 ---------------
917
918 Boolean
919 _______
920 .. autoclass:: pyderasn.Boolean
921    :members: __init__
922
923 Integer
924 _______
925 .. autoclass:: pyderasn.Integer
926    :members: __init__, named, tohex
927
928 BitString
929 _________
930 .. autoclass:: pyderasn.BitString
931    :members: __init__, bit_len, named
932
933 OctetString
934 ___________
935 .. autoclass:: pyderasn.OctetString
936    :members: __init__
937
938 Null
939 ____
940 .. autoclass:: pyderasn.Null
941    :members: __init__
942
943 ObjectIdentifier
944 ________________
945 .. autoclass:: pyderasn.ObjectIdentifier
946    :members: __init__
947
948 Enumerated
949 __________
950 .. autoclass:: pyderasn.Enumerated
951
952 CommonString
953 ____________
954 .. autoclass:: pyderasn.CommonString
955
956 NumericString
957 _____________
958 .. autoclass:: pyderasn.NumericString
959
960 PrintableString
961 _______________
962 .. autoclass:: pyderasn.PrintableString
963    :members: __init__, allow_asterisk, allow_ampersand
964
965 IA5String
966 _________
967 .. autoclass:: pyderasn.IA5String
968
969 VisibleString
970 _____________
971 .. autoclass:: pyderasn.VisibleString
972
973 UTCTime
974 _______
975 .. autoclass:: pyderasn.UTCTime
976    :members: __init__, todatetime
977
978 GeneralizedTime
979 _______________
980 .. autoclass:: pyderasn.GeneralizedTime
981    :members: __init__, todatetime
982
983 Special types
984 -------------
985
986 Choice
987 ______
988 .. autoclass:: pyderasn.Choice
989    :members: __init__, choice, value
990
991 PrimitiveTypes
992 ______________
993 .. autoclass:: PrimitiveTypes
994
995 Any
996 ___
997 .. autoclass:: pyderasn.Any
998    :members: __init__
999
1000 Constructed types
1001 -----------------
1002
1003 Sequence
1004 ________
1005 .. autoclass:: pyderasn.Sequence
1006    :members: __init__
1007
1008 Set
1009 ___
1010 .. autoclass:: pyderasn.Set
1011    :members: __init__
1012
1013 SequenceOf
1014 __________
1015 .. autoclass:: pyderasn.SequenceOf
1016    :members: __init__
1017
1018 SetOf
1019 _____
1020 .. autoclass:: pyderasn.SetOf
1021    :members: __init__
1022
1023 Various
1024 -------
1025
1026 .. autofunction:: pyderasn.abs_decode_path
1027 .. autofunction:: pyderasn.agg_octet_string
1028 .. autofunction:: pyderasn.ascii_visualize
1029 .. autofunction:: pyderasn.colonize_hex
1030 .. autofunction:: pyderasn.encode2pass
1031 .. autofunction:: pyderasn.encode_cer
1032 .. autofunction:: pyderasn.file_mmaped
1033 .. autofunction:: pyderasn.hexenc
1034 .. autofunction:: pyderasn.hexdec
1035 .. autofunction:: pyderasn.hexdump
1036 .. autofunction:: pyderasn.tag_encode
1037 .. autofunction:: pyderasn.tag_decode
1038 .. autofunction:: pyderasn.tag_ctxp
1039 .. autofunction:: pyderasn.tag_ctxc
1040 .. autoclass:: pyderasn.DecodeError
1041    :members: __init__
1042 .. autoclass:: pyderasn.NotEnoughData
1043 .. autoclass:: pyderasn.ExceedingData
1044 .. autoclass:: pyderasn.LenIndefForm
1045 .. autoclass:: pyderasn.TagMismatch
1046 .. autoclass:: pyderasn.InvalidLength
1047 .. autoclass:: pyderasn.InvalidOID
1048 .. autoclass:: pyderasn.ObjUnknown
1049 .. autoclass:: pyderasn.ObjNotReady
1050 .. autoclass:: pyderasn.InvalidValueType
1051 .. autoclass:: pyderasn.BoundsError
1052
1053 .. _cmdline:
1054
1055 Command-line usage
1056 ------------------
1057
1058 You can decode DER/BER files using command line abilities::
1059
1060     $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1061
1062 If there is no schema for your file, then you can try parsing it without,
1063 but of course IMPLICIT tags will often make it impossible. But result is
1064 good enough for the certificate above::
1065
1066     $ python -m pyderasn path/to/file
1067         0   [1,3,1604]  . >: SEQUENCE OF
1068         4   [1,3,1453]  . . >: SEQUENCE OF
1069         8   [0,0,   5]  . . . . >: [0] ANY
1070                         . . . . . A0:03:02:01:02
1071        13   [1,1,   3]  . . . . >: INTEGER 61595
1072        18   [1,1,  13]  . . . . >: SEQUENCE OF
1073        20   [1,1,   9]  . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1074        31   [1,1,   0]  . . . . . . >: NULL
1075        33   [1,3, 274]  . . . . >: SEQUENCE OF
1076        37   [1,1,  11]  . . . . . . >: SET OF
1077        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1078        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1079        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1080     [...]
1081      1409   [1,1,  50]  . . . . . . >: SEQUENCE OF
1082      1411   [1,1,   8]  . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1083      1421   [1,1,  38]  . . . . . . . . >: OCTET STRING 38 bytes
1084                         . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1085                         . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1086                         . . . . . . . . . 61:2E:63:6F:6D:2F
1087      1461   [1,1,  13]  . . >: SEQUENCE OF
1088      1463   [1,1,   9]  . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1089      1474   [1,1,   0]  . . . . >: NULL
1090      1476   [1,2, 129]  . . >: BIT STRING 1024 bits
1091                         . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1092                         . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1093     [...]
1094
1095 Human readable OIDs
1096 ___________________
1097
1098 If you have got dictionaries with ObjectIdentifiers, like example one
1099 from ``tests/test_crts.py``::
1100
1101     stroid2name = {
1102         "1.2.840.113549.1.1.1": "id-rsaEncryption",
1103         "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1104         [...]
1105         "2.5.4.10": "id-at-organizationName",
1106         "2.5.4.11": "id-at-organizationalUnitName",
1107     }
1108
1109 then you can pass it to pretty printer to see human readable OIDs::
1110
1111     $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1112     [...]
1113        37   [1,1,  11]  . . . . . . >: SET OF
1114        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1115        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1116        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1117        50   [1,1,  18]  . . . . . . >: SET OF
1118        52   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1119        54   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1120        59   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1121        70   [1,1,  18]  . . . . . . >: SET OF
1122        72   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1123        74   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1124        79   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1125     [...]
1126
1127 Decode paths
1128 ____________
1129
1130 Each decoded element has so-called decode path: sequence of structure
1131 names it is passing during the decode process. Each element has its own
1132 unique path inside the whole ASN.1 tree. You can print it out with
1133 ``--print-decode-path`` option::
1134
1135     $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1136        0    [1,3,1604]  Certificate SEQUENCE []
1137        4    [1,3,1453]   . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1138       10-2  [1,1,   1]   . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1139       13    [1,1,   3]   . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1140       18    [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1141       20    [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1142       31    [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1143                          . . . . 05:00
1144       33    [0,0, 278]   . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1145       33    [1,3, 274]   . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1146       37    [1,1,  11]   . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1147       39    [1,1,   9]   . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1148       41    [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1149       46    [0,0,   4]   . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1150                          . . . . . . . 13:02:45:53
1151       46    [1,1,   2]   . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1152     [...]
1153
1154 Now you can print only the specified tree, for example signature algorithm::
1155
1156     $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1157       18    [1,1,  13]  AlgorithmIdentifier SEQUENCE
1158       20    [1,1,   9]   . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1159       31    [0,0,   2]   . parameters: [UNIV 5] ANY OPTIONAL
1160                          . . 05:00
1161 """
1162
1163 from array import array
1164 from codecs import getdecoder
1165 from codecs import getencoder
1166 from collections import namedtuple
1167 from collections import OrderedDict
1168 from copy import copy
1169 from datetime import datetime
1170 from datetime import timedelta
1171 from io import BytesIO
1172 from math import ceil
1173 from mmap import mmap
1174 from mmap import PROT_READ
1175 from operator import attrgetter
1176 from string import ascii_letters
1177 from string import digits
1178 from sys import maxsize as sys_maxsize
1179 from sys import version_info
1180 from unicodedata import category as unicat
1181
1182 from six import add_metaclass
1183 from six import binary_type
1184 from six import byte2int
1185 from six import indexbytes
1186 from six import int2byte
1187 from six import integer_types
1188 from six import iterbytes
1189 from six import iteritems
1190 from six import itervalues
1191 from six import PY2
1192 from six import string_types
1193 from six import text_type
1194 from six import unichr as six_unichr
1195 from six.moves import xrange as six_xrange
1196
1197
1198 try:
1199     from termcolor import colored
1200 except ImportError:  # pragma: no cover
1201     def colored(what, *args, **kwargs):
1202         return what
1203
1204 __version__ = "7.7"
1205
1206 __all__ = (
1207     "agg_octet_string",
1208     "Any",
1209     "BitString",
1210     "BMPString",
1211     "Boolean",
1212     "BoundsError",
1213     "Choice",
1214     "colonize_hex",
1215     "DecodeError",
1216     "DecodePathDefBy",
1217     "encode2pass",
1218     "encode_cer",
1219     "Enumerated",
1220     "ExceedingData",
1221     "file_mmaped",
1222     "GeneralizedTime",
1223     "GeneralString",
1224     "GraphicString",
1225     "hexdec",
1226     "hexenc",
1227     "IA5String",
1228     "Integer",
1229     "InvalidLength",
1230     "InvalidOID",
1231     "InvalidValueType",
1232     "ISO646String",
1233     "LenIndefForm",
1234     "NotEnoughData",
1235     "Null",
1236     "NumericString",
1237     "obj_by_path",
1238     "ObjectIdentifier",
1239     "ObjNotReady",
1240     "ObjUnknown",
1241     "OctetString",
1242     "PrimitiveTypes",
1243     "PrintableString",
1244     "Sequence",
1245     "SequenceOf",
1246     "Set",
1247     "SetOf",
1248     "T61String",
1249     "tag_ctxc",
1250     "tag_ctxp",
1251     "tag_decode",
1252     "TagClassApplication",
1253     "TagClassContext",
1254     "TagClassPrivate",
1255     "TagClassUniversal",
1256     "TagFormConstructed",
1257     "TagFormPrimitive",
1258     "TagMismatch",
1259     "TeletexString",
1260     "UniversalString",
1261     "UTCTime",
1262     "UTF8String",
1263     "VideotexString",
1264     "VisibleString",
1265 )
1266
1267 TagClassUniversal = 0
1268 TagClassApplication = 1 << 6
1269 TagClassContext = 1 << 7
1270 TagClassPrivate = 1 << 6 | 1 << 7
1271 TagFormPrimitive = 0
1272 TagFormConstructed = 1 << 5
1273 TagClassReprs = {
1274     TagClassContext: "",
1275     TagClassApplication: "APPLICATION ",
1276     TagClassPrivate: "PRIVATE ",
1277     TagClassUniversal: "UNIV ",
1278 }
1279 EOC = b"\x00\x00"
1280 EOC_LEN = len(EOC)
1281 LENINDEF = b"\x80"  # length indefinite mark
1282 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1283 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1284 SET01 = frozenset("01")
1285 DECIMALS = frozenset(digits)
1286 DECIMAL_SIGNS = ".,"
1287 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1288
1289
1290 def file_mmaped(fd):
1291     """Make mmap-ed memoryview for reading from file
1292
1293     :param fd: file object
1294     :returns: memoryview over read-only mmap-ing of the whole file
1295     """
1296     return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
1297
1298
1299 def pureint(value):
1300     if not set(value) <= DECIMALS:
1301         raise ValueError("non-pure integer")
1302     return int(value)
1303
1304
1305 def fractions2float(fractions_raw):
1306     pureint(fractions_raw)
1307     return float("0." + fractions_raw)
1308
1309
1310 def get_def_by_path(defines_by_path, sub_decode_path):
1311     """Get define by decode path
1312     """
1313     for path, define in defines_by_path:
1314         if len(path) != len(sub_decode_path):
1315             continue
1316         for p1, p2 in zip(path, sub_decode_path):
1317             if (p1 is not any) and (p1 != p2):
1318                 break
1319         else:
1320             return define
1321
1322
1323 ########################################################################
1324 # Errors
1325 ########################################################################
1326
1327 class ASN1Error(ValueError):
1328     pass
1329
1330
1331 class DecodeError(ASN1Error):
1332     def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1333         """
1334         :param str msg: reason of decode failing
1335         :param klass: optional exact DecodeError inherited class (like
1336                       :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1337                       :py:exc:`InvalidLength`)
1338         :param decode_path: tuple of strings. It contains human
1339                             readable names of the fields through which
1340                             decoding process has passed
1341         :param int offset: binary offset where failure happened
1342         """
1343         super(DecodeError, self).__init__()
1344         self.msg = msg
1345         self.klass = klass
1346         self.decode_path = decode_path
1347         self.offset = offset
1348
1349     def __str__(self):
1350         return " ".join(
1351             c for c in (
1352                 "" if self.klass is None else self.klass.__name__,
1353                 (
1354                     ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1355                     if len(self.decode_path) > 0 else ""
1356                 ),
1357                 ("(at %d)" % self.offset) if self.offset > 0 else "",
1358                 self.msg,
1359             ) if c != ""
1360         )
1361
1362     def __repr__(self):
1363         return "%s(%s)" % (self.__class__.__name__, self)
1364
1365
1366 class NotEnoughData(DecodeError):
1367     pass
1368
1369
1370 class ExceedingData(ASN1Error):
1371     def __init__(self, nbytes):
1372         super(ExceedingData, self).__init__()
1373         self.nbytes = nbytes
1374
1375     def __str__(self):
1376         return "%d trailing bytes" % self.nbytes
1377
1378     def __repr__(self):
1379         return "%s(%s)" % (self.__class__.__name__, self)
1380
1381
1382 class LenIndefForm(DecodeError):
1383     pass
1384
1385
1386 class TagMismatch(DecodeError):
1387     pass
1388
1389
1390 class InvalidLength(DecodeError):
1391     pass
1392
1393
1394 class InvalidOID(DecodeError):
1395     pass
1396
1397
1398 class ObjUnknown(ASN1Error):
1399     def __init__(self, name):
1400         super(ObjUnknown, self).__init__()
1401         self.name = name
1402
1403     def __str__(self):
1404         return "object is unknown: %s" % self.name
1405
1406     def __repr__(self):
1407         return "%s(%s)" % (self.__class__.__name__, self)
1408
1409
1410 class ObjNotReady(ASN1Error):
1411     def __init__(self, name):
1412         super(ObjNotReady, self).__init__()
1413         self.name = name
1414
1415     def __str__(self):
1416         return "object is not ready: %s" % self.name
1417
1418     def __repr__(self):
1419         return "%s(%s)" % (self.__class__.__name__, self)
1420
1421
1422 class InvalidValueType(ASN1Error):
1423     def __init__(self, expected_types):
1424         super(InvalidValueType, self).__init__()
1425         self.expected_types = expected_types
1426
1427     def __str__(self):
1428         return "invalid value type, expected: %s" % ", ".join(
1429             [repr(t) for t in self.expected_types]
1430         )
1431
1432     def __repr__(self):
1433         return "%s(%s)" % (self.__class__.__name__, self)
1434
1435
1436 class BoundsError(ASN1Error):
1437     def __init__(self, bound_min, value, bound_max):
1438         super(BoundsError, self).__init__()
1439         self.bound_min = bound_min
1440         self.value = value
1441         self.bound_max = bound_max
1442
1443     def __str__(self):
1444         return "unsatisfied bounds: %s <= %s <= %s" % (
1445             self.bound_min,
1446             self.value,
1447             self.bound_max,
1448         )
1449
1450     def __repr__(self):
1451         return "%s(%s)" % (self.__class__.__name__, self)
1452
1453
1454 ########################################################################
1455 # Basic coders
1456 ########################################################################
1457
1458 _hexdecoder = getdecoder("hex")
1459 _hexencoder = getencoder("hex")
1460
1461
1462 def hexdec(data):
1463     """Binary data to hexadecimal string convert
1464     """
1465     return _hexdecoder(data)[0]
1466
1467
1468 def hexenc(data):
1469     """Hexadecimal string to binary data convert
1470     """
1471     return _hexencoder(data)[0].decode("ascii")
1472
1473
1474 def int_bytes_len(num, byte_len=8):
1475     if num == 0:
1476         return 1
1477     return int(ceil(float(num.bit_length()) / byte_len))
1478
1479
1480 def zero_ended_encode(num):
1481     octets = bytearray(int_bytes_len(num, 7))
1482     i = len(octets) - 1
1483     octets[i] = num & 0x7F
1484     num >>= 7
1485     i -= 1
1486     while num > 0:
1487         octets[i] = 0x80 | (num & 0x7F)
1488         num >>= 7
1489         i -= 1
1490     return bytes(octets)
1491
1492
1493 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1494     """Encode tag to binary form
1495
1496     :param int num: tag's number
1497     :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1498                       :py:data:`pyderasn.TagClassContext`,
1499                       :py:data:`pyderasn.TagClassApplication`,
1500                       :py:data:`pyderasn.TagClassPrivate`)
1501     :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1502                      :py:data:`pyderasn.TagFormConstructed`)
1503     """
1504     if num < 31:
1505         # [XX|X|.....]
1506         return int2byte(klass | form | num)
1507     # [XX|X|11111][1.......][1.......] ... [0.......]
1508     return int2byte(klass | form | 31) + zero_ended_encode(num)
1509
1510
1511 def tag_decode(tag):
1512     """Decode tag from binary form
1513
1514     .. warning::
1515
1516        No validation is performed, assuming that it has already passed.
1517
1518     It returns tuple with three integers, as
1519     :py:func:`pyderasn.tag_encode` accepts.
1520     """
1521     first_octet = byte2int(tag)
1522     klass = first_octet & 0xC0
1523     form = first_octet & 0x20
1524     if first_octet & 0x1F < 0x1F:
1525         return (klass, form, first_octet & 0x1F)
1526     num = 0
1527     for octet in iterbytes(tag[1:]):
1528         num <<= 7
1529         num |= octet & 0x7F
1530     return (klass, form, num)
1531
1532
1533 def tag_ctxp(num):
1534     """Create CONTEXT PRIMITIVE tag
1535     """
1536     return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1537
1538
1539 def tag_ctxc(num):
1540     """Create CONTEXT CONSTRUCTED tag
1541     """
1542     return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1543
1544
1545 def tag_strip(data):
1546     """Take off tag from the data
1547
1548     :returns: (encoded tag, tag length, remaining data)
1549     """
1550     if len(data) == 0:
1551         raise NotEnoughData("no data at all")
1552     if byte2int(data) & 0x1F < 31:
1553         return data[:1], 1, data[1:]
1554     i = 0
1555     while True:
1556         i += 1
1557         if i == len(data):
1558             raise DecodeError("unfinished tag")
1559         if indexbytes(data, i) & 0x80 == 0:
1560             break
1561     if i > 1 and indexbytes(data, 1) & 0x7F == 0:
1562         raise DecodeError("leading zero byte in tag value")
1563     i += 1
1564     return data[:i], i, data[i:]
1565
1566
1567 def len_encode(l):
1568     if l < 0x80:
1569         return int2byte(l)
1570     octets = bytearray(int_bytes_len(l) + 1)
1571     octets[0] = 0x80 | (len(octets) - 1)
1572     for i in six_xrange(len(octets) - 1, 0, -1):
1573         octets[i] = l & 0xFF
1574         l >>= 8
1575     return bytes(octets)
1576
1577
1578 def len_decode(data):
1579     """Decode length
1580
1581     :returns: (decoded length, length's length, remaining data)
1582     :raises LenIndefForm: if indefinite form encoding is met
1583     """
1584     if len(data) == 0:
1585         raise NotEnoughData("no data at all")
1586     first_octet = byte2int(data)
1587     if first_octet & 0x80 == 0:
1588         return first_octet, 1, data[1:]
1589     octets_num = first_octet & 0x7F
1590     if octets_num + 1 > len(data):
1591         raise NotEnoughData("encoded length is longer than data")
1592     if octets_num == 0:
1593         raise LenIndefForm()
1594     if byte2int(data[1:]) == 0:
1595         raise DecodeError("leading zeros")
1596     l = 0
1597     for v in iterbytes(data[1:1 + octets_num]):
1598         l = (l << 8) | v
1599     if l <= 127:
1600         raise DecodeError("long form instead of short one")
1601     return l, 1 + octets_num, data[1 + octets_num:]
1602
1603
1604 LEN0 = len_encode(0)
1605 LEN1 = len_encode(1)
1606 LEN1K = len_encode(1000)
1607
1608
1609 def len_size(l):
1610     """How many bytes length field will take
1611     """
1612     if l < 128:
1613         return 1
1614     if l < 256:  # 1 << 8
1615         return 2
1616     if l < 65536:  # 1 << 16
1617         return 3
1618     if l < 16777216:  # 1 << 24
1619         return 4
1620     if l < 4294967296:  # 1 << 32
1621         return 5
1622     if l < 1099511627776:  # 1 << 40
1623         return 6
1624     if l < 281474976710656:  # 1 << 48
1625         return 7
1626     if l < 72057594037927936:  # 1 << 56
1627         return 8
1628     raise OverflowError("too big length")
1629
1630
1631 def write_full(writer, data):
1632     """Fully write provided data
1633
1634     :param writer: must comply with ``io.RawIOBase.write`` behaviour
1635
1636     BytesIO does not guarantee that the whole data will be written at
1637     once. That function write everything provided, raising an error if
1638     ``writer`` returns None.
1639     """
1640     data = memoryview(data)
1641     written = 0
1642     while written != len(data):
1643         n = writer(data[written:])
1644         if n is None:
1645             raise ValueError("can not write to buf")
1646         written += n
1647
1648
1649 # If it is 64-bit system, then use compact 64-bit array of unsigned
1650 # longs. Use an ordinary list with universal integers otherwise, that
1651 # is slower.
1652 if sys_maxsize > 2 ** 32:
1653     def state_2pass_new():
1654         return array("L")
1655 else:
1656     def state_2pass_new():
1657         return []
1658
1659
1660 ########################################################################
1661 # Base class
1662 ########################################################################
1663
1664 class AutoAddSlots(type):
1665     def __new__(cls, name, bases, _dict):
1666         _dict["__slots__"] = _dict.get("__slots__", ())
1667         return type.__new__(cls, name, bases, _dict)
1668
1669
1670 BasicState = namedtuple("BasicState", (
1671     "version",
1672     "tag",
1673     "tag_order",
1674     "expl",
1675     "default",
1676     "optional",
1677     "offset",
1678     "llen",
1679     "vlen",
1680     "expl_lenindef",
1681     "lenindef",
1682     "ber_encoded",
1683 ), **NAMEDTUPLE_KWARGS)
1684
1685
1686 @add_metaclass(AutoAddSlots)
1687 class Obj(object):
1688     """Common ASN.1 object class
1689
1690     All ASN.1 types are inherited from it. It has metaclass that
1691     automatically adds ``__slots__`` to all inherited classes.
1692     """
1693     __slots__ = (
1694         "tag",
1695         "_tag_order",
1696         "_value",
1697         "_expl",
1698         "default",
1699         "optional",
1700         "offset",
1701         "llen",
1702         "vlen",
1703         "expl_lenindef",
1704         "lenindef",
1705         "ber_encoded",
1706     )
1707
1708     def __init__(
1709             self,
1710             impl=None,
1711             expl=None,
1712             default=None,
1713             optional=False,
1714             _decoded=(0, 0, 0),
1715     ):
1716         self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1717         self._expl = getattr(self, "expl", None) if expl is None else expl
1718         if self.tag != self.tag_default and self._expl is not None:
1719             raise ValueError("implicit and explicit tags can not be set simultaneously")
1720         if self.tag is None:
1721             self._tag_order = None
1722         else:
1723             tag_class, _, tag_num = tag_decode(
1724                 self.tag if self._expl is None else self._expl
1725             )
1726             self._tag_order = (tag_class, tag_num)
1727         if default is not None:
1728             optional = True
1729         self.optional = optional
1730         self.offset, self.llen, self.vlen = _decoded
1731         self.default = None
1732         self.expl_lenindef = False
1733         self.lenindef = False
1734         self.ber_encoded = False
1735
1736     @property
1737     def ready(self):  # pragma: no cover
1738         """Is object ready to be encoded?
1739         """
1740         raise NotImplementedError()
1741
1742     def _assert_ready(self):
1743         if not self.ready:
1744             raise ObjNotReady(self.__class__.__name__)
1745
1746     @property
1747     def bered(self):
1748         """Is either object or any elements inside is BER encoded?
1749         """
1750         return self.expl_lenindef or self.lenindef or self.ber_encoded
1751
1752     @property
1753     def decoded(self):
1754         """Is object decoded?
1755         """
1756         return (self.llen + self.vlen) > 0
1757
1758     def __getstate__(self):  # pragma: no cover
1759         """Used for making safe to be mutable pickleable copies
1760         """
1761         raise NotImplementedError()
1762
1763     def __setstate__(self, state):
1764         if state.version != __version__:
1765             raise ValueError("data is pickled by different PyDERASN version")
1766         self.tag = state.tag
1767         self._tag_order = state.tag_order
1768         self._expl = state.expl
1769         self.default = state.default
1770         self.optional = state.optional
1771         self.offset = state.offset
1772         self.llen = state.llen
1773         self.vlen = state.vlen
1774         self.expl_lenindef = state.expl_lenindef
1775         self.lenindef = state.lenindef
1776         self.ber_encoded = state.ber_encoded
1777
1778     @property
1779     def tag_order(self):
1780         """Tag's (class, number) used for DER/CER sorting
1781         """
1782         return self._tag_order
1783
1784     @property
1785     def tag_order_cer(self):
1786         return self.tag_order
1787
1788     @property
1789     def tlen(self):
1790         """.. seealso:: :ref:`decoding`
1791         """
1792         return len(self.tag)
1793
1794     @property
1795     def tlvlen(self):
1796         """.. seealso:: :ref:`decoding`
1797         """
1798         return self.tlen + self.llen + self.vlen
1799
1800     def __str__(self):  # pragma: no cover
1801         return self.__bytes__() if PY2 else self.__unicode__()
1802
1803     def __ne__(self, their):
1804         return not(self == their)
1805
1806     def __gt__(self, their):  # pragma: no cover
1807         return not(self < their)
1808
1809     def __le__(self, their):  # pragma: no cover
1810         return (self == their) or (self < their)
1811
1812     def __ge__(self, their):  # pragma: no cover
1813         return (self == their) or (self > their)
1814
1815     def _encode(self):  # pragma: no cover
1816         raise NotImplementedError()
1817
1818     def _encode_cer(self, writer):
1819         write_full(writer, self._encode())
1820
1821     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):  # pragma: no cover
1822         yield NotImplemented
1823
1824     def _encode1st(self, state):
1825         raise NotImplementedError()
1826
1827     def _encode2nd(self, writer, state_iter):
1828         raise NotImplementedError()
1829
1830     def encode(self):
1831         """DER encode the structure
1832
1833         :returns: DER representation
1834         """
1835         raw = self._encode()
1836         if self._expl is None:
1837             return raw
1838         return b"".join((self._expl, len_encode(len(raw)), raw))
1839
1840     def encode1st(self, state=None):
1841         """Do the 1st pass of 2-pass encoding
1842
1843         :rtype: (int, array("L"))
1844         :returns: full length of encoded data and precalculated various
1845                   objects lengths
1846         """
1847         if state is None:
1848             state = state_2pass_new()
1849         if self._expl is None:
1850             return self._encode1st(state)
1851         state.append(0)
1852         idx = len(state) - 1
1853         vlen, _ = self._encode1st(state)
1854         state[idx] = vlen
1855         fulllen = len(self._expl) + len_size(vlen) + vlen
1856         return fulllen, state
1857
1858     def encode2nd(self, writer, state_iter):
1859         """Do the 2nd pass of 2-pass encoding
1860
1861         :param writer: must comply with ``io.RawIOBase.write`` behaviour
1862         :param state_iter: iterator over the 1st pass state (``iter(state)``)
1863         """
1864         if self._expl is None:
1865             self._encode2nd(writer, state_iter)
1866         else:
1867             write_full(writer, self._expl + len_encode(next(state_iter)))
1868             self._encode2nd(writer, state_iter)
1869
1870     def encode_cer(self, writer):
1871         """CER encode the structure to specified writer
1872
1873         :param writer: must comply with ``io.RawIOBase.write``
1874                        behaviour. It takes slice to be written and
1875                        returns number of bytes processed. If it returns
1876                        None, then exception will be raised
1877         """
1878         if self._expl is not None:
1879             write_full(writer, self._expl + LENINDEF)
1880         if getattr(self, "der_forced", False):
1881             write_full(writer, self._encode())
1882         else:
1883             self._encode_cer(writer)
1884         if self._expl is not None:
1885             write_full(writer, EOC)
1886
1887     def hexencode(self):
1888         """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1889         """
1890         return hexenc(self.encode())
1891
1892     def decode(
1893             self,
1894             data,
1895             offset=0,
1896             leavemm=False,
1897             decode_path=(),
1898             ctx=None,
1899             tag_only=False,
1900             _ctx_immutable=True,
1901     ):
1902         """Decode the data
1903
1904         :param data: either binary or memoryview
1905         :param int offset: initial data's offset
1906         :param bool leavemm: do we need to leave memoryview of remaining
1907                     data as is, or convert it to bytes otherwise
1908         :param decode_path: current decode path (tuples of strings,
1909                             possibly with DecodePathDefBy) with will be
1910                             the root for all underlying objects
1911         :param ctx: optional :ref:`context <ctx>` governing decoding process
1912         :param bool tag_only: decode only the tag, without length and
1913                               contents (used only in Choice and Set
1914                               structures, trying to determine if tag satisfies
1915                               the schema)
1916         :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1917                                     before using it?
1918         :returns: (Obj, remaining data)
1919
1920         .. seealso:: :ref:`decoding`
1921         """
1922         result = next(self.decode_evgen(
1923             data,
1924             offset,
1925             leavemm,
1926             decode_path,
1927             ctx,
1928             tag_only,
1929             _ctx_immutable,
1930             _evgen_mode=False,
1931         ))
1932         if result is None:
1933             return None
1934         _, obj, tail = result
1935         return obj, tail
1936
1937     def decode_evgen(
1938             self,
1939             data,
1940             offset=0,
1941             leavemm=False,
1942             decode_path=(),
1943             ctx=None,
1944             tag_only=False,
1945             _ctx_immutable=True,
1946             _evgen_mode=True,
1947     ):
1948         """Decode with evgen mode on
1949
1950         That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1951         it returns the generator producing ``(decode_path, obj, tail)``
1952         values.
1953         .. seealso:: :ref:`evgen mode <evgen_mode>`.
1954         """
1955         if ctx is None:
1956             ctx = {}
1957         elif _ctx_immutable:
1958             ctx = copy(ctx)
1959         tlv = memoryview(data)
1960         if (
1961                 _evgen_mode and
1962                 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1963         ):
1964             _evgen_mode = False
1965         if self._expl is None:
1966             for result in self._decode(
1967                     tlv,
1968                     offset=offset,
1969                     decode_path=decode_path,
1970                     ctx=ctx,
1971                     tag_only=tag_only,
1972                     evgen_mode=_evgen_mode,
1973             ):
1974                 if tag_only:
1975                     yield None
1976                     return
1977                 _decode_path, obj, tail = result
1978                 if _decode_path is not decode_path:
1979                     yield result
1980         else:
1981             try:
1982                 t, tlen, lv = tag_strip(tlv)
1983             except DecodeError as err:
1984                 raise err.__class__(
1985                     msg=err.msg,
1986                     klass=self.__class__,
1987                     decode_path=decode_path,
1988                     offset=offset,
1989                 )
1990             if t != self._expl:
1991                 raise TagMismatch(
1992                     klass=self.__class__,
1993                     decode_path=decode_path,
1994                     offset=offset,
1995                 )
1996             try:
1997                 l, llen, v = len_decode(lv)
1998             except LenIndefForm as err:
1999                 if not ctx.get("bered", False):
2000                     raise err.__class__(
2001                         msg=err.msg,
2002                         klass=self.__class__,
2003                         decode_path=decode_path,
2004                         offset=offset,
2005                     )
2006                 llen, v = 1, lv[1:]
2007                 offset += tlen + llen
2008                 for result in self._decode(
2009                         v,
2010                         offset=offset,
2011                         decode_path=decode_path,
2012                         ctx=ctx,
2013                         tag_only=tag_only,
2014                         evgen_mode=_evgen_mode,
2015                 ):
2016                     if tag_only:  # pragma: no cover
2017                         yield None
2018                         return
2019                     _decode_path, obj, tail = result
2020                     if _decode_path is not decode_path:
2021                         yield result
2022                 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2023                 if eoc_expected.tobytes() != EOC:
2024                     raise DecodeError(
2025                         "no EOC",
2026                         klass=self.__class__,
2027                         decode_path=decode_path,
2028                         offset=offset,
2029                     )
2030                 obj.vlen += EOC_LEN
2031                 obj.expl_lenindef = True
2032             except DecodeError as err:
2033                 raise err.__class__(
2034                     msg=err.msg,
2035                     klass=self.__class__,
2036                     decode_path=decode_path,
2037                     offset=offset,
2038                 )
2039             else:
2040                 if l > len(v):
2041                     raise NotEnoughData(
2042                         "encoded length is longer than data",
2043                         klass=self.__class__,
2044                         decode_path=decode_path,
2045                         offset=offset,
2046                     )
2047                 for result in self._decode(
2048                         v,
2049                         offset=offset + tlen + llen,
2050                         decode_path=decode_path,
2051                         ctx=ctx,
2052                         tag_only=tag_only,
2053                         evgen_mode=_evgen_mode,
2054                 ):
2055                     if tag_only:  # pragma: no cover
2056                         yield None
2057                         return
2058                     _decode_path, obj, tail = result
2059                     if _decode_path is not decode_path:
2060                         yield result
2061                 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2062                     raise DecodeError(
2063                         "explicit tag out-of-bound, longer than data",
2064                         klass=self.__class__,
2065                         decode_path=decode_path,
2066                         offset=offset,
2067                     )
2068         yield decode_path, obj, (tail if leavemm else tail.tobytes())
2069
2070     def decod(self, data, offset=0, decode_path=(), ctx=None):
2071         """Decode the data, check that tail is empty
2072
2073         :raises ExceedingData: if tail is not empty
2074
2075         This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2076         (decode without tail) that also checks that there is no
2077         trailing data left.
2078         """
2079         obj, tail = self.decode(
2080             data,
2081             offset=offset,
2082             decode_path=decode_path,
2083             ctx=ctx,
2084             leavemm=True,
2085         )
2086         if len(tail) > 0:
2087             raise ExceedingData(len(tail))
2088         return obj
2089
2090     def hexdecode(self, data, *args, **kwargs):
2091         """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2092         """
2093         return self.decode(hexdec(data), *args, **kwargs)
2094
2095     def hexdecod(self, data, *args, **kwargs):
2096         """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2097         """
2098         return self.decod(hexdec(data), *args, **kwargs)
2099
2100     @property
2101     def expled(self):
2102         """.. seealso:: :ref:`decoding`
2103         """
2104         return self._expl is not None
2105
2106     @property
2107     def expl_tag(self):
2108         """.. seealso:: :ref:`decoding`
2109         """
2110         return self._expl
2111
2112     @property
2113     def expl_tlen(self):
2114         """.. seealso:: :ref:`decoding`
2115         """
2116         return len(self._expl)
2117
2118     @property
2119     def expl_llen(self):
2120         """.. seealso:: :ref:`decoding`
2121         """
2122         if self.expl_lenindef:
2123             return 1
2124         return len(len_encode(self.tlvlen))
2125
2126     @property
2127     def expl_offset(self):
2128         """.. seealso:: :ref:`decoding`
2129         """
2130         return self.offset - self.expl_tlen - self.expl_llen
2131
2132     @property
2133     def expl_vlen(self):
2134         """.. seealso:: :ref:`decoding`
2135         """
2136         return self.tlvlen
2137
2138     @property
2139     def expl_tlvlen(self):
2140         """.. seealso:: :ref:`decoding`
2141         """
2142         return self.expl_tlen + self.expl_llen + self.expl_vlen
2143
2144     @property
2145     def fulloffset(self):
2146         """.. seealso:: :ref:`decoding`
2147         """
2148         return self.expl_offset if self.expled else self.offset
2149
2150     @property
2151     def fulllen(self):
2152         """.. seealso:: :ref:`decoding`
2153         """
2154         return self.expl_tlvlen if self.expled else self.tlvlen
2155
2156     def pps_lenindef(self, decode_path):
2157         if self.lenindef and not (
2158                 getattr(self, "defined", None) is not None and
2159                 self.defined[1].lenindef
2160         ):
2161             yield _pp(
2162                 asn1_type_name="EOC",
2163                 obj_name="",
2164                 decode_path=decode_path,
2165                 offset=(
2166                     self.offset + self.tlvlen -
2167                     (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2168                 ),
2169                 tlen=1,
2170                 llen=1,
2171                 vlen=0,
2172                 ber_encoded=True,
2173                 bered=True,
2174             )
2175         if self.expl_lenindef:
2176             yield _pp(
2177                 asn1_type_name="EOC",
2178                 obj_name="EXPLICIT",
2179                 decode_path=decode_path,
2180                 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2181                 tlen=1,
2182                 llen=1,
2183                 vlen=0,
2184                 ber_encoded=True,
2185                 bered=True,
2186             )
2187
2188
2189 def encode_cer(obj):
2190     """Encode to CER in memory buffer
2191
2192     :returns bytes: memory buffer contents
2193     """
2194     buf = BytesIO()
2195     obj.encode_cer(buf.write)
2196     return buf.getvalue()
2197
2198
2199 def encode2pass(obj):
2200     """Encode (2-pass mode) to DER in memory buffer
2201
2202     :returns bytes: memory buffer contents
2203     """
2204     buf = BytesIO()
2205     _, state = obj.encode1st()
2206     obj.encode2nd(buf.write, iter(state))
2207     return buf.getvalue()
2208
2209
2210 class DecodePathDefBy(object):
2211     """DEFINED BY representation inside decode path
2212     """
2213     __slots__ = ("defined_by",)
2214
2215     def __init__(self, defined_by):
2216         self.defined_by = defined_by
2217
2218     def __ne__(self, their):
2219         return not(self == their)
2220
2221     def __eq__(self, their):
2222         if not isinstance(their, self.__class__):
2223             return False
2224         return self.defined_by == their.defined_by
2225
2226     def __str__(self):
2227         return "DEFINED BY " + str(self.defined_by)
2228
2229     def __repr__(self):
2230         return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2231
2232
2233 ########################################################################
2234 # Pretty printing
2235 ########################################################################
2236
2237 PP = namedtuple("PP", (
2238     "obj",
2239     "asn1_type_name",
2240     "obj_name",
2241     "decode_path",
2242     "value",
2243     "blob",
2244     "optional",
2245     "default",
2246     "impl",
2247     "expl",
2248     "offset",
2249     "tlen",
2250     "llen",
2251     "vlen",
2252     "expl_offset",
2253     "expl_tlen",
2254     "expl_llen",
2255     "expl_vlen",
2256     "expl_lenindef",
2257     "lenindef",
2258     "ber_encoded",
2259     "bered",
2260 ), **NAMEDTUPLE_KWARGS)
2261
2262
2263 def _pp(
2264         obj=None,
2265         asn1_type_name="unknown",
2266         obj_name="unknown",
2267         decode_path=(),
2268         value=None,
2269         blob=None,
2270         optional=False,
2271         default=False,
2272         impl=None,
2273         expl=None,
2274         offset=0,
2275         tlen=0,
2276         llen=0,
2277         vlen=0,
2278         expl_offset=None,
2279         expl_tlen=None,
2280         expl_llen=None,
2281         expl_vlen=None,
2282         expl_lenindef=False,
2283         lenindef=False,
2284         ber_encoded=False,
2285         bered=False,
2286 ):
2287     return PP(
2288         obj,
2289         asn1_type_name,
2290         obj_name,
2291         decode_path,
2292         value,
2293         blob,
2294         optional,
2295         default,
2296         impl,
2297         expl,
2298         offset,
2299         tlen,
2300         llen,
2301         vlen,
2302         expl_offset,
2303         expl_tlen,
2304         expl_llen,
2305         expl_vlen,
2306         expl_lenindef,
2307         lenindef,
2308         ber_encoded,
2309         bered,
2310     )
2311
2312
2313 def _colourize(what, colour, with_colours, attrs=("bold",)):
2314     return colored(what, colour, attrs=attrs) if with_colours else what
2315
2316
2317 def colonize_hex(hexed):
2318     """Separate hexadecimal string with colons
2319     """
2320     return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2321
2322
2323 def find_oid_name(asn1_type_name, oid_maps, value):
2324     if len(oid_maps) > 0 and asn1_type_name == ObjectIdentifier.asn1_type_name:
2325         for oid_map in oid_maps:
2326             oid_name = oid_map.get(value)
2327             if oid_name is not None:
2328                 return oid_name
2329     return None
2330
2331
2332 def pp_console_row(
2333         pp,
2334         oid_maps=(),
2335         with_offsets=False,
2336         with_blob=True,
2337         with_colours=False,
2338         with_decode_path=False,
2339         decode_path_len_decrease=0,
2340 ):
2341     cols = []
2342     if with_offsets:
2343         col = "%5d%s%s" % (
2344             pp.offset,
2345             (
2346                 "  " if pp.expl_offset is None else
2347                 ("-%d" % (pp.offset - pp.expl_offset))
2348             ),
2349             LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2350         )
2351         col = _colourize(col, "red", with_colours, ())
2352         col += _colourize("B", "red", with_colours) if pp.bered else " "
2353         cols.append(col)
2354         col = "[%d,%d,%4d]%s" % (
2355             pp.tlen, pp.llen, pp.vlen,
2356             LENINDEF_PP_CHAR if pp.lenindef else " "
2357         )
2358         col = _colourize(col, "green", with_colours, ())
2359         cols.append(col)
2360     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2361     if decode_path_len > 0:
2362         cols.append(" ." * decode_path_len)
2363         ent = pp.decode_path[-1]
2364         if isinstance(ent, DecodePathDefBy):
2365             cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2366             value = str(ent.defined_by)
2367             oid_name = find_oid_name(ent.defined_by.asn1_type_name, oid_maps, value)
2368             if oid_name is None:
2369                 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2370             else:
2371                 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2372         else:
2373             cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2374     if pp.expl is not None:
2375         klass, _, num = pp.expl
2376         col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2377         cols.append(_colourize(col, "blue", with_colours))
2378     if pp.impl is not None:
2379         klass, _, num = pp.impl
2380         col = "[%s%d]" % (TagClassReprs[klass], num)
2381         cols.append(_colourize(col, "blue", with_colours))
2382     if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2383         cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2384     if pp.ber_encoded:
2385         cols.append(_colourize("BER", "red", with_colours))
2386     cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2387     if pp.value is not None:
2388         value = pp.value
2389         cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2390         oid_name = find_oid_name(pp.asn1_type_name, oid_maps, pp.value)
2391         if oid_name is not None:
2392             cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2393         if pp.asn1_type_name == Integer.asn1_type_name:
2394             cols.append(_colourize(
2395                 "(%s)" % colonize_hex(pp.obj.tohex()), "green", with_colours,
2396             ))
2397     if with_blob:
2398         if pp.blob.__class__ == binary_type:
2399             cols.append(hexenc(pp.blob))
2400         elif pp.blob.__class__ == tuple:
2401             cols.append(", ".join(pp.blob))
2402     if pp.optional:
2403         cols.append(_colourize("OPTIONAL", "red", with_colours))
2404     if pp.default:
2405         cols.append(_colourize("DEFAULT", "red", with_colours))
2406     if with_decode_path:
2407         cols.append(_colourize(
2408             "[%s]" % ":".join(str(p) for p in pp.decode_path),
2409             "grey",
2410             with_colours,
2411         ))
2412     return " ".join(cols)
2413
2414
2415 def pp_console_blob(pp, decode_path_len_decrease=0):
2416     cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2417     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2418     if decode_path_len > 0:
2419         cols.append(" ." * (decode_path_len + 1))
2420     if pp.blob.__class__ == binary_type:
2421         blob = hexenc(pp.blob).upper()
2422         for i in six_xrange(0, len(blob), 32):
2423             chunk = blob[i:i + 32]
2424             yield " ".join(cols + [colonize_hex(chunk)])
2425     elif pp.blob.__class__ == tuple:
2426         yield " ".join(cols + [", ".join(pp.blob)])
2427
2428
2429 def pprint(
2430         obj,
2431         oid_maps=(),
2432         big_blobs=False,
2433         with_colours=False,
2434         with_decode_path=False,
2435         decode_path_only=(),
2436         decode_path=(),
2437 ):
2438     """Pretty print object
2439
2440     :param Obj obj: object you want to pretty print
2441     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
2442                      Its human readable form is printed when OID is met
2443     :param big_blobs: if large binary objects are met (like OctetString
2444                       values), do we need to print them too, on separate
2445                       lines
2446     :param with_colours: colourize output, if ``termcolor`` library
2447                          is available
2448     :param with_decode_path: print decode path
2449     :param decode_path_only: print only that specified decode path
2450     """
2451     def _pprint_pps(pps):
2452         for pp in pps:
2453             if hasattr(pp, "_fields"):
2454                 if (
2455                         decode_path_only != () and
2456                         tuple(
2457                             str(p) for p in pp.decode_path[:len(decode_path_only)]
2458                         ) != decode_path_only
2459                 ):
2460                     continue
2461                 if big_blobs:
2462                     yield pp_console_row(
2463                         pp,
2464                         oid_maps=oid_maps,
2465                         with_offsets=True,
2466                         with_blob=False,
2467                         with_colours=with_colours,
2468                         with_decode_path=with_decode_path,
2469                         decode_path_len_decrease=len(decode_path_only),
2470                     )
2471                     for row in pp_console_blob(
2472                             pp,
2473                             decode_path_len_decrease=len(decode_path_only),
2474                     ):
2475                         yield row
2476                 else:
2477                     yield pp_console_row(
2478                         pp,
2479                         oid_maps=oid_maps,
2480                         with_offsets=True,
2481                         with_blob=True,
2482                         with_colours=with_colours,
2483                         with_decode_path=with_decode_path,
2484                         decode_path_len_decrease=len(decode_path_only),
2485                     )
2486             else:
2487                 for row in _pprint_pps(pp):
2488                     yield row
2489     return "\n".join(_pprint_pps(obj.pps(decode_path)))
2490
2491
2492 ########################################################################
2493 # ASN.1 primitive types
2494 ########################################################################
2495
2496 BooleanState = namedtuple(
2497     "BooleanState",
2498     BasicState._fields + ("value",),
2499     **NAMEDTUPLE_KWARGS
2500 )
2501
2502
2503 class Boolean(Obj):
2504     """``BOOLEAN`` boolean type
2505
2506     >>> b = Boolean(True)
2507     BOOLEAN True
2508     >>> b == Boolean(True)
2509     True
2510     >>> bool(b)
2511     True
2512     """
2513     __slots__ = ()
2514     tag_default = tag_encode(1)
2515     asn1_type_name = "BOOLEAN"
2516
2517     def __init__(
2518             self,
2519             value=None,
2520             impl=None,
2521             expl=None,
2522             default=None,
2523             optional=False,
2524             _decoded=(0, 0, 0),
2525     ):
2526         """
2527         :param value: set the value. Either boolean type, or
2528                       :py:class:`pyderasn.Boolean` object
2529         :param bytes impl: override default tag with ``IMPLICIT`` one
2530         :param bytes expl: override default tag with ``EXPLICIT`` one
2531         :param default: set default value. Type same as in ``value``
2532         :param bool optional: is object ``OPTIONAL`` in sequence
2533         """
2534         super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2535         self._value = None if value is None else self._value_sanitize(value)
2536         if default is not None:
2537             default = self._value_sanitize(default)
2538             self.default = self.__class__(
2539                 value=default,
2540                 impl=self.tag,
2541                 expl=self._expl,
2542             )
2543             if value is None:
2544                 self._value = default
2545
2546     def _value_sanitize(self, value):
2547         if value.__class__ == bool:
2548             return value
2549         if issubclass(value.__class__, Boolean):
2550             return value._value
2551         raise InvalidValueType((self.__class__, bool))
2552
2553     @property
2554     def ready(self):
2555         return self._value is not None
2556
2557     def __getstate__(self):
2558         return BooleanState(
2559             __version__,
2560             self.tag,
2561             self._tag_order,
2562             self._expl,
2563             self.default,
2564             self.optional,
2565             self.offset,
2566             self.llen,
2567             self.vlen,
2568             self.expl_lenindef,
2569             self.lenindef,
2570             self.ber_encoded,
2571             self._value,
2572         )
2573
2574     def __setstate__(self, state):
2575         super(Boolean, self).__setstate__(state)
2576         self._value = state.value
2577
2578     def __nonzero__(self):
2579         self._assert_ready()
2580         return self._value
2581
2582     def __bool__(self):
2583         self._assert_ready()
2584         return self._value
2585
2586     def __eq__(self, their):
2587         if their.__class__ == bool:
2588             return self._value == their
2589         if not issubclass(their.__class__, Boolean):
2590             return False
2591         return (
2592             self._value == their._value and
2593             self.tag == their.tag and
2594             self._expl == their._expl
2595         )
2596
2597     def __call__(
2598             self,
2599             value=None,
2600             impl=None,
2601             expl=None,
2602             default=None,
2603             optional=None,
2604     ):
2605         return self.__class__(
2606             value=value,
2607             impl=self.tag if impl is None else impl,
2608             expl=self._expl if expl is None else expl,
2609             default=self.default if default is None else default,
2610             optional=self.optional if optional is None else optional,
2611         )
2612
2613     def _encode(self):
2614         self._assert_ready()
2615         return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2616
2617     def _encode1st(self, state):
2618         return len(self.tag) + 2, state
2619
2620     def _encode2nd(self, writer, state_iter):
2621         self._assert_ready()
2622         write_full(writer, self._encode())
2623
2624     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2625         try:
2626             t, _, lv = tag_strip(tlv)
2627         except DecodeError as err:
2628             raise err.__class__(
2629                 msg=err.msg,
2630                 klass=self.__class__,
2631                 decode_path=decode_path,
2632                 offset=offset,
2633             )
2634         if t != self.tag:
2635             raise TagMismatch(
2636                 klass=self.__class__,
2637                 decode_path=decode_path,
2638                 offset=offset,
2639             )
2640         if tag_only:
2641             yield None
2642             return
2643         try:
2644             l, _, v = len_decode(lv)
2645         except DecodeError as err:
2646             raise err.__class__(
2647                 msg=err.msg,
2648                 klass=self.__class__,
2649                 decode_path=decode_path,
2650                 offset=offset,
2651             )
2652         if l != 1:
2653             raise InvalidLength(
2654                 "Boolean's length must be equal to 1",
2655                 klass=self.__class__,
2656                 decode_path=decode_path,
2657                 offset=offset,
2658             )
2659         if l > len(v):
2660             raise NotEnoughData(
2661                 "encoded length is longer than data",
2662                 klass=self.__class__,
2663                 decode_path=decode_path,
2664                 offset=offset,
2665             )
2666         first_octet = byte2int(v)
2667         ber_encoded = False
2668         if first_octet == 0:
2669             value = False
2670         elif first_octet == 0xFF:
2671             value = True
2672         elif ctx.get("bered", False):
2673             value = True
2674             ber_encoded = True
2675         else:
2676             raise DecodeError(
2677                 "unacceptable Boolean value",
2678                 klass=self.__class__,
2679                 decode_path=decode_path,
2680                 offset=offset,
2681             )
2682         obj = self.__class__(
2683             value=value,
2684             impl=self.tag,
2685             expl=self._expl,
2686             default=self.default,
2687             optional=self.optional,
2688             _decoded=(offset, 1, 1),
2689         )
2690         obj.ber_encoded = ber_encoded
2691         yield decode_path, obj, v[1:]
2692
2693     def __repr__(self):
2694         return pp_console_row(next(self.pps()))
2695
2696     def pps(self, decode_path=()):
2697         yield _pp(
2698             obj=self,
2699             asn1_type_name=self.asn1_type_name,
2700             obj_name=self.__class__.__name__,
2701             decode_path=decode_path,
2702             value=str(self._value) if self.ready else None,
2703             optional=self.optional,
2704             default=self == self.default,
2705             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2706             expl=None if self._expl is None else tag_decode(self._expl),
2707             offset=self.offset,
2708             tlen=self.tlen,
2709             llen=self.llen,
2710             vlen=self.vlen,
2711             expl_offset=self.expl_offset if self.expled else None,
2712             expl_tlen=self.expl_tlen if self.expled else None,
2713             expl_llen=self.expl_llen if self.expled else None,
2714             expl_vlen=self.expl_vlen if self.expled else None,
2715             expl_lenindef=self.expl_lenindef,
2716             ber_encoded=self.ber_encoded,
2717             bered=self.bered,
2718         )
2719         for pp in self.pps_lenindef(decode_path):
2720             yield pp
2721
2722
2723 IntegerState = namedtuple(
2724     "IntegerState",
2725     BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2726     **NAMEDTUPLE_KWARGS
2727 )
2728
2729
2730 class Integer(Obj):
2731     """``INTEGER`` integer type
2732
2733     >>> b = Integer(-123)
2734     INTEGER -123
2735     >>> b == Integer(-123)
2736     True
2737     >>> int(b)
2738     -123
2739
2740     >>> Integer(2, bounds=(1, 3))
2741     INTEGER 2
2742     >>> Integer(5, bounds=(1, 3))
2743     Traceback (most recent call last):
2744     pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2745
2746     ::
2747
2748         class Version(Integer):
2749             schema = (
2750                 ("v1", 0),
2751                 ("v2", 1),
2752                 ("v3", 2),
2753             )
2754
2755     >>> v = Version("v1")
2756     Version INTEGER v1
2757     >>> int(v)
2758     0
2759     >>> v.named
2760     'v1'
2761     >>> v.specs
2762     {'v3': 2, 'v1': 0, 'v2': 1}
2763     """
2764     __slots__ = ("specs", "_bound_min", "_bound_max")
2765     tag_default = tag_encode(2)
2766     asn1_type_name = "INTEGER"
2767
2768     def __init__(
2769             self,
2770             value=None,
2771             bounds=None,
2772             impl=None,
2773             expl=None,
2774             default=None,
2775             optional=False,
2776             _specs=None,
2777             _decoded=(0, 0, 0),
2778     ):
2779         """
2780         :param value: set the value. Either integer type, named value
2781                       (if ``schema`` is specified in the class), or
2782                       :py:class:`pyderasn.Integer` object
2783         :param bounds: set ``(MIN, MAX)`` value constraint.
2784                        (-inf, +inf) by default
2785         :param bytes impl: override default tag with ``IMPLICIT`` one
2786         :param bytes expl: override default tag with ``EXPLICIT`` one
2787         :param default: set default value. Type same as in ``value``
2788         :param bool optional: is object ``OPTIONAL`` in sequence
2789         """
2790         super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2791         self._value = value
2792         specs = getattr(self, "schema", {}) if _specs is None else _specs
2793         self.specs = specs if specs.__class__ == dict else dict(specs)
2794         self._bound_min, self._bound_max = getattr(
2795             self,
2796             "bounds",
2797             (float("-inf"), float("+inf")),
2798         ) if bounds is None else bounds
2799         if value is not None:
2800             self._value = self._value_sanitize(value)
2801         if default is not None:
2802             default = self._value_sanitize(default)
2803             self.default = self.__class__(
2804                 value=default,
2805                 impl=self.tag,
2806                 expl=self._expl,
2807                 _specs=self.specs,
2808             )
2809             if self._value is None:
2810                 self._value = default
2811
2812     def _value_sanitize(self, value):
2813         if isinstance(value, integer_types):
2814             pass
2815         elif issubclass(value.__class__, Integer):
2816             value = value._value
2817         elif value.__class__ == str:
2818             value = self.specs.get(value)
2819             if value is None:
2820                 raise ObjUnknown("integer value: %s" % value)
2821         else:
2822             raise InvalidValueType((self.__class__, int, str))
2823         if not self._bound_min <= value <= self._bound_max:
2824             raise BoundsError(self._bound_min, value, self._bound_max)
2825         return value
2826
2827     @property
2828     def ready(self):
2829         return self._value is not None
2830
2831     def __getstate__(self):
2832         return IntegerState(
2833             __version__,
2834             self.tag,
2835             self._tag_order,
2836             self._expl,
2837             self.default,
2838             self.optional,
2839             self.offset,
2840             self.llen,
2841             self.vlen,
2842             self.expl_lenindef,
2843             self.lenindef,
2844             self.ber_encoded,
2845             self.specs,
2846             self._value,
2847             self._bound_min,
2848             self._bound_max,
2849         )
2850
2851     def __setstate__(self, state):
2852         super(Integer, self).__setstate__(state)
2853         self.specs = state.specs
2854         self._value = state.value
2855         self._bound_min = state.bound_min
2856         self._bound_max = state.bound_max
2857
2858     def __int__(self):
2859         self._assert_ready()
2860         return int(self._value)
2861
2862     def tohex(self):
2863         """Hexadecimal representation
2864
2865         Use :py:func:`pyderasn.colonize_hex` for colonizing it.
2866         """
2867         hex_repr = hex(int(self))[2:].upper()
2868         if len(hex_repr) % 2 != 0:
2869             hex_repr = "0" + hex_repr
2870         return hex_repr
2871
2872     def __hash__(self):
2873         self._assert_ready()
2874         return hash(b"".join((
2875             self.tag,
2876             bytes(self._expl or b""),
2877             str(self._value).encode("ascii"),
2878         )))
2879
2880     def __eq__(self, their):
2881         if isinstance(their, integer_types):
2882             return self._value == their
2883         if not issubclass(their.__class__, Integer):
2884             return False
2885         return (
2886             self._value == their._value and
2887             self.tag == their.tag and
2888             self._expl == their._expl
2889         )
2890
2891     def __lt__(self, their):
2892         return self._value < their._value
2893
2894     @property
2895     def named(self):
2896         """Return named representation (if exists) of the value
2897         """
2898         for name, value in iteritems(self.specs):
2899             if value == self._value:
2900                 return name
2901         return None
2902
2903     def __call__(
2904             self,
2905             value=None,
2906             bounds=None,
2907             impl=None,
2908             expl=None,
2909             default=None,
2910             optional=None,
2911     ):
2912         return self.__class__(
2913             value=value,
2914             bounds=(
2915                 (self._bound_min, self._bound_max)
2916                 if bounds is None else bounds
2917             ),
2918             impl=self.tag if impl is None else impl,
2919             expl=self._expl if expl is None else expl,
2920             default=self.default if default is None else default,
2921             optional=self.optional if optional is None else optional,
2922             _specs=self.specs,
2923         )
2924
2925     def _encode_payload(self):
2926         self._assert_ready()
2927         value = self._value
2928         if PY2:
2929             if value == 0:
2930                 octets = bytearray([0])
2931             elif value < 0:
2932                 value = -value
2933                 value -= 1
2934                 octets = bytearray()
2935                 while value > 0:
2936                     octets.append((value & 0xFF) ^ 0xFF)
2937                     value >>= 8
2938                 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2939                     octets.append(0xFF)
2940             else:
2941                 octets = bytearray()
2942                 while value > 0:
2943                     octets.append(value & 0xFF)
2944                     value >>= 8
2945                 if octets[-1] & 0x80 > 0:
2946                     octets.append(0x00)
2947             octets.reverse()
2948             octets = bytes(octets)
2949         else:
2950             bytes_len = ceil(value.bit_length() / 8) or 1
2951             while True:
2952                 try:
2953                     octets = value.to_bytes(
2954                         bytes_len,
2955                         byteorder="big",
2956                         signed=True,
2957                     )
2958                 except OverflowError:
2959                     bytes_len += 1
2960                 else:
2961                     break
2962         return octets
2963
2964     def _encode(self):
2965         octets = self._encode_payload()
2966         return b"".join((self.tag, len_encode(len(octets)), octets))
2967
2968     def _encode1st(self, state):
2969         l = len(self._encode_payload())
2970         return len(self.tag) + len_size(l) + l, state
2971
2972     def _encode2nd(self, writer, state_iter):
2973         write_full(writer, self._encode())
2974
2975     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2976         try:
2977             t, _, lv = tag_strip(tlv)
2978         except DecodeError as err:
2979             raise err.__class__(
2980                 msg=err.msg,
2981                 klass=self.__class__,
2982                 decode_path=decode_path,
2983                 offset=offset,
2984             )
2985         if t != self.tag:
2986             raise TagMismatch(
2987                 klass=self.__class__,
2988                 decode_path=decode_path,
2989                 offset=offset,
2990             )
2991         if tag_only:
2992             yield None
2993             return
2994         try:
2995             l, llen, v = len_decode(lv)
2996         except DecodeError as err:
2997             raise err.__class__(
2998                 msg=err.msg,
2999                 klass=self.__class__,
3000                 decode_path=decode_path,
3001                 offset=offset,
3002             )
3003         if l > len(v):
3004             raise NotEnoughData(
3005                 "encoded length is longer than data",
3006                 klass=self.__class__,
3007                 decode_path=decode_path,
3008                 offset=offset,
3009             )
3010         if l == 0:
3011             raise NotEnoughData(
3012                 "zero length",
3013                 klass=self.__class__,
3014                 decode_path=decode_path,
3015                 offset=offset,
3016             )
3017         v, tail = v[:l], v[l:]
3018         first_octet = byte2int(v)
3019         if l > 1:
3020             second_octet = byte2int(v[1:])
3021             if (
3022                     ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3023                     ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3024             ):
3025                 raise DecodeError(
3026                     "non normalized integer",
3027                     klass=self.__class__,
3028                     decode_path=decode_path,
3029                     offset=offset,
3030                 )
3031         if PY2:
3032             value = 0
3033             if first_octet & 0x80 > 0:
3034                 octets = bytearray()
3035                 for octet in bytearray(v):
3036                     octets.append(octet ^ 0xFF)
3037                 for octet in octets:
3038                     value = (value << 8) | octet
3039                 value += 1
3040                 value = -value
3041             else:
3042                 for octet in bytearray(v):
3043                     value = (value << 8) | octet
3044         else:
3045             value = int.from_bytes(v, byteorder="big", signed=True)
3046         try:
3047             obj = self.__class__(
3048                 value=value,
3049                 bounds=(self._bound_min, self._bound_max),
3050                 impl=self.tag,
3051                 expl=self._expl,
3052                 default=self.default,
3053                 optional=self.optional,
3054                 _specs=self.specs,
3055                 _decoded=(offset, llen, l),
3056             )
3057         except BoundsError as err:
3058             raise DecodeError(
3059                 msg=str(err),
3060                 klass=self.__class__,
3061                 decode_path=decode_path,
3062                 offset=offset,
3063             )
3064         yield decode_path, obj, tail
3065
3066     def __repr__(self):
3067         return pp_console_row(next(self.pps()))
3068
3069     def pps(self, decode_path=()):
3070         yield _pp(
3071             obj=self,
3072             asn1_type_name=self.asn1_type_name,
3073             obj_name=self.__class__.__name__,
3074             decode_path=decode_path,
3075             value=(self.named or str(self._value)) if self.ready else None,
3076             optional=self.optional,
3077             default=self == self.default,
3078             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3079             expl=None if self._expl is None else tag_decode(self._expl),
3080             offset=self.offset,
3081             tlen=self.tlen,
3082             llen=self.llen,
3083             vlen=self.vlen,
3084             expl_offset=self.expl_offset if self.expled else None,
3085             expl_tlen=self.expl_tlen if self.expled else None,
3086             expl_llen=self.expl_llen if self.expled else None,
3087             expl_vlen=self.expl_vlen if self.expled else None,
3088             expl_lenindef=self.expl_lenindef,
3089             bered=self.bered,
3090         )
3091         for pp in self.pps_lenindef(decode_path):
3092             yield pp
3093
3094
3095 BitStringState = namedtuple(
3096     "BitStringState",
3097     BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3098     **NAMEDTUPLE_KWARGS
3099 )
3100
3101
3102 class BitString(Obj):
3103     """``BIT STRING`` bit string type
3104
3105     >>> BitString(b"hello world")
3106     BIT STRING 88 bits 68656c6c6f20776f726c64
3107     >>> bytes(b)
3108     b'hello world'
3109     >>> b == b"hello world"
3110     True
3111     >>> b.bit_len
3112     88
3113
3114     >>> BitString("'0A3B5F291CD'H")
3115     BIT STRING 44 bits 0a3b5f291cd0
3116     >>> b = BitString("'010110000000'B")
3117     BIT STRING 12 bits 5800
3118     >>> b.bit_len
3119     12
3120     >>> b[0], b[1], b[2], b[3]
3121     (False, True, False, True)
3122     >>> b[1000]
3123     False
3124     >>> [v for v in b]
3125     [False, True, False, True, True, False, False, False, False, False, False, False]
3126
3127     ::
3128
3129         class KeyUsage(BitString):
3130             schema = (
3131                 ("digitalSignature", 0),
3132                 ("nonRepudiation", 1),
3133                 ("keyEncipherment", 2),
3134             )
3135
3136     >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3137     KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3138     >>> b.named
3139     ['nonRepudiation', 'keyEncipherment']
3140     >>> b.specs
3141     {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3142
3143     .. note::
3144
3145        Pay attention that BIT STRING can be encoded both in primitive
3146        and constructed forms. Decoder always checks constructed form tag
3147        additionally to specified primitive one. If BER decoding is
3148        :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3149        of DER restrictions.
3150     """
3151     __slots__ = ("tag_constructed", "specs", "defined")
3152     tag_default = tag_encode(3)
3153     asn1_type_name = "BIT STRING"
3154
3155     def __init__(
3156             self,
3157             value=None,
3158             impl=None,
3159             expl=None,
3160             default=None,
3161             optional=False,
3162             _specs=None,
3163             _decoded=(0, 0, 0),
3164     ):
3165         """
3166         :param value: set the value. Either binary type, tuple of named
3167                       values (if ``schema`` is specified in the class),
3168                       string in ``'XXX...'B`` form, or
3169                       :py:class:`pyderasn.BitString` object
3170         :param bytes impl: override default tag with ``IMPLICIT`` one
3171         :param bytes expl: override default tag with ``EXPLICIT`` one
3172         :param default: set default value. Type same as in ``value``
3173         :param bool optional: is object ``OPTIONAL`` in sequence
3174         """
3175         super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3176         specs = getattr(self, "schema", {}) if _specs is None else _specs
3177         self.specs = specs if specs.__class__ == dict else dict(specs)
3178         self._value = None if value is None else self._value_sanitize(value)
3179         if default is not None:
3180             default = self._value_sanitize(default)
3181             self.default = self.__class__(
3182                 value=default,
3183                 impl=self.tag,
3184                 expl=self._expl,
3185             )
3186             if value is None:
3187                 self._value = default
3188         self.defined = None
3189         tag_klass, _, tag_num = tag_decode(self.tag)
3190         self.tag_constructed = tag_encode(
3191             klass=tag_klass,
3192             form=TagFormConstructed,
3193             num=tag_num,
3194         )
3195
3196     def _bits2octets(self, bits):
3197         if len(self.specs) > 0:
3198             bits = bits.rstrip("0")
3199         bit_len = len(bits)
3200         bits += "0" * ((8 - (bit_len % 8)) % 8)
3201         octets = bytearray(len(bits) // 8)
3202         for i in six_xrange(len(octets)):
3203             octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3204         return bit_len, bytes(octets)
3205
3206     def _value_sanitize(self, value):
3207         if isinstance(value, (string_types, binary_type)):
3208             if (
3209                     isinstance(value, string_types) and
3210                     value.startswith("'")
3211             ):
3212                 if value.endswith("'B"):
3213                     value = value[1:-2]
3214                     if not frozenset(value) <= SET01:
3215                         raise ValueError("B's coding contains unacceptable chars")
3216                     return self._bits2octets(value)
3217                 if value.endswith("'H"):
3218                     value = value[1:-2]
3219                     return (
3220                         len(value) * 4,
3221                         hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3222                     )
3223             if value.__class__ == binary_type:
3224                 return (len(value) * 8, value)
3225             raise InvalidValueType((self.__class__, string_types, binary_type))
3226         if value.__class__ == tuple:
3227             if (
3228                     len(value) == 2 and
3229                     isinstance(value[0], integer_types) and
3230                     value[1].__class__ == binary_type
3231             ):
3232                 return value
3233             bits = []
3234             for name in value:
3235                 bit = self.specs.get(name)
3236                 if bit is None:
3237                     raise ObjUnknown("BitString value: %s" % name)
3238                 bits.append(bit)
3239             if len(bits) == 0:
3240                 return self._bits2octets("")
3241             bits = frozenset(bits)
3242             return self._bits2octets("".join(
3243                 ("1" if bit in bits else "0")
3244                 for bit in six_xrange(max(bits) + 1)
3245             ))
3246         if issubclass(value.__class__, BitString):
3247             return value._value
3248         raise InvalidValueType((self.__class__, binary_type, string_types))
3249
3250     @property
3251     def ready(self):
3252         return self._value is not None
3253
3254     def __getstate__(self):
3255         return BitStringState(
3256             __version__,
3257             self.tag,
3258             self._tag_order,
3259             self._expl,
3260             self.default,
3261             self.optional,
3262             self.offset,
3263             self.llen,
3264             self.vlen,
3265             self.expl_lenindef,
3266             self.lenindef,
3267             self.ber_encoded,
3268             self.specs,
3269             self._value,
3270             self.tag_constructed,
3271             self.defined,
3272         )
3273
3274     def __setstate__(self, state):
3275         super(BitString, self).__setstate__(state)
3276         self.specs = state.specs
3277         self._value = state.value
3278         self.tag_constructed = state.tag_constructed
3279         self.defined = state.defined
3280
3281     def __iter__(self):
3282         self._assert_ready()
3283         for i in six_xrange(self._value[0]):
3284             yield self[i]
3285
3286     @property
3287     def bit_len(self):
3288         """Returns number of bits in the string
3289         """
3290         self._assert_ready()
3291         return self._value[0]
3292
3293     def __bytes__(self):
3294         self._assert_ready()
3295         return self._value[1]
3296
3297     def __eq__(self, their):
3298         if their.__class__ == bytes:
3299             return self._value[1] == their
3300         if not issubclass(their.__class__, BitString):
3301             return False
3302         return (
3303             self._value == their._value and
3304             self.tag == their.tag and
3305             self._expl == their._expl
3306         )
3307
3308     @property
3309     def named(self):
3310         """Named representation (if exists) of the bits
3311
3312         :returns: [str(name), ...]
3313         """
3314         return [name for name, bit in iteritems(self.specs) if self[bit]]
3315
3316     def __call__(
3317             self,
3318             value=None,
3319             impl=None,
3320             expl=None,
3321             default=None,
3322             optional=None,
3323     ):
3324         return self.__class__(
3325             value=value,
3326             impl=self.tag if impl is None else impl,
3327             expl=self._expl if expl is None else expl,
3328             default=self.default if default is None else default,
3329             optional=self.optional if optional is None else optional,
3330             _specs=self.specs,
3331         )
3332
3333     def __getitem__(self, key):
3334         if key.__class__ == int:
3335             bit_len, octets = self._value
3336             if key >= bit_len:
3337                 return False
3338             return (
3339                 byte2int(memoryview(octets)[key // 8:]) >>
3340                 (7 - (key % 8))
3341             ) & 1 == 1
3342         if isinstance(key, string_types):
3343             value = self.specs.get(key)
3344             if value is None:
3345                 raise ObjUnknown("BitString value: %s" % key)
3346             return self[value]
3347         raise InvalidValueType((int, str))
3348
3349     def _encode(self):
3350         self._assert_ready()
3351         bit_len, octets = self._value
3352         return b"".join((
3353             self.tag,
3354             len_encode(len(octets) + 1),
3355             int2byte((8 - bit_len % 8) % 8),
3356             octets,
3357         ))
3358
3359     def _encode1st(self, state):
3360         self._assert_ready()
3361         _, octets = self._value
3362         l = len(octets) + 1
3363         return len(self.tag) + len_size(l) + l, state
3364
3365     def _encode2nd(self, writer, state_iter):
3366         bit_len, octets = self._value
3367         write_full(writer, b"".join((
3368             self.tag,
3369             len_encode(len(octets) + 1),
3370             int2byte((8 - bit_len % 8) % 8),
3371         )))
3372         write_full(writer, octets)
3373
3374     def _encode_cer(self, writer):
3375         bit_len, octets = self._value
3376         if len(octets) + 1 <= 1000:
3377             write_full(writer, self._encode())
3378             return
3379         write_full(writer, self.tag_constructed)
3380         write_full(writer, LENINDEF)
3381         for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3382             write_full(writer, b"".join((
3383                 BitString.tag_default,
3384                 LEN1K,
3385                 int2byte(0),
3386                 octets[offset:offset + 999],
3387             )))
3388         tail = octets[offset + 999:]
3389         if len(tail) > 0:
3390             tail = int2byte((8 - bit_len % 8) % 8) + tail
3391             write_full(writer, b"".join((
3392                 BitString.tag_default,
3393                 len_encode(len(tail)),
3394                 tail,
3395             )))
3396         write_full(writer, EOC)
3397
3398     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3399         try:
3400             t, tlen, lv = tag_strip(tlv)
3401         except DecodeError as err:
3402             raise err.__class__(
3403                 msg=err.msg,
3404                 klass=self.__class__,
3405                 decode_path=decode_path,
3406                 offset=offset,
3407             )
3408         if t == self.tag:
3409             if tag_only:  # pragma: no cover
3410                 yield None
3411                 return
3412             try:
3413                 l, llen, v = len_decode(lv)
3414             except DecodeError as err:
3415                 raise err.__class__(
3416                     msg=err.msg,
3417                     klass=self.__class__,
3418                     decode_path=decode_path,
3419                     offset=offset,
3420                 )
3421             if l > len(v):
3422                 raise NotEnoughData(
3423                     "encoded length is longer than data",
3424                     klass=self.__class__,
3425                     decode_path=decode_path,
3426                     offset=offset,
3427                 )
3428             if l == 0:
3429                 raise NotEnoughData(
3430                     "zero length",
3431                     klass=self.__class__,
3432                     decode_path=decode_path,
3433                     offset=offset,
3434                 )
3435             pad_size = byte2int(v)
3436             if l == 1 and pad_size != 0:
3437                 raise DecodeError(
3438                     "invalid empty value",
3439                     klass=self.__class__,
3440                     decode_path=decode_path,
3441                     offset=offset,
3442                 )
3443             if pad_size > 7:
3444                 raise DecodeError(
3445                     "too big pad",
3446                     klass=self.__class__,
3447                     decode_path=decode_path,
3448                     offset=offset,
3449                 )
3450             if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3451                 raise DecodeError(
3452                     "invalid pad",
3453                     klass=self.__class__,
3454                     decode_path=decode_path,
3455                     offset=offset,
3456                 )
3457             v, tail = v[:l], v[l:]
3458             bit_len = (len(v) - 1) * 8 - pad_size
3459             obj = self.__class__(
3460                 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3461                 impl=self.tag,
3462                 expl=self._expl,
3463                 default=self.default,
3464                 optional=self.optional,
3465                 _specs=self.specs,
3466                 _decoded=(offset, llen, l),
3467             )
3468             if evgen_mode:
3469                 obj._value = (bit_len, None)
3470             yield decode_path, obj, tail
3471             return
3472         if t != self.tag_constructed:
3473             raise TagMismatch(
3474                 klass=self.__class__,
3475                 decode_path=decode_path,
3476                 offset=offset,
3477             )
3478         if not ctx.get("bered", False):
3479             raise DecodeError(
3480                 "unallowed BER constructed encoding",
3481                 klass=self.__class__,
3482                 decode_path=decode_path,
3483                 offset=offset,
3484             )
3485         if tag_only:  # pragma: no cover
3486             yield None
3487             return
3488         lenindef = False
3489         try:
3490             l, llen, v = len_decode(lv)
3491         except LenIndefForm:
3492             llen, l, v = 1, 0, lv[1:]
3493             lenindef = True
3494         except DecodeError as err:
3495             raise err.__class__(
3496                 msg=err.msg,
3497                 klass=self.__class__,
3498                 decode_path=decode_path,
3499                 offset=offset,
3500             )
3501         if l > len(v):
3502             raise NotEnoughData(
3503                 "encoded length is longer than data",
3504                 klass=self.__class__,
3505                 decode_path=decode_path,
3506                 offset=offset,
3507             )
3508         if not lenindef and l == 0:
3509             raise NotEnoughData(
3510                 "zero length",
3511                 klass=self.__class__,
3512                 decode_path=decode_path,
3513                 offset=offset,
3514             )
3515         chunks = []
3516         sub_offset = offset + tlen + llen
3517         vlen = 0
3518         while True:
3519             if lenindef:
3520                 if v[:EOC_LEN].tobytes() == EOC:
3521                     break
3522             else:
3523                 if vlen == l:
3524                     break
3525                 if vlen > l:
3526                     raise DecodeError(
3527                         "chunk out of bounds",
3528                         klass=self.__class__,
3529                         decode_path=decode_path + (str(len(chunks) - 1),),
3530                         offset=chunks[-1].offset,
3531                     )
3532             sub_decode_path = decode_path + (str(len(chunks)),)
3533             try:
3534                 if evgen_mode:
3535                     for _decode_path, chunk, v_tail in BitString().decode_evgen(
3536                             v,
3537                             offset=sub_offset,
3538                             decode_path=sub_decode_path,
3539                             leavemm=True,
3540                             ctx=ctx,
3541                             _ctx_immutable=False,
3542                     ):
3543                         yield _decode_path, chunk, v_tail
3544                 else:
3545                     _, chunk, v_tail = next(BitString().decode_evgen(
3546                         v,
3547                         offset=sub_offset,
3548                         decode_path=sub_decode_path,
3549                         leavemm=True,
3550                         ctx=ctx,
3551                         _ctx_immutable=False,
3552                         _evgen_mode=False,
3553                     ))
3554             except TagMismatch:
3555                 raise DecodeError(
3556                     "expected BitString encoded chunk",
3557                     klass=self.__class__,
3558                     decode_path=sub_decode_path,
3559                     offset=sub_offset,
3560                 )
3561             chunks.append(chunk)
3562             sub_offset += chunk.tlvlen
3563             vlen += chunk.tlvlen
3564             v = v_tail
3565         if len(chunks) == 0:
3566             raise DecodeError(
3567                 "no chunks",
3568                 klass=self.__class__,
3569                 decode_path=decode_path,
3570                 offset=offset,
3571             )
3572         values = []
3573         bit_len = 0
3574         for chunk_i, chunk in enumerate(chunks[:-1]):
3575             if chunk.bit_len % 8 != 0:
3576                 raise DecodeError(
3577                     "BitString chunk is not multiple of 8 bits",
3578                     klass=self.__class__,
3579                     decode_path=decode_path + (str(chunk_i),),
3580                     offset=chunk.offset,
3581                 )
3582             if not evgen_mode:
3583                 values.append(bytes(chunk))
3584             bit_len += chunk.bit_len
3585         chunk_last = chunks[-1]
3586         if not evgen_mode:
3587             values.append(bytes(chunk_last))
3588         bit_len += chunk_last.bit_len
3589         obj = self.__class__(
3590             value=None if evgen_mode else (bit_len, b"".join(values)),
3591             impl=self.tag,
3592             expl=self._expl,
3593             default=self.default,
3594             optional=self.optional,
3595             _specs=self.specs,
3596             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3597         )
3598         if evgen_mode:
3599             obj._value = (bit_len, None)
3600         obj.lenindef = lenindef
3601         obj.ber_encoded = True
3602         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3603
3604     def __repr__(self):
3605         return pp_console_row(next(self.pps()))
3606
3607     def pps(self, decode_path=()):
3608         value = None
3609         blob = None
3610         if self.ready:
3611             bit_len, blob = self._value
3612             value = "%d bits" % bit_len
3613             if len(self.specs) > 0 and blob is not None:
3614                 blob = tuple(self.named)
3615         yield _pp(
3616             obj=self,
3617             asn1_type_name=self.asn1_type_name,
3618             obj_name=self.__class__.__name__,
3619             decode_path=decode_path,
3620             value=value,
3621             blob=blob,
3622             optional=self.optional,
3623             default=self == self.default,
3624             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3625             expl=None if self._expl is None else tag_decode(self._expl),
3626             offset=self.offset,
3627             tlen=self.tlen,
3628             llen=self.llen,
3629             vlen=self.vlen,
3630             expl_offset=self.expl_offset if self.expled else None,
3631             expl_tlen=self.expl_tlen if self.expled else None,
3632             expl_llen=self.expl_llen if self.expled else None,
3633             expl_vlen=self.expl_vlen if self.expled else None,
3634             expl_lenindef=self.expl_lenindef,
3635             lenindef=self.lenindef,
3636             ber_encoded=self.ber_encoded,
3637             bered=self.bered,
3638         )
3639         defined_by, defined = self.defined or (None, None)
3640         if defined_by is not None:
3641             yield defined.pps(
3642                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3643             )
3644         for pp in self.pps_lenindef(decode_path):
3645             yield pp
3646
3647
3648 OctetStringState = namedtuple(
3649     "OctetStringState",
3650     BasicState._fields + (
3651         "value",
3652         "bound_min",
3653         "bound_max",
3654         "tag_constructed",
3655         "defined",
3656     ),
3657     **NAMEDTUPLE_KWARGS
3658 )
3659
3660
3661 class OctetString(Obj):
3662     """``OCTET STRING`` binary string type
3663
3664     >>> s = OctetString(b"hello world")
3665     OCTET STRING 11 bytes 68656c6c6f20776f726c64
3666     >>> s == OctetString(b"hello world")
3667     True
3668     >>> bytes(s)
3669     b'hello world'
3670
3671     >>> OctetString(b"hello", bounds=(4, 4))
3672     Traceback (most recent call last):
3673     pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3674     >>> OctetString(b"hell", bounds=(4, 4))
3675     OCTET STRING 4 bytes 68656c6c
3676
3677     Memoryviews can be used as a values. If memoryview is made on
3678     mmap-ed file, then it does not take storage inside OctetString
3679     itself. In CER encoding mode it will be streamed to the specified
3680     writer, copying 1 KB chunks.
3681     """
3682     __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3683     tag_default = tag_encode(4)
3684     asn1_type_name = "OCTET STRING"
3685     evgen_mode_skip_value = True
3686
3687     def __init__(
3688             self,
3689             value=None,
3690             bounds=None,
3691             impl=None,
3692             expl=None,
3693             default=None,
3694             optional=False,
3695             _decoded=(0, 0, 0),
3696             ctx=None,
3697     ):
3698         """
3699         :param value: set the value. Either binary type, or
3700                       :py:class:`pyderasn.OctetString` object
3701         :param bounds: set ``(MIN, MAX)`` value size constraint.
3702                        (-inf, +inf) by default
3703         :param bytes impl: override default tag with ``IMPLICIT`` one
3704         :param bytes expl: override default tag with ``EXPLICIT`` one
3705         :param default: set default value. Type same as in ``value``
3706         :param bool optional: is object ``OPTIONAL`` in sequence
3707         """
3708         super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3709         self._value = value
3710         self._bound_min, self._bound_max = getattr(
3711             self,
3712             "bounds",
3713             (0, float("+inf")),
3714         ) if bounds is None else bounds
3715         if value is not None:
3716             self._value = self._value_sanitize(value)
3717         if default is not None:
3718             default = self._value_sanitize(default)
3719             self.default = self.__class__(
3720                 value=default,
3721                 impl=self.tag,
3722                 expl=self._expl,
3723             )
3724             if self._value is None:
3725                 self._value = default
3726         self.defined = None
3727         tag_klass, _, tag_num = tag_decode(self.tag)
3728         self.tag_constructed = tag_encode(
3729             klass=tag_klass,
3730             form=TagFormConstructed,
3731             num=tag_num,
3732         )
3733
3734     def _value_sanitize(self, value):
3735         if value.__class__ == binary_type or value.__class__ == memoryview:
3736             pass
3737         elif issubclass(value.__class__, OctetString):
3738             value = value._value
3739         else:
3740             raise InvalidValueType((self.__class__, bytes, memoryview))
3741         if not self._bound_min <= len(value) <= self._bound_max:
3742             raise BoundsError(self._bound_min, len(value), self._bound_max)
3743         return value
3744
3745     @property
3746     def ready(self):
3747         return self._value is not None
3748
3749     def __getstate__(self):
3750         return OctetStringState(
3751             __version__,
3752             self.tag,
3753             self._tag_order,
3754             self._expl,
3755             self.default,
3756             self.optional,
3757             self.offset,
3758             self.llen,
3759             self.vlen,
3760             self.expl_lenindef,
3761             self.lenindef,
3762             self.ber_encoded,
3763             self._value,
3764             self._bound_min,
3765             self._bound_max,
3766             self.tag_constructed,
3767             self.defined,
3768         )
3769
3770     def __setstate__(self, state):
3771         super(OctetString, self).__setstate__(state)
3772         self._value = state.value
3773         self._bound_min = state.bound_min
3774         self._bound_max = state.bound_max
3775         self.tag_constructed = state.tag_constructed
3776         self.defined = state.defined
3777
3778     def __bytes__(self):
3779         self._assert_ready()
3780         return bytes(self._value)
3781
3782     def __eq__(self, their):
3783         if their.__class__ == binary_type:
3784             return self._value == their
3785         if not issubclass(their.__class__, OctetString):
3786             return False
3787         return (
3788             self._value == their._value and
3789             self.tag == their.tag and
3790             self._expl == their._expl
3791         )
3792
3793     def __lt__(self, their):
3794         return self._value < their._value
3795
3796     def __call__(
3797             self,
3798             value=None,
3799             bounds=None,
3800             impl=None,
3801             expl=None,
3802             default=None,
3803             optional=None,
3804     ):
3805         return self.__class__(
3806             value=value,
3807             bounds=(
3808                 (self._bound_min, self._bound_max)
3809                 if bounds is None else bounds
3810             ),
3811             impl=self.tag if impl is None else impl,
3812             expl=self._expl if expl is None else expl,
3813             default=self.default if default is None else default,
3814             optional=self.optional if optional is None else optional,
3815         )
3816
3817     def _encode(self):
3818         self._assert_ready()
3819         return b"".join((
3820             self.tag,
3821             len_encode(len(self._value)),
3822             self._value,
3823         ))
3824
3825     def _encode1st(self, state):
3826         self._assert_ready()
3827         l = len(self._value)
3828         return len(self.tag) + len_size(l) + l, state
3829
3830     def _encode2nd(self, writer, state_iter):
3831         value = self._value
3832         write_full(writer, self.tag + len_encode(len(value)))
3833         write_full(writer, value)
3834
3835     def _encode_cer(self, writer):
3836         octets = self._value
3837         if len(octets) <= 1000:
3838             write_full(writer, self._encode())
3839             return
3840         write_full(writer, self.tag_constructed)
3841         write_full(writer, LENINDEF)
3842         for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3843             write_full(writer, b"".join((
3844                 OctetString.tag_default,
3845                 LEN1K,
3846                 octets[offset:offset + 1000],
3847             )))
3848         tail = octets[offset + 1000:]
3849         if len(tail) > 0:
3850             write_full(writer, b"".join((
3851                 OctetString.tag_default,
3852                 len_encode(len(tail)),
3853                 tail,
3854             )))
3855         write_full(writer, EOC)
3856
3857     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3858         try:
3859             t, tlen, lv = tag_strip(tlv)
3860         except DecodeError as err:
3861             raise err.__class__(
3862                 msg=err.msg,
3863                 klass=self.__class__,
3864                 decode_path=decode_path,
3865                 offset=offset,
3866             )
3867         if t == self.tag:
3868             if tag_only:
3869                 yield None
3870                 return
3871             try:
3872                 l, llen, v = len_decode(lv)
3873             except DecodeError as err:
3874                 raise err.__class__(
3875                     msg=err.msg,
3876                     klass=self.__class__,
3877                     decode_path=decode_path,
3878                     offset=offset,
3879                 )
3880             if l > len(v):
3881                 raise NotEnoughData(
3882                     "encoded length is longer than data",
3883                     klass=self.__class__,
3884                     decode_path=decode_path,
3885                     offset=offset,
3886                 )
3887             v, tail = v[:l], v[l:]
3888             if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3889                 raise DecodeError(
3890                     msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3891                     klass=self.__class__,
3892                     decode_path=decode_path,
3893                     offset=offset,
3894                 )
3895             try:
3896                 obj = self.__class__(
3897                     value=(
3898                         None if (evgen_mode and self.evgen_mode_skip_value)
3899                         else v.tobytes()
3900                     ),
3901                     bounds=(self._bound_min, self._bound_max),
3902                     impl=self.tag,
3903                     expl=self._expl,
3904                     default=self.default,
3905                     optional=self.optional,
3906                     _decoded=(offset, llen, l),
3907                     ctx=ctx,
3908                 )
3909             except DecodeError as err:
3910                 raise DecodeError(
3911                     msg=err.msg,
3912                     klass=self.__class__,
3913                     decode_path=decode_path,
3914                     offset=offset,
3915                 )
3916             except BoundsError as err:
3917                 raise DecodeError(
3918                     msg=str(err),
3919                     klass=self.__class__,
3920                     decode_path=decode_path,
3921                     offset=offset,
3922                 )
3923             yield decode_path, obj, tail
3924             return
3925         if t != self.tag_constructed:
3926             raise TagMismatch(
3927                 klass=self.__class__,
3928                 decode_path=decode_path,
3929                 offset=offset,
3930             )
3931         if not ctx.get("bered", False):
3932             raise DecodeError(
3933                 "unallowed BER constructed encoding",
3934                 klass=self.__class__,
3935                 decode_path=decode_path,
3936                 offset=offset,
3937             )
3938         if tag_only:
3939             yield None
3940             return
3941         lenindef = False
3942         try:
3943             l, llen, v = len_decode(lv)
3944         except LenIndefForm:
3945             llen, l, v = 1, 0, lv[1:]
3946             lenindef = True
3947         except DecodeError as err:
3948             raise err.__class__(
3949                 msg=err.msg,
3950                 klass=self.__class__,
3951                 decode_path=decode_path,
3952                 offset=offset,
3953             )
3954         if l > len(v):
3955             raise NotEnoughData(
3956                 "encoded length is longer than data",
3957                 klass=self.__class__,
3958                 decode_path=decode_path,
3959                 offset=offset,
3960             )
3961         chunks = []
3962         chunks_count = 0
3963         sub_offset = offset + tlen + llen
3964         vlen = 0
3965         payload_len = 0
3966         while True:
3967             if lenindef:
3968                 if v[:EOC_LEN].tobytes() == EOC:
3969                     break
3970             else:
3971                 if vlen == l:
3972                     break
3973                 if vlen > l:
3974                     raise DecodeError(
3975                         "chunk out of bounds",
3976                         klass=self.__class__,
3977                         decode_path=decode_path + (str(len(chunks) - 1),),
3978                         offset=chunks[-1].offset,
3979                     )
3980             try:
3981                 if evgen_mode:
3982                     sub_decode_path = decode_path + (str(chunks_count),)
3983                     for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3984                             v,
3985                             offset=sub_offset,
3986                             decode_path=sub_decode_path,
3987                             leavemm=True,
3988                             ctx=ctx,
3989                             _ctx_immutable=False,
3990                     ):
3991                         yield _decode_path, chunk, v_tail
3992                         if not chunk.ber_encoded:
3993                             payload_len += chunk.vlen
3994                     chunks_count += 1
3995                 else:
3996                     sub_decode_path = decode_path + (str(len(chunks)),)
3997                     _, chunk, v_tail = next(OctetString().decode_evgen(
3998                         v,
3999                         offset=sub_offset,
4000                         decode_path=sub_decode_path,
4001                         leavemm=True,
4002                         ctx=ctx,
4003                         _ctx_immutable=False,
4004                         _evgen_mode=False,
4005                     ))
4006                     chunks.append(chunk)
4007             except TagMismatch:
4008                 raise DecodeError(
4009                     "expected OctetString encoded chunk",
4010                     klass=self.__class__,
4011                     decode_path=sub_decode_path,
4012                     offset=sub_offset,
4013                 )
4014             sub_offset += chunk.tlvlen
4015             vlen += chunk.tlvlen
4016             v = v_tail
4017         if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
4018             raise DecodeError(
4019                 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
4020                 klass=self.__class__,
4021                 decode_path=decode_path,
4022                 offset=offset,
4023             )
4024         try:
4025             obj = self.__class__(
4026                 value=(
4027                     None if evgen_mode else
4028                     b"".join(bytes(chunk) for chunk in chunks)
4029                 ),
4030                 bounds=(self._bound_min, self._bound_max),
4031                 impl=self.tag,
4032                 expl=self._expl,
4033                 default=self.default,
4034                 optional=self.optional,
4035                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4036                 ctx=ctx,
4037             )
4038         except DecodeError as err:
4039             raise DecodeError(
4040                 msg=err.msg,
4041                 klass=self.__class__,
4042                 decode_path=decode_path,
4043                 offset=offset,
4044             )
4045         except BoundsError as err:
4046             raise DecodeError(
4047                 msg=str(err),
4048                 klass=self.__class__,
4049                 decode_path=decode_path,
4050                 offset=offset,
4051             )
4052         obj.lenindef = lenindef
4053         obj.ber_encoded = True
4054         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4055
4056     def __repr__(self):
4057         return pp_console_row(next(self.pps()))
4058
4059     def pps(self, decode_path=()):
4060         yield _pp(
4061             obj=self,
4062             asn1_type_name=self.asn1_type_name,
4063             obj_name=self.__class__.__name__,
4064             decode_path=decode_path,
4065             value=("%d bytes" % len(self._value)) if self.ready else None,
4066             blob=self._value if self.ready else None,
4067             optional=self.optional,
4068             default=self == self.default,
4069             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4070             expl=None if self._expl is None else tag_decode(self._expl),
4071             offset=self.offset,
4072             tlen=self.tlen,
4073             llen=self.llen,
4074             vlen=self.vlen,
4075             expl_offset=self.expl_offset if self.expled else None,
4076             expl_tlen=self.expl_tlen if self.expled else None,
4077             expl_llen=self.expl_llen if self.expled else None,
4078             expl_vlen=self.expl_vlen if self.expled else None,
4079             expl_lenindef=self.expl_lenindef,
4080             lenindef=self.lenindef,
4081             ber_encoded=self.ber_encoded,
4082             bered=self.bered,
4083         )
4084         defined_by, defined = self.defined or (None, None)
4085         if defined_by is not None:
4086             yield defined.pps(
4087                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4088             )
4089         for pp in self.pps_lenindef(decode_path):
4090             yield pp
4091
4092
4093 def agg_octet_string(evgens, decode_path, raw, writer):
4094     """Aggregate constructed string (OctetString and its derivatives)
4095
4096     :param evgens: iterator of generated events
4097     :param decode_path: points to the string we want to decode
4098     :param raw: slicebable (memoryview, bytearray, etc) with
4099                 the data evgens are generated on
4100     :param writer: buffer.write where string is going to be saved
4101     :param writer: where string is going to be saved. Must comply
4102                    with ``io.RawIOBase.write`` behaviour
4103
4104     .. seealso:: :ref:`agg_octet_string`
4105     """
4106     decode_path_len = len(decode_path)
4107     for dp, obj, _ in evgens:
4108         if dp[:decode_path_len] != decode_path:
4109             continue
4110         if not obj.ber_encoded:
4111             write_full(writer, raw[
4112                 obj.offset + obj.tlen + obj.llen:
4113                 obj.offset + obj.tlen + obj.llen + obj.vlen -
4114                 (EOC_LEN if obj.expl_lenindef else 0)
4115             ])
4116         if len(dp) == decode_path_len:
4117             break
4118
4119
4120 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4121
4122
4123 class Null(Obj):
4124     """``NULL`` null object
4125
4126     >>> n = Null()
4127     NULL
4128     >>> n.ready
4129     True
4130     """
4131     __slots__ = ()
4132     tag_default = tag_encode(5)
4133     asn1_type_name = "NULL"
4134
4135     def __init__(
4136             self,
4137             value=None,  # unused, but Sequence passes it
4138             impl=None,
4139             expl=None,
4140             optional=False,
4141             _decoded=(0, 0, 0),
4142     ):
4143         """
4144         :param bytes impl: override default tag with ``IMPLICIT`` one
4145         :param bytes expl: override default tag with ``EXPLICIT`` one
4146         :param bool optional: is object ``OPTIONAL`` in sequence
4147         """
4148         super(Null, self).__init__(impl, expl, None, optional, _decoded)
4149         self.default = None
4150
4151     @property
4152     def ready(self):
4153         return True
4154
4155     def __getstate__(self):
4156         return NullState(
4157             __version__,
4158             self.tag,
4159             self._tag_order,
4160             self._expl,
4161             self.default,
4162             self.optional,
4163             self.offset,
4164             self.llen,
4165             self.vlen,
4166             self.expl_lenindef,
4167             self.lenindef,
4168             self.ber_encoded,
4169         )
4170
4171     def __eq__(self, their):
4172         if not issubclass(their.__class__, Null):
4173             return False
4174         return (
4175             self.tag == their.tag and
4176             self._expl == their._expl
4177         )
4178
4179     def __call__(
4180             self,
4181             value=None,
4182             impl=None,
4183             expl=None,
4184             optional=None,
4185     ):
4186         return self.__class__(
4187             impl=self.tag if impl is None else impl,
4188             expl=self._expl if expl is None else expl,
4189             optional=self.optional if optional is None else optional,
4190         )
4191
4192     def _encode(self):
4193         return self.tag + LEN0
4194
4195     def _encode1st(self, state):
4196         return len(self.tag) + 1, state
4197
4198     def _encode2nd(self, writer, state_iter):
4199         write_full(writer, self.tag + LEN0)
4200
4201     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4202         try:
4203             t, _, lv = tag_strip(tlv)
4204         except DecodeError as err:
4205             raise err.__class__(
4206                 msg=err.msg,
4207                 klass=self.__class__,
4208                 decode_path=decode_path,
4209                 offset=offset,
4210             )
4211         if t != self.tag:
4212             raise TagMismatch(
4213                 klass=self.__class__,
4214                 decode_path=decode_path,
4215                 offset=offset,
4216             )
4217         if tag_only:  # pragma: no cover
4218             yield None
4219             return
4220         try:
4221             l, _, v = len_decode(lv)
4222         except DecodeError as err:
4223             raise err.__class__(
4224                 msg=err.msg,
4225                 klass=self.__class__,
4226                 decode_path=decode_path,
4227                 offset=offset,
4228             )
4229         if l != 0:
4230             raise InvalidLength(
4231                 "Null must have zero length",
4232                 klass=self.__class__,
4233                 decode_path=decode_path,
4234                 offset=offset,
4235             )
4236         obj = self.__class__(
4237             impl=self.tag,
4238             expl=self._expl,
4239             optional=self.optional,
4240             _decoded=(offset, 1, 0),
4241         )
4242         yield decode_path, obj, v
4243
4244     def __repr__(self):
4245         return pp_console_row(next(self.pps()))
4246
4247     def pps(self, decode_path=()):
4248         yield _pp(
4249             obj=self,
4250             asn1_type_name=self.asn1_type_name,
4251             obj_name=self.__class__.__name__,
4252             decode_path=decode_path,
4253             optional=self.optional,
4254             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4255             expl=None if self._expl is None else tag_decode(self._expl),
4256             offset=self.offset,
4257             tlen=self.tlen,
4258             llen=self.llen,
4259             vlen=self.vlen,
4260             expl_offset=self.expl_offset if self.expled else None,
4261             expl_tlen=self.expl_tlen if self.expled else None,
4262             expl_llen=self.expl_llen if self.expled else None,
4263             expl_vlen=self.expl_vlen if self.expled else None,
4264             expl_lenindef=self.expl_lenindef,
4265             bered=self.bered,
4266         )
4267         for pp in self.pps_lenindef(decode_path):
4268             yield pp
4269
4270
4271 ObjectIdentifierState = namedtuple(
4272     "ObjectIdentifierState",
4273     BasicState._fields + ("value", "defines"),
4274     **NAMEDTUPLE_KWARGS
4275 )
4276
4277
4278 class ObjectIdentifier(Obj):
4279     """``OBJECT IDENTIFIER`` OID type
4280
4281     >>> oid = ObjectIdentifier((1, 2, 3))
4282     OBJECT IDENTIFIER 1.2.3
4283     >>> oid == ObjectIdentifier("1.2.3")
4284     True
4285     >>> tuple(oid)
4286     (1, 2, 3)
4287     >>> str(oid)
4288     '1.2.3'
4289     >>> oid + (4, 5) + ObjectIdentifier("1.7")
4290     OBJECT IDENTIFIER 1.2.3.4.5.1.7
4291
4292     >>> str(ObjectIdentifier((3, 1)))
4293     Traceback (most recent call last):
4294     pyderasn.InvalidOID: unacceptable first arc value
4295     """
4296     __slots__ = ("defines",)
4297     tag_default = tag_encode(6)
4298     asn1_type_name = "OBJECT IDENTIFIER"
4299
4300     def __init__(
4301             self,
4302             value=None,
4303             defines=(),
4304             impl=None,
4305             expl=None,
4306             default=None,
4307             optional=False,
4308             _decoded=(0, 0, 0),
4309     ):
4310         """
4311         :param value: set the value. Either tuples of integers,
4312                       string of "."-concatenated integers, or
4313                       :py:class:`pyderasn.ObjectIdentifier` object
4314         :param defines: sequence of tuples. Each tuple has two elements.
4315                         First one is relative to current one decode
4316                         path, aiming to the field defined by that OID.
4317                         Read about relative path in
4318                         :py:func:`pyderasn.abs_decode_path`. Second
4319                         tuple element is ``{OID: pyderasn.Obj()}``
4320                         dictionary, mapping between current OID value
4321                         and structure applied to defined field.
4322
4323                         .. seealso:: :ref:`definedby`
4324
4325         :param bytes impl: override default tag with ``IMPLICIT`` one
4326         :param bytes expl: override default tag with ``EXPLICIT`` one
4327         :param default: set default value. Type same as in ``value``
4328         :param bool optional: is object ``OPTIONAL`` in sequence
4329         """
4330         super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4331         self._value = value
4332         if value is not None:
4333             self._value = self._value_sanitize(value)
4334         if default is not None:
4335             default = self._value_sanitize(default)
4336             self.default = self.__class__(
4337                 value=default,
4338                 impl=self.tag,
4339                 expl=self._expl,
4340             )
4341             if self._value is None:
4342                 self._value = default
4343         self.defines = defines
4344
4345     def __add__(self, their):
4346         if their.__class__ == tuple:
4347             return self.__class__(self._value + array("L", their))
4348         if isinstance(their, self.__class__):
4349             return self.__class__(self._value + their._value)
4350         raise InvalidValueType((self.__class__, tuple))
4351
4352     def _value_sanitize(self, value):
4353         if issubclass(value.__class__, ObjectIdentifier):
4354             return value._value
4355         if isinstance(value, string_types):
4356             try:
4357                 value = array("L", (pureint(arc) for arc in value.split(".")))
4358             except ValueError:
4359                 raise InvalidOID("unacceptable arcs values")
4360         if value.__class__ == tuple:
4361             try:
4362                 value = array("L", value)
4363             except OverflowError as err:
4364                 raise InvalidOID(repr(err))
4365         if value.__class__ is array:
4366             if len(value) < 2:
4367                 raise InvalidOID("less than 2 arcs")
4368             first_arc = value[0]
4369             if first_arc in (0, 1):
4370                 if not (0 <= value[1] <= 39):
4371                     raise InvalidOID("second arc is too wide")
4372             elif first_arc == 2:
4373                 pass
4374             else:
4375                 raise InvalidOID("unacceptable first arc value")
4376             if not all(arc >= 0 for arc in value):
4377                 raise InvalidOID("negative arc value")
4378             return value
4379         raise InvalidValueType((self.__class__, str, tuple))
4380
4381     @property
4382     def ready(self):
4383         return self._value is not None
4384
4385     def __getstate__(self):
4386         return ObjectIdentifierState(
4387             __version__,
4388             self.tag,
4389             self._tag_order,
4390             self._expl,
4391             self.default,
4392             self.optional,
4393             self.offset,
4394             self.llen,
4395             self.vlen,
4396             self.expl_lenindef,
4397             self.lenindef,
4398             self.ber_encoded,
4399             self._value,
4400             self.defines,
4401         )
4402
4403     def __setstate__(self, state):
4404         super(ObjectIdentifier, self).__setstate__(state)
4405         self._value = state.value
4406         self.defines = state.defines
4407
4408     def __iter__(self):
4409         self._assert_ready()
4410         return iter(self._value)
4411
4412     def __str__(self):
4413         return ".".join(str(arc) for arc in self._value or ())
4414
4415     def __hash__(self):
4416         self._assert_ready()
4417         return hash(b"".join((
4418             self.tag,
4419             bytes(self._expl or b""),
4420             str(self._value).encode("ascii"),
4421         )))
4422
4423     def __eq__(self, their):
4424         if their.__class__ == tuple:
4425             return self._value == array("L", their)
4426         if not issubclass(their.__class__, ObjectIdentifier):
4427             return False
4428         return (
4429             self.tag == their.tag and
4430             self._expl == their._expl and
4431             self._value == their._value
4432         )
4433
4434     def __lt__(self, their):
4435         return self._value < their._value
4436
4437     def __call__(
4438             self,
4439             value=None,
4440             defines=None,
4441             impl=None,
4442             expl=None,
4443             default=None,
4444             optional=None,
4445     ):
4446         return self.__class__(
4447             value=value,
4448             defines=self.defines if defines is None else defines,
4449             impl=self.tag if impl is None else impl,
4450             expl=self._expl if expl is None else expl,
4451             default=self.default if default is None else default,
4452             optional=self.optional if optional is None else optional,
4453         )
4454
4455     def _encode_octets(self):
4456         self._assert_ready()
4457         value = self._value
4458         first_value = value[1]
4459         first_arc = value[0]
4460         if first_arc == 0:
4461             pass
4462         elif first_arc == 1:
4463             first_value += 40
4464         elif first_arc == 2:
4465             first_value += 80
4466         else:  # pragma: no cover
4467             raise RuntimeError("invalid arc is stored")
4468         octets = [zero_ended_encode(first_value)]
4469         for arc in value[2:]:
4470             octets.append(zero_ended_encode(arc))
4471         return b"".join(octets)
4472
4473     def _encode(self):
4474         v = self._encode_octets()
4475         return b"".join((self.tag, len_encode(len(v)), v))
4476
4477     def _encode1st(self, state):
4478         l = len(self._encode_octets())
4479         return len(self.tag) + len_size(l) + l, state
4480
4481     def _encode2nd(self, writer, state_iter):
4482         write_full(writer, self._encode())
4483
4484     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4485         try:
4486             t, _, lv = tag_strip(tlv)
4487         except DecodeError as err:
4488             raise err.__class__(
4489                 msg=err.msg,
4490                 klass=self.__class__,
4491                 decode_path=decode_path,
4492                 offset=offset,
4493             )
4494         if t != self.tag:
4495             raise TagMismatch(
4496                 klass=self.__class__,
4497                 decode_path=decode_path,
4498                 offset=offset,
4499             )
4500         if tag_only:  # pragma: no cover
4501             yield None
4502             return
4503         try:
4504             l, llen, v = len_decode(lv)
4505         except DecodeError as err:
4506             raise err.__class__(
4507                 msg=err.msg,
4508                 klass=self.__class__,
4509                 decode_path=decode_path,
4510                 offset=offset,
4511             )
4512         if l > len(v):
4513             raise NotEnoughData(
4514                 "encoded length is longer than data",
4515                 klass=self.__class__,
4516                 decode_path=decode_path,
4517                 offset=offset,
4518             )
4519         if l == 0:
4520             raise NotEnoughData(
4521                 "zero length",
4522                 klass=self.__class__,
4523                 decode_path=decode_path,
4524                 offset=offset,
4525             )
4526         v, tail = v[:l], v[l:]
4527         arcs = array("L")
4528         ber_encoded = False
4529         while len(v) > 0:
4530             i = 0
4531             arc = 0
4532             while True:
4533                 octet = indexbytes(v, i)
4534                 if i == 0 and octet == 0x80:
4535                     if ctx.get("bered", False):
4536                         ber_encoded = True
4537                     else:
4538                         raise DecodeError(
4539                             "non normalized arc encoding",
4540                             klass=self.__class__,
4541                             decode_path=decode_path,
4542                             offset=offset,
4543                         )
4544                 arc = (arc << 7) | (octet & 0x7F)
4545                 if octet & 0x80 == 0:
4546                     try:
4547                         arcs.append(arc)
4548                     except OverflowError:
4549                         raise DecodeError(
4550                             "too huge value for local unsigned long",
4551                             klass=self.__class__,
4552                             decode_path=decode_path,
4553                             offset=offset,
4554                         )
4555                     v = v[i + 1:]
4556                     break
4557                 i += 1
4558                 if i == len(v):
4559                     raise DecodeError(
4560                         "unfinished OID",
4561                         klass=self.__class__,
4562                         decode_path=decode_path,
4563                         offset=offset,
4564                     )
4565         first_arc = 0
4566         second_arc = arcs[0]
4567         if 0 <= second_arc <= 39:
4568             first_arc = 0
4569         elif 40 <= second_arc <= 79:
4570             first_arc = 1
4571             second_arc -= 40
4572         else:
4573             first_arc = 2
4574             second_arc -= 80
4575         obj = self.__class__(
4576             value=array("L", (first_arc, second_arc)) + arcs[1:],
4577             impl=self.tag,
4578             expl=self._expl,
4579             default=self.default,
4580             optional=self.optional,
4581             _decoded=(offset, llen, l),
4582         )
4583         if ber_encoded:
4584             obj.ber_encoded = True
4585         yield decode_path, obj, tail
4586
4587     def __repr__(self):
4588         return pp_console_row(next(self.pps()))
4589
4590     def pps(self, decode_path=()):
4591         yield _pp(
4592             obj=self,
4593             asn1_type_name=self.asn1_type_name,
4594             obj_name=self.__class__.__name__,
4595             decode_path=decode_path,
4596             value=str(self) if self.ready else None,
4597             optional=self.optional,
4598             default=self == self.default,
4599             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4600             expl=None if self._expl is None else tag_decode(self._expl),
4601             offset=self.offset,
4602             tlen=self.tlen,
4603             llen=self.llen,
4604             vlen=self.vlen,
4605             expl_offset=self.expl_offset if self.expled else None,
4606             expl_tlen=self.expl_tlen if self.expled else None,
4607             expl_llen=self.expl_llen if self.expled else None,
4608             expl_vlen=self.expl_vlen if self.expled else None,
4609             expl_lenindef=self.expl_lenindef,
4610             ber_encoded=self.ber_encoded,
4611             bered=self.bered,
4612         )
4613         for pp in self.pps_lenindef(decode_path):
4614             yield pp
4615
4616
4617 class Enumerated(Integer):
4618     """``ENUMERATED`` integer type
4619
4620     This type is identical to :py:class:`pyderasn.Integer`, but requires
4621     schema to be specified and does not accept values missing from it.
4622     """
4623     __slots__ = ()
4624     tag_default = tag_encode(10)
4625     asn1_type_name = "ENUMERATED"
4626
4627     def __init__(
4628             self,
4629             value=None,
4630             impl=None,
4631             expl=None,
4632             default=None,
4633             optional=False,
4634             _specs=None,
4635             _decoded=(0, 0, 0),
4636             bounds=None,  # dummy argument, workability for Integer.decode
4637     ):
4638         super(Enumerated, self).__init__(
4639             value, bounds, impl, expl, default, optional, _specs, _decoded,
4640         )
4641         if len(self.specs) == 0:
4642             raise ValueError("schema must be specified")
4643
4644     def _value_sanitize(self, value):
4645         if isinstance(value, self.__class__):
4646             value = value._value
4647         elif isinstance(value, integer_types):
4648             for _value in itervalues(self.specs):
4649                 if _value == value:
4650                     break
4651             else:
4652                 raise DecodeError(
4653                     "unknown integer value: %s" % value,
4654                     klass=self.__class__,
4655                 )
4656         elif isinstance(value, string_types):
4657             value = self.specs.get(value)
4658             if value is None:
4659                 raise ObjUnknown("integer value: %s" % value)
4660         else:
4661             raise InvalidValueType((self.__class__, int, str))
4662         return value
4663
4664     def __call__(
4665             self,
4666             value=None,
4667             impl=None,
4668             expl=None,
4669             default=None,
4670             optional=None,
4671             _specs=None,
4672     ):
4673         return self.__class__(
4674             value=value,
4675             impl=self.tag if impl is None else impl,
4676             expl=self._expl if expl is None else expl,
4677             default=self.default if default is None else default,
4678             optional=self.optional if optional is None else optional,
4679             _specs=self.specs,
4680         )
4681
4682
4683 def escape_control_unicode(c):
4684     if unicat(c)[0] == "C":
4685         c = repr(c).lstrip("u").strip("'")
4686     return c
4687
4688
4689 class CommonString(OctetString):
4690     """Common class for all strings
4691
4692     Everything resembles :py:class:`pyderasn.OctetString`, except
4693     ability to deal with unicode text strings.
4694
4695     >>> hexenc("привет Ð¼Ð¸Ñ€".encode("utf-8"))
4696     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4697     >>> UTF8String("привет Ð¼Ð¸Ñ€") == UTF8String(hexdec("d0...80"))
4698     True
4699     >>> s = UTF8String("привет Ð¼Ð¸Ñ€")
4700     UTF8String UTF8String Ð¿Ñ€Ð¸Ð²ÐµÑ‚ Ð¼Ð¸Ñ€
4701     >>> str(s)
4702     'привет Ð¼Ð¸Ñ€'
4703     >>> hexenc(bytes(s))
4704     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4705
4706     >>> PrintableString("привет Ð¼Ð¸Ñ€")
4707     Traceback (most recent call last):
4708     pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4709
4710     >>> BMPString("ада", bounds=(2, 2))
4711     Traceback (most recent call last):
4712     pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4713     >>> s = BMPString("ад", bounds=(2, 2))
4714     >>> s.encoding
4715     'utf-16-be'
4716     >>> hexenc(bytes(s))
4717     '04300434'
4718
4719     .. list-table::
4720        :header-rows: 1
4721
4722        * - Class
4723          - Text Encoding, validation
4724        * - :py:class:`pyderasn.UTF8String`
4725          - utf-8
4726        * - :py:class:`pyderasn.NumericString`
4727          - proper alphabet validation
4728        * - :py:class:`pyderasn.PrintableString`
4729          - proper alphabet validation
4730        * - :py:class:`pyderasn.TeletexString`
4731          - iso-8859-1
4732        * - :py:class:`pyderasn.T61String`
4733          - iso-8859-1
4734        * - :py:class:`pyderasn.VideotexString`
4735          - iso-8859-1
4736        * - :py:class:`pyderasn.IA5String`
4737          - proper alphabet validation
4738        * - :py:class:`pyderasn.GraphicString`
4739          - iso-8859-1
4740        * - :py:class:`pyderasn.VisibleString`, :py:class:`pyderasn.ISO646String`
4741          - proper alphabet validation
4742        * - :py:class:`pyderasn.GeneralString`
4743          - iso-8859-1
4744        * - :py:class:`pyderasn.UniversalString`
4745          - utf-32-be
4746        * - :py:class:`pyderasn.BMPString`
4747          - utf-16-be
4748     """
4749     __slots__ = ()
4750
4751     def _value_sanitize(self, value):
4752         value_raw = None
4753         value_decoded = None
4754         if isinstance(value, self.__class__):
4755             value_raw = value._value
4756         elif value.__class__ == text_type:
4757             value_decoded = value
4758         elif value.__class__ == binary_type:
4759             value_raw = value
4760         else:
4761             raise InvalidValueType((self.__class__, text_type, binary_type))
4762         try:
4763             value_raw = (
4764                 value_decoded.encode(self.encoding)
4765                 if value_raw is None else value_raw
4766             )
4767             value_decoded = (
4768                 value_raw.decode(self.encoding)
4769                 if value_decoded is None else value_decoded
4770             )
4771         except (UnicodeEncodeError, UnicodeDecodeError) as err:
4772             raise DecodeError(str(err))
4773         if not self._bound_min <= len(value_decoded) <= self._bound_max:
4774             raise BoundsError(
4775                 self._bound_min,
4776                 len(value_decoded),
4777                 self._bound_max,
4778             )
4779         return value_raw
4780
4781     def __eq__(self, their):
4782         if their.__class__ == binary_type:
4783             return self._value == their
4784         if their.__class__ == text_type:
4785             return self._value == their.encode(self.encoding)
4786         if not isinstance(their, self.__class__):
4787             return False
4788         return (
4789             self._value == their._value and
4790             self.tag == their.tag and
4791             self._expl == their._expl
4792         )
4793
4794     def __unicode__(self):
4795         if self.ready:
4796             return self._value.decode(self.encoding)
4797         return text_type(self._value)
4798
4799     def __repr__(self):
4800         return pp_console_row(next(self.pps(no_unicode=PY2)))
4801
4802     def pps(self, decode_path=(), no_unicode=False):
4803         value = None
4804         if self.ready:
4805             value = (
4806                 hexenc(bytes(self)) if no_unicode else
4807                 "".join(escape_control_unicode(c) for c in self.__unicode__())
4808             )
4809         yield _pp(
4810             obj=self,
4811             asn1_type_name=self.asn1_type_name,
4812             obj_name=self.__class__.__name__,
4813             decode_path=decode_path,
4814             value=value,
4815             optional=self.optional,
4816             default=self == self.default,
4817             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4818             expl=None if self._expl is None else tag_decode(self._expl),
4819             offset=self.offset,
4820             tlen=self.tlen,
4821             llen=self.llen,
4822             vlen=self.vlen,
4823             expl_offset=self.expl_offset if self.expled else None,
4824             expl_tlen=self.expl_tlen if self.expled else None,
4825             expl_llen=self.expl_llen if self.expled else None,
4826             expl_vlen=self.expl_vlen if self.expled else None,
4827             expl_lenindef=self.expl_lenindef,
4828             ber_encoded=self.ber_encoded,
4829             bered=self.bered,
4830         )
4831         for pp in self.pps_lenindef(decode_path):
4832             yield pp
4833
4834
4835 class UTF8String(CommonString):
4836     __slots__ = ()
4837     tag_default = tag_encode(12)
4838     encoding = "utf-8"
4839     asn1_type_name = "UTF8String"
4840
4841
4842 class AllowableCharsMixin(object):
4843     @property
4844     def allowable_chars(self):
4845         if PY2:
4846             return self._allowable_chars
4847         return frozenset(six_unichr(c) for c in self._allowable_chars)
4848
4849     def _value_sanitize(self, value):
4850         value = super(AllowableCharsMixin, self)._value_sanitize(value)
4851         if not frozenset(value) <= self._allowable_chars:
4852             raise DecodeError("non satisfying alphabet value")
4853         return value
4854
4855
4856 class NumericString(AllowableCharsMixin, CommonString):
4857     """Numeric string
4858
4859     Its value is properly sanitized: only ASCII digits with spaces can
4860     be stored.
4861
4862     >>> NumericString().allowable_chars
4863     frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4864     """
4865     __slots__ = ()
4866     tag_default = tag_encode(18)
4867     encoding = "ascii"
4868     asn1_type_name = "NumericString"
4869     _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4870
4871
4872 PrintableStringState = namedtuple(
4873     "PrintableStringState",
4874     OctetStringState._fields + ("allowable_chars",),
4875     **NAMEDTUPLE_KWARGS
4876 )
4877
4878
4879 class PrintableString(AllowableCharsMixin, CommonString):
4880     """Printable string
4881
4882     Its value is properly sanitized: see X.680 41.4 table 10.
4883
4884     >>> PrintableString().allowable_chars
4885     frozenset([' ', "'", ..., 'z'])
4886     >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4887     PrintableString PrintableString foo*bar
4888     >>> obj.allow_asterisk, obj.allow_ampersand
4889     (True, False)
4890     """
4891     __slots__ = ()
4892     tag_default = tag_encode(19)
4893     encoding = "ascii"
4894     asn1_type_name = "PrintableString"
4895     _allowable_chars = frozenset(
4896         (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4897     )
4898     _asterisk = frozenset("*".encode("ascii"))
4899     _ampersand = frozenset("&".encode("ascii"))
4900
4901     def __init__(
4902             self,
4903             value=None,
4904             bounds=None,
4905             impl=None,
4906             expl=None,
4907             default=None,
4908             optional=False,
4909             _decoded=(0, 0, 0),
4910             ctx=None,
4911             allow_asterisk=False,
4912             allow_ampersand=False,
4913     ):
4914         """
4915         :param allow_asterisk: allow asterisk character
4916         :param allow_ampersand: allow ampersand character
4917         """
4918         if allow_asterisk:
4919             self._allowable_chars |= self._asterisk
4920         if allow_ampersand:
4921             self._allowable_chars |= self._ampersand
4922         super(PrintableString, self).__init__(
4923             value, bounds, impl, expl, default, optional, _decoded, ctx,
4924         )
4925
4926     @property
4927     def allow_asterisk(self):
4928         """Is asterisk character allowed?
4929         """
4930         return self._asterisk <= self._allowable_chars
4931
4932     @property
4933     def allow_ampersand(self):
4934         """Is ampersand character allowed?
4935         """
4936         return self._ampersand <= self._allowable_chars
4937
4938     def __getstate__(self):
4939         return PrintableStringState(
4940             *super(PrintableString, self).__getstate__(),
4941             **{"allowable_chars": self._allowable_chars}
4942         )
4943
4944     def __setstate__(self, state):
4945         super(PrintableString, self).__setstate__(state)
4946         self._allowable_chars = state.allowable_chars
4947
4948     def __call__(
4949             self,
4950             value=None,
4951             bounds=None,
4952             impl=None,
4953             expl=None,
4954             default=None,
4955             optional=None,
4956     ):
4957         return self.__class__(
4958             value=value,
4959             bounds=(
4960                 (self._bound_min, self._bound_max)
4961                 if bounds is None else bounds
4962             ),
4963             impl=self.tag if impl is None else impl,
4964             expl=self._expl if expl is None else expl,
4965             default=self.default if default is None else default,
4966             optional=self.optional if optional is None else optional,
4967             allow_asterisk=self.allow_asterisk,
4968             allow_ampersand=self.allow_ampersand,
4969         )
4970
4971
4972 class TeletexString(CommonString):
4973     __slots__ = ()
4974     tag_default = tag_encode(20)
4975     encoding = "iso-8859-1"
4976     asn1_type_name = "TeletexString"
4977
4978
4979 class T61String(TeletexString):
4980     __slots__ = ()
4981     asn1_type_name = "T61String"
4982
4983
4984 class VideotexString(CommonString):
4985     __slots__ = ()
4986     tag_default = tag_encode(21)
4987     encoding = "iso-8859-1"
4988     asn1_type_name = "VideotexString"
4989
4990
4991 class IA5String(AllowableCharsMixin, CommonString):
4992     """IA5 string
4993
4994     Its value is properly sanitized: it is a mix of
4995
4996     * http://www.itscj.ipsj.or.jp/iso-ir/006.pdf (G)
4997     * http://www.itscj.ipsj.or.jp/iso-ir/001.pdf (C0)
4998     * DEL character (0x7F)
4999
5000     It is just 7-bit ASCII.
5001
5002     >>> IA5String().allowable_chars
5003     frozenset(["NUL", ... "DEL"])
5004     """
5005     __slots__ = ()
5006     tag_default = tag_encode(22)
5007     encoding = "ascii"
5008     asn1_type_name = "IA5"
5009     _allowable_chars = frozenset(b"".join(
5010         six_unichr(c).encode("ascii") for c in six_xrange(128)
5011     ))
5012
5013
5014 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
5015 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
5016 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
5017 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
5018 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
5019 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
5020
5021
5022 class VisibleString(AllowableCharsMixin, CommonString):
5023     """Visible string
5024
5025     Its value is properly sanitized. ASCII subset from space to tilde is
5026     allowed: http://www.itscj.ipsj.or.jp/iso-ir/006.pdf
5027
5028     >>> VisibleString().allowable_chars
5029     frozenset([" ", ... "~"])
5030     """
5031     __slots__ = ()
5032     tag_default = tag_encode(26)
5033     encoding = "ascii"
5034     asn1_type_name = "VisibleString"
5035     _allowable_chars = frozenset(b"".join(
5036         six_unichr(c).encode("ascii") for c in six_xrange(ord(" "), ord("~") + 1)
5037     ))
5038
5039
5040 class ISO646String(VisibleString):
5041     __slots__ = ()
5042     asn1_type_name = "ISO646String"
5043
5044
5045 UTCTimeState = namedtuple(
5046     "UTCTimeState",
5047     OctetStringState._fields + ("ber_raw",),
5048     **NAMEDTUPLE_KWARGS
5049 )
5050
5051
5052 def str_to_time_fractions(value):
5053     v = pureint(value)
5054     year, v = (v // 10**10), (v % 10**10)
5055     month, v = (v // 10**8), (v % 10**8)
5056     day, v = (v // 10**6), (v % 10**6)
5057     hour, v = (v // 10**4), (v % 10**4)
5058     minute, second = (v // 100), (v % 100)
5059     return year, month, day, hour, minute, second
5060
5061
5062 class UTCTime(VisibleString):
5063     """``UTCTime`` datetime type
5064
5065     >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5066     UTCTime UTCTime 2017-09-30T22:07:50
5067     >>> str(t)
5068     '170930220750Z'
5069     >>> bytes(t)
5070     b'170930220750Z'
5071     >>> t.todatetime()
5072     datetime.datetime(2017, 9, 30, 22, 7, 50)
5073     >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5074     datetime.datetime(1957, 9, 30, 22, 7, 50)
5075
5076     If BER encoded value was met, then ``ber_raw`` attribute will hold
5077     its raw representation.
5078
5079     .. warning::
5080
5081        Pay attention that UTCTime can not hold full year, so all years
5082        having < 50 years are treated as 20xx, 19xx otherwise, according
5083        to X.509 recommendation.
5084
5085     .. warning::
5086
5087        No strict validation of UTC offsets are made, but very crude:
5088
5089        * minutes are not exceeding 60
5090        * offset value is not exceeding 14 hours
5091     """
5092     __slots__ = ("ber_raw",)
5093     tag_default = tag_encode(23)
5094     encoding = "ascii"
5095     asn1_type_name = "UTCTime"
5096     evgen_mode_skip_value = False
5097
5098     def __init__(
5099             self,
5100             value=None,
5101             impl=None,
5102             expl=None,
5103             default=None,
5104             optional=False,
5105             _decoded=(0, 0, 0),
5106             bounds=None,  # dummy argument, workability for OctetString.decode
5107             ctx=None,
5108     ):
5109         """
5110         :param value: set the value. Either datetime type, or
5111                       :py:class:`pyderasn.UTCTime` object
5112         :param bytes impl: override default tag with ``IMPLICIT`` one
5113         :param bytes expl: override default tag with ``EXPLICIT`` one
5114         :param default: set default value. Type same as in ``value``
5115         :param bool optional: is object ``OPTIONAL`` in sequence
5116         """
5117         super(UTCTime, self).__init__(
5118             None, None, impl, expl, None, optional, _decoded, ctx,
5119         )
5120         self._value = value
5121         self.ber_raw = None
5122         if value is not None:
5123             self._value, self.ber_raw = self._value_sanitize(value, ctx)
5124             self.ber_encoded = self.ber_raw is not None
5125         if default is not None:
5126             default, _ = self._value_sanitize(default)
5127             self.default = self.__class__(
5128                 value=default,
5129                 impl=self.tag,
5130                 expl=self._expl,
5131             )
5132             if self._value is None:
5133                 self._value = default
5134             optional = True
5135         self.optional = optional
5136
5137     def _strptime_bered(self, value):
5138         year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5139         value = value[10:]
5140         if len(value) == 0:
5141             raise ValueError("no timezone")
5142         year += 2000 if year < 50 else 1900
5143         decoded = datetime(year, month, day, hour, minute)
5144         offset = 0
5145         if value[-1] == "Z":
5146             value = value[:-1]
5147         else:
5148             if len(value) < 5:
5149                 raise ValueError("invalid UTC offset")
5150             if value[-5] == "-":
5151                 sign = -1
5152             elif value[-5] == "+":
5153                 sign = 1
5154             else:
5155                 raise ValueError("invalid UTC offset")
5156             v = pureint(value[-4:])
5157             offset, v = (60 * (v % 100)), v // 100
5158             if offset >= 3600:
5159                 raise ValueError("invalid UTC offset minutes")
5160             offset += 3600 * v
5161             if offset > 14 * 3600:
5162                 raise ValueError("too big UTC offset")
5163             offset *= sign
5164             value = value[:-5]
5165         if len(value) == 0:
5166             return offset, decoded
5167         if len(value) != 2:
5168             raise ValueError("invalid UTC offset seconds")
5169         seconds = pureint(value)
5170         if seconds >= 60:
5171             raise ValueError("invalid seconds value")
5172         return offset, decoded + timedelta(seconds=seconds)
5173
5174     def _strptime(self, value):
5175         # datetime.strptime's format: %y%m%d%H%M%SZ
5176         if len(value) != LEN_YYMMDDHHMMSSZ:
5177             raise ValueError("invalid UTCTime length")
5178         if value[-1] != "Z":
5179             raise ValueError("non UTC timezone")
5180         year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
5181         year += 2000 if year < 50 else 1900
5182         return datetime(year, month, day, hour, minute, second)
5183
5184     def _dt_sanitize(self, value):
5185         if value.year < 1950 or value.year > 2049:
5186             raise ValueError("UTCTime can hold only 1950-2049 years")
5187         return value.replace(microsecond=0)
5188
5189     def _value_sanitize(self, value, ctx=None):
5190         if value.__class__ == binary_type:
5191             try:
5192                 value_decoded = value.decode("ascii")
5193             except (UnicodeEncodeError, UnicodeDecodeError) as err:
5194                 raise DecodeError("invalid UTCTime encoding: %r" % err)
5195             err = None
5196             try:
5197                 return self._strptime(value_decoded), None
5198             except (TypeError, ValueError) as _err:
5199                 err = _err
5200                 if (ctx is not None) and ctx.get("bered", False):
5201                     try:
5202                         offset, _value = self._strptime_bered(value_decoded)
5203                         _value = _value - timedelta(seconds=offset)
5204                         return self._dt_sanitize(_value), value
5205                     except (TypeError, ValueError, OverflowError) as _err:
5206                         err = _err
5207             raise DecodeError(
5208                 "invalid %s format: %r" % (self.asn1_type_name, err),
5209                 klass=self.__class__,
5210             )
5211         if isinstance(value, self.__class__):
5212             return value._value, None
5213         if value.__class__ == datetime:
5214             return self._dt_sanitize(value), None
5215         raise InvalidValueType((self.__class__, datetime))
5216
5217     def _pp_value(self):
5218         if self.ready:
5219             value = self._value.isoformat()
5220             if self.ber_encoded:
5221                 value += " (%s)" % self.ber_raw
5222             return value
5223         return None
5224
5225     def __unicode__(self):
5226         if self.ready:
5227             value = self._value.isoformat()
5228             if self.ber_encoded:
5229                 value += " (%s)" % self.ber_raw
5230             return value
5231         return text_type(self._pp_value())
5232
5233     def __getstate__(self):
5234         return UTCTimeState(
5235             *super(UTCTime, self).__getstate__(),
5236             **{"ber_raw": self.ber_raw}
5237         )
5238
5239     def __setstate__(self, state):
5240         super(UTCTime, self).__setstate__(state)
5241         self.ber_raw = state.ber_raw
5242
5243     def __bytes__(self):
5244         self._assert_ready()
5245         return self._encode_time()
5246
5247     def __eq__(self, their):
5248         if their.__class__ == binary_type:
5249             return self._encode_time() == their
5250         if their.__class__ == datetime:
5251             return self.todatetime() == their
5252         if not isinstance(their, self.__class__):
5253             return False
5254         return (
5255             self._value == their._value and
5256             self.tag == their.tag and
5257             self._expl == their._expl
5258         )
5259
5260     def _encode_time(self):
5261         return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5262
5263     def _encode(self):
5264         self._assert_ready()
5265         return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5266
5267     def _encode1st(self, state):
5268         return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
5269
5270     def _encode2nd(self, writer, state_iter):
5271         self._assert_ready()
5272         write_full(writer, self._encode())
5273
5274     def _encode_cer(self, writer):
5275         write_full(writer, self._encode())
5276
5277     def todatetime(self):
5278         return self._value
5279
5280     def __repr__(self):
5281         return pp_console_row(next(self.pps()))
5282
5283     def pps(self, decode_path=()):
5284         yield _pp(
5285             obj=self,
5286             asn1_type_name=self.asn1_type_name,
5287             obj_name=self.__class__.__name__,
5288             decode_path=decode_path,
5289             value=self._pp_value(),
5290             optional=self.optional,
5291             default=self == self.default,
5292             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5293             expl=None if self._expl is None else tag_decode(self._expl),
5294             offset=self.offset,
5295             tlen=self.tlen,
5296             llen=self.llen,
5297             vlen=self.vlen,
5298             expl_offset=self.expl_offset if self.expled else None,
5299             expl_tlen=self.expl_tlen if self.expled else None,
5300             expl_llen=self.expl_llen if self.expled else None,
5301             expl_vlen=self.expl_vlen if self.expled else None,
5302             expl_lenindef=self.expl_lenindef,
5303             ber_encoded=self.ber_encoded,
5304             bered=self.bered,
5305         )
5306         for pp in self.pps_lenindef(decode_path):
5307             yield pp
5308
5309
5310 class GeneralizedTime(UTCTime):
5311     """``GeneralizedTime`` datetime type
5312
5313     This type is similar to :py:class:`pyderasn.UTCTime`.
5314
5315     >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5316     GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5317     >>> str(t)
5318     '20170930220750.000123Z'
5319     >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5320     GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5321
5322     .. warning::
5323
5324        Only microsecond fractions are supported in DER encoding.
5325        :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5326        higher precision values.
5327
5328     .. warning::
5329
5330        BER encoded data can loss information (accuracy) during decoding
5331        because of float transformations.
5332
5333     .. warning::
5334
5335        Local times (without explicit timezone specification) are treated
5336        as UTC one, no transformations are made.
5337
5338     .. warning::
5339
5340        Zero year is unsupported.
5341     """
5342     __slots__ = ()
5343     tag_default = tag_encode(24)
5344     asn1_type_name = "GeneralizedTime"
5345
5346     def _dt_sanitize(self, value):
5347         return value
5348
5349     def _strptime_bered(self, value):
5350         if len(value) < 4 + 3 * 2:
5351             raise ValueError("invalid GeneralizedTime")
5352         year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5353         decoded = datetime(year, month, day, hour)
5354         offset, value = 0, value[10:]
5355         if len(value) == 0:
5356             return offset, decoded
5357         if value[-1] == "Z":
5358             value = value[:-1]
5359         else:
5360             for char, sign in (("-", -1), ("+", 1)):
5361                 idx = value.rfind(char)
5362                 if idx == -1:
5363                     continue
5364                 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5365                 v = pureint(offset_raw)
5366                 if len(offset_raw) == 4:
5367                     offset, v = (60 * (v % 100)), v // 100
5368                     if offset >= 3600:
5369                         raise ValueError("invalid UTC offset minutes")
5370                 elif len(offset_raw) == 2:
5371                     pass
5372                 else:
5373                     raise ValueError("invalid UTC offset")
5374                 offset += 3600 * v
5375                 if offset > 14 * 3600:
5376                     raise ValueError("too big UTC offset")
5377                 offset *= sign
5378                 break
5379         if len(value) == 0:
5380             return offset, decoded
5381         if value[0] in DECIMAL_SIGNS:
5382             return offset, (
5383                 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5384             )
5385         if len(value) < 2:
5386             raise ValueError("stripped minutes")
5387         decoded += timedelta(seconds=60 * pureint(value[:2]))
5388         value = value[2:]
5389         if len(value) == 0:
5390             return offset, decoded
5391         if value[0] in DECIMAL_SIGNS:
5392             return offset, (
5393                 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5394             )
5395         if len(value) < 2:
5396             raise ValueError("stripped seconds")
5397         decoded += timedelta(seconds=pureint(value[:2]))
5398         value = value[2:]
5399         if len(value) == 0:
5400             return offset, decoded
5401         if value[0] not in DECIMAL_SIGNS:
5402             raise ValueError("invalid format after seconds")
5403         return offset, (
5404             decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5405         )
5406
5407     def _strptime(self, value):
5408         l = len(value)
5409         if l == LEN_YYYYMMDDHHMMSSZ:
5410             # datetime.strptime's format: %Y%m%d%H%M%SZ
5411             if value[-1] != "Z":
5412                 raise ValueError("non UTC timezone")
5413             return datetime(*str_to_time_fractions(value[:-1]))
5414         if l >= LEN_YYYYMMDDHHMMSSDMZ:
5415             # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5416             if value[-1] != "Z":
5417                 raise ValueError("non UTC timezone")
5418             if value[14] != ".":
5419                 raise ValueError("no fractions separator")
5420             us = value[15:-1]
5421             if us[-1] == "0":
5422                 raise ValueError("trailing zero")
5423             us_len = len(us)
5424             if us_len > 6:
5425                 raise ValueError("only microsecond fractions are supported")
5426             us = pureint(us + ("0" * (6 - us_len)))
5427             year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5428             return datetime(year, month, day, hour, minute, second, us)
5429         raise ValueError("invalid GeneralizedTime length")
5430
5431     def _encode_time(self):
5432         value = self._value
5433         encoded = value.strftime("%Y%m%d%H%M%S")
5434         if value.microsecond > 0:
5435             encoded += (".%06d" % value.microsecond).rstrip("0")
5436         return (encoded + "Z").encode("ascii")
5437
5438     def _encode(self):
5439         self._assert_ready()
5440         value = self._value
5441         if value.microsecond > 0:
5442             encoded = self._encode_time()
5443             return b"".join((self.tag, len_encode(len(encoded)), encoded))
5444         return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5445
5446     def _encode1st(self, state):
5447         self._assert_ready()
5448         vlen = len(self._encode_time())
5449         return len(self.tag) + len_size(vlen) + vlen, state
5450
5451     def _encode2nd(self, writer, state_iter):
5452         write_full(writer, self._encode())
5453
5454
5455 class GraphicString(CommonString):
5456     __slots__ = ()
5457     tag_default = tag_encode(25)
5458     encoding = "iso-8859-1"
5459     asn1_type_name = "GraphicString"
5460
5461
5462 class GeneralString(CommonString):
5463     __slots__ = ()
5464     tag_default = tag_encode(27)
5465     encoding = "iso-8859-1"
5466     asn1_type_name = "GeneralString"
5467
5468
5469 class UniversalString(CommonString):
5470     __slots__ = ()
5471     tag_default = tag_encode(28)
5472     encoding = "utf-32-be"
5473     asn1_type_name = "UniversalString"
5474
5475
5476 class BMPString(CommonString):
5477     __slots__ = ()
5478     tag_default = tag_encode(30)
5479     encoding = "utf-16-be"
5480     asn1_type_name = "BMPString"
5481
5482
5483 ChoiceState = namedtuple(
5484     "ChoiceState",
5485     BasicState._fields + ("specs", "value",),
5486     **NAMEDTUPLE_KWARGS
5487 )
5488
5489
5490 class Choice(Obj):
5491     """``CHOICE`` special type
5492
5493     ::
5494
5495         class GeneralName(Choice):
5496             schema = (
5497                 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5498                 ("dNSName", IA5String(impl=tag_ctxp(2))),
5499             )
5500
5501     >>> gn = GeneralName()
5502     GeneralName CHOICE
5503     >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5504     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5505     >>> gn["dNSName"] = IA5String("bar.baz")
5506     GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5507     >>> gn["rfc822Name"]
5508     None
5509     >>> gn["dNSName"]
5510     [2] IA5String IA5 bar.baz
5511     >>> gn.choice
5512     'dNSName'
5513     >>> gn.value == gn["dNSName"]
5514     True
5515     >>> gn.specs
5516     OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5517
5518     >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5519     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5520     """
5521     __slots__ = ("specs",)
5522     tag_default = None
5523     asn1_type_name = "CHOICE"
5524
5525     def __init__(
5526             self,
5527             value=None,
5528             schema=None,
5529             impl=None,
5530             expl=None,
5531             default=None,
5532             optional=False,
5533             _decoded=(0, 0, 0),
5534     ):
5535         """
5536         :param value: set the value. Either ``(choice, value)`` tuple, or
5537                       :py:class:`pyderasn.Choice` object
5538         :param bytes impl: can not be set, do **not** use it
5539         :param bytes expl: override default tag with ``EXPLICIT`` one
5540         :param default: set default value. Type same as in ``value``
5541         :param bool optional: is object ``OPTIONAL`` in sequence
5542         """
5543         if impl is not None:
5544             raise ValueError("no implicit tag allowed for CHOICE")
5545         super(Choice, self).__init__(None, expl, default, optional, _decoded)
5546         if schema is None:
5547             schema = getattr(self, "schema", ())
5548         if len(schema) == 0:
5549             raise ValueError("schema must be specified")
5550         self.specs = (
5551             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5552         )
5553         self._value = None
5554         if value is not None:
5555             self._value = self._value_sanitize(value)
5556         if default is not None:
5557             default_value = self._value_sanitize(default)
5558             default_obj = self.__class__(impl=self.tag, expl=self._expl)
5559             default_obj.specs = self.specs
5560             default_obj._value = default_value
5561             self.default = default_obj
5562             if value is None:
5563                 self._value = copy(default_obj._value)
5564         if self._expl is not None:
5565             tag_class, _, tag_num = tag_decode(self._expl)
5566             self._tag_order = (tag_class, tag_num)
5567
5568     def _value_sanitize(self, value):
5569         if (value.__class__ == tuple) and len(value) == 2:
5570             choice, obj = value
5571             spec = self.specs.get(choice)
5572             if spec is None:
5573                 raise ObjUnknown(choice)
5574             if not isinstance(obj, spec.__class__):
5575                 raise InvalidValueType((spec,))
5576             return (choice, spec(obj))
5577         if isinstance(value, self.__class__):
5578             return value._value
5579         raise InvalidValueType((self.__class__, tuple))
5580
5581     @property
5582     def ready(self):
5583         return self._value is not None and self._value[1].ready
5584
5585     @property
5586     def bered(self):
5587         return self.expl_lenindef or (
5588             (self._value is not None) and
5589             self._value[1].bered
5590         )
5591
5592     def __getstate__(self):
5593         return ChoiceState(
5594             __version__,
5595             self.tag,
5596             self._tag_order,
5597             self._expl,
5598             self.default,
5599             self.optional,
5600             self.offset,
5601             self.llen,
5602             self.vlen,
5603             self.expl_lenindef,
5604             self.lenindef,
5605             self.ber_encoded,
5606             self.specs,
5607             copy(self._value),
5608         )
5609
5610     def __setstate__(self, state):
5611         super(Choice, self).__setstate__(state)
5612         self.specs = state.specs
5613         self._value = state.value
5614
5615     def __eq__(self, their):
5616         if (their.__class__ == tuple) and len(their) == 2:
5617             return self._value == their
5618         if not isinstance(their, self.__class__):
5619             return False
5620         return (
5621             self.specs == their.specs and
5622             self._value == their._value
5623         )
5624
5625     def __call__(
5626             self,
5627             value=None,
5628             expl=None,
5629             default=None,
5630             optional=None,
5631     ):
5632         return self.__class__(
5633             value=value,
5634             schema=self.specs,
5635             expl=self._expl if expl is None else expl,
5636             default=self.default if default is None else default,
5637             optional=self.optional if optional is None else optional,
5638         )
5639
5640     @property
5641     def choice(self):
5642         """Name of the choice
5643         """
5644         self._assert_ready()
5645         return self._value[0]
5646
5647     @property
5648     def value(self):
5649         """Value of underlying choice
5650         """
5651         self._assert_ready()
5652         return self._value[1]
5653
5654     @property
5655     def tag_order(self):
5656         self._assert_ready()
5657         return self._value[1].tag_order if self._tag_order is None else self._tag_order
5658
5659     @property
5660     def tag_order_cer(self):
5661         return min(v.tag_order_cer for v in itervalues(self.specs))
5662
5663     def __getitem__(self, key):
5664         if key not in self.specs:
5665             raise ObjUnknown(key)
5666         if self._value is None:
5667             return None
5668         choice, value = self._value
5669         if choice != key:
5670             return None
5671         return value
5672
5673     def __setitem__(self, key, value):
5674         spec = self.specs.get(key)
5675         if spec is None:
5676             raise ObjUnknown(key)
5677         if not isinstance(value, spec.__class__):
5678             raise InvalidValueType((spec.__class__,))
5679         self._value = (key, spec(value))
5680
5681     @property
5682     def tlen(self):
5683         return 0
5684
5685     @property
5686     def decoded(self):
5687         return self._value[1].decoded if self.ready else False
5688
5689     def _encode(self):
5690         self._assert_ready()
5691         return self._value[1].encode()
5692
5693     def _encode1st(self, state):
5694         self._assert_ready()
5695         return self._value[1].encode1st(state)
5696
5697     def _encode2nd(self, writer, state_iter):
5698         self._value[1].encode2nd(writer, state_iter)
5699
5700     def _encode_cer(self, writer):
5701         self._assert_ready()
5702         self._value[1].encode_cer(writer)
5703
5704     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5705         for choice, spec in iteritems(self.specs):
5706             sub_decode_path = decode_path + (choice,)
5707             try:
5708                 spec.decode(
5709                     tlv,
5710                     offset=offset,
5711                     leavemm=True,
5712                     decode_path=sub_decode_path,
5713                     ctx=ctx,
5714                     tag_only=True,
5715                     _ctx_immutable=False,
5716                 )
5717             except TagMismatch:
5718                 continue
5719             break
5720         else:
5721             raise TagMismatch(
5722                 klass=self.__class__,
5723                 decode_path=decode_path,
5724                 offset=offset,
5725             )
5726         if tag_only:  # pragma: no cover
5727             yield None
5728             return
5729         if evgen_mode:
5730             for _decode_path, value, tail in spec.decode_evgen(
5731                     tlv,
5732                     offset=offset,
5733                     leavemm=True,
5734                     decode_path=sub_decode_path,
5735                     ctx=ctx,
5736                     _ctx_immutable=False,
5737             ):
5738                 yield _decode_path, value, tail
5739         else:
5740             _, value, tail = next(spec.decode_evgen(
5741                 tlv,
5742                 offset=offset,
5743                 leavemm=True,
5744                 decode_path=sub_decode_path,
5745                 ctx=ctx,
5746                 _ctx_immutable=False,
5747                 _evgen_mode=False,
5748             ))
5749         obj = self.__class__(
5750             schema=self.specs,
5751             expl=self._expl,
5752             default=self.default,
5753             optional=self.optional,
5754             _decoded=(offset, 0, value.fulllen),
5755         )
5756         obj._value = (choice, value)
5757         yield decode_path, obj, tail
5758
5759     def __repr__(self):
5760         value = pp_console_row(next(self.pps()))
5761         if self.ready:
5762             value = "%s[%r]" % (value, self.value)
5763         return value
5764
5765     def pps(self, decode_path=()):
5766         yield _pp(
5767             obj=self,
5768             asn1_type_name=self.asn1_type_name,
5769             obj_name=self.__class__.__name__,
5770             decode_path=decode_path,
5771             value=self.choice if self.ready else None,
5772             optional=self.optional,
5773             default=self == self.default,
5774             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5775             expl=None if self._expl is None else tag_decode(self._expl),
5776             offset=self.offset,
5777             tlen=self.tlen,
5778             llen=self.llen,
5779             vlen=self.vlen,
5780             expl_lenindef=self.expl_lenindef,
5781             bered=self.bered,
5782         )
5783         if self.ready:
5784             yield self.value.pps(decode_path=decode_path + (self.choice,))
5785         for pp in self.pps_lenindef(decode_path):
5786             yield pp
5787
5788
5789 class PrimitiveTypes(Choice):
5790     """Predefined ``CHOICE`` for all generic primitive types
5791
5792     It could be useful for general decoding of some unspecified values:
5793
5794     >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5795     OCTET STRING 3 bytes 666f6f
5796     >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5797     INTEGER 1193046
5798     """
5799     __slots__ = ()
5800     schema = tuple((klass.__name__, klass()) for klass in (
5801         Boolean,
5802         Integer,
5803         BitString,
5804         OctetString,
5805         Null,
5806         ObjectIdentifier,
5807         UTF8String,
5808         NumericString,
5809         PrintableString,
5810         TeletexString,
5811         VideotexString,
5812         IA5String,
5813         UTCTime,
5814         GeneralizedTime,
5815         GraphicString,
5816         VisibleString,
5817         ISO646String,
5818         GeneralString,
5819         UniversalString,
5820         BMPString,
5821     ))
5822
5823
5824 AnyState = namedtuple(
5825     "AnyState",
5826     BasicState._fields + ("value", "defined"),
5827     **NAMEDTUPLE_KWARGS
5828 )
5829
5830
5831 class Any(Obj):
5832     """``ANY`` special type
5833
5834     >>> Any(Integer(-123))
5835     ANY INTEGER -123 (0X:7B)
5836     >>> a = Any(OctetString(b"hello world").encode())
5837     ANY 040b68656c6c6f20776f726c64
5838     >>> hexenc(bytes(a))
5839     b'0x040x0bhello world'
5840     """
5841     __slots__ = ("defined",)
5842     tag_default = tag_encode(0)
5843     asn1_type_name = "ANY"
5844
5845     def __init__(
5846             self,
5847             value=None,
5848             expl=None,
5849             optional=False,
5850             _decoded=(0, 0, 0),
5851     ):
5852         """
5853         :param value: set the value. Either any kind of pyderasn's
5854                       **ready** object, or bytes. Pay attention that
5855                       **no** validation is performed if raw binary value
5856                       is valid TLV, except just tag decoding
5857         :param bytes expl: override default tag with ``EXPLICIT`` one
5858         :param bool optional: is object ``OPTIONAL`` in sequence
5859         """
5860         super(Any, self).__init__(None, expl, None, optional, _decoded)
5861         if value is None:
5862             self._value = None
5863         else:
5864             value = self._value_sanitize(value)
5865             self._value = value
5866             if self._expl is None:
5867                 if value.__class__ == binary_type:
5868                     tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5869                 else:
5870                     tag_class, tag_num = value.tag_order
5871             else:
5872                 tag_class, _, tag_num = tag_decode(self._expl)
5873             self._tag_order = (tag_class, tag_num)
5874         self.defined = None
5875
5876     def _value_sanitize(self, value):
5877         if value.__class__ == binary_type:
5878             if len(value) == 0:
5879                 raise ValueError("%s value can not be empty" % self.__class__.__name__)
5880             return value
5881         if isinstance(value, self.__class__):
5882             return value._value
5883         if not isinstance(value, Obj):
5884             raise InvalidValueType((self.__class__, Obj, binary_type))
5885         return value
5886
5887     @property
5888     def ready(self):
5889         return self._value is not None
5890
5891     @property
5892     def tag_order(self):
5893         self._assert_ready()
5894         return self._tag_order
5895
5896     @property
5897     def bered(self):
5898         if self.expl_lenindef or self.lenindef:
5899             return True
5900         if self.defined is None:
5901             return False
5902         return self.defined[1].bered
5903
5904     def __getstate__(self):
5905         return AnyState(
5906             __version__,
5907             self.tag,
5908             self._tag_order,
5909             self._expl,
5910             None,
5911             self.optional,
5912             self.offset,
5913             self.llen,
5914             self.vlen,
5915             self.expl_lenindef,
5916             self.lenindef,
5917             self.ber_encoded,
5918             self._value,
5919             self.defined,
5920         )
5921
5922     def __setstate__(self, state):
5923         super(Any, self).__setstate__(state)
5924         self._value = state.value
5925         self.defined = state.defined
5926
5927     def __eq__(self, their):
5928         if their.__class__ == binary_type:
5929             if self._value.__class__ == binary_type:
5930                 return self._value == their
5931             return self._value.encode() == their
5932         if issubclass(their.__class__, Any):
5933             if self.ready and their.ready:
5934                 return bytes(self) == bytes(their)
5935             return self.ready == their.ready
5936         return False
5937
5938     def __call__(
5939             self,
5940             value=None,
5941             expl=None,
5942             optional=None,
5943     ):
5944         return self.__class__(
5945             value=value,
5946             expl=self._expl if expl is None else expl,
5947             optional=self.optional if optional is None else optional,
5948         )
5949
5950     def __bytes__(self):
5951         self._assert_ready()
5952         value = self._value
5953         if value.__class__ == binary_type:
5954             return value
5955         return self._value.encode()
5956
5957     @property
5958     def tlen(self):
5959         return 0
5960
5961     def _encode(self):
5962         self._assert_ready()
5963         value = self._value
5964         if value.__class__ == binary_type:
5965             return value
5966         return value.encode()
5967
5968     def _encode1st(self, state):
5969         self._assert_ready()
5970         value = self._value
5971         if value.__class__ == binary_type:
5972             return len(value), state
5973         return value.encode1st(state)
5974
5975     def _encode2nd(self, writer, state_iter):
5976         value = self._value
5977         if value.__class__ == binary_type:
5978             write_full(writer, value)
5979         else:
5980             value.encode2nd(writer, state_iter)
5981
5982     def _encode_cer(self, writer):
5983         self._assert_ready()
5984         value = self._value
5985         if value.__class__ == binary_type:
5986             write_full(writer, value)
5987         else:
5988             value.encode_cer(writer)
5989
5990     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5991         try:
5992             t, tlen, lv = tag_strip(tlv)
5993         except DecodeError as err:
5994             raise err.__class__(
5995                 msg=err.msg,
5996                 klass=self.__class__,
5997                 decode_path=decode_path,
5998                 offset=offset,
5999             )
6000         try:
6001             l, llen, v = len_decode(lv)
6002         except LenIndefForm as err:
6003             if not ctx.get("bered", False):
6004                 raise err.__class__(
6005                     msg=err.msg,
6006                     klass=self.__class__,
6007                     decode_path=decode_path,
6008                     offset=offset,
6009                 )
6010             llen, vlen, v = 1, 0, lv[1:]
6011             sub_offset = offset + tlen + llen
6012             chunk_i = 0
6013             while v[:EOC_LEN].tobytes() != EOC:
6014                 chunk, v = Any().decode(
6015                     v,
6016                     offset=sub_offset,
6017                     decode_path=decode_path + (str(chunk_i),),
6018                     leavemm=True,
6019                     ctx=ctx,
6020                     _ctx_immutable=False,
6021                 )
6022                 vlen += chunk.tlvlen
6023                 sub_offset += chunk.tlvlen
6024                 chunk_i += 1
6025             tlvlen = tlen + llen + vlen + EOC_LEN
6026             obj = self.__class__(
6027                 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
6028                 expl=self._expl,
6029                 optional=self.optional,
6030                 _decoded=(offset, 0, tlvlen),
6031             )
6032             obj.lenindef = True
6033             obj.tag = t.tobytes()
6034             yield decode_path, obj, v[EOC_LEN:]
6035             return
6036         except DecodeError as err:
6037             raise err.__class__(
6038                 msg=err.msg,
6039                 klass=self.__class__,
6040                 decode_path=decode_path,
6041                 offset=offset,
6042             )
6043         if l > len(v):
6044             raise NotEnoughData(
6045                 "encoded length is longer than data",
6046                 klass=self.__class__,
6047                 decode_path=decode_path,
6048                 offset=offset,
6049             )
6050         tlvlen = tlen + llen + l
6051         v, tail = tlv[:tlvlen], v[l:]
6052         obj = self.__class__(
6053             value=None if evgen_mode else v.tobytes(),
6054             expl=self._expl,
6055             optional=self.optional,
6056             _decoded=(offset, 0, tlvlen),
6057         )
6058         obj.tag = t.tobytes()
6059         yield decode_path, obj, tail
6060
6061     def __repr__(self):
6062         return pp_console_row(next(self.pps()))
6063
6064     def pps(self, decode_path=()):
6065         value = self._value
6066         if value is None:
6067             pass
6068         elif value.__class__ == binary_type:
6069             value = None
6070         else:
6071             value = repr(value)
6072         yield _pp(
6073             obj=self,
6074             asn1_type_name=self.asn1_type_name,
6075             obj_name=self.__class__.__name__,
6076             decode_path=decode_path,
6077             value=value,
6078             blob=self._value if self._value.__class__ == binary_type else None,
6079             optional=self.optional,
6080             default=self == self.default,
6081             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6082             expl=None if self._expl is None else tag_decode(self._expl),
6083             offset=self.offset,
6084             tlen=self.tlen,
6085             llen=self.llen,
6086             vlen=self.vlen,
6087             expl_offset=self.expl_offset if self.expled else None,
6088             expl_tlen=self.expl_tlen if self.expled else None,
6089             expl_llen=self.expl_llen if self.expled else None,
6090             expl_vlen=self.expl_vlen if self.expled else None,
6091             expl_lenindef=self.expl_lenindef,
6092             lenindef=self.lenindef,
6093             bered=self.bered,
6094         )
6095         defined_by, defined = self.defined or (None, None)
6096         if defined_by is not None:
6097             yield defined.pps(
6098                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6099             )
6100         for pp in self.pps_lenindef(decode_path):
6101             yield pp
6102
6103
6104 ########################################################################
6105 # ASN.1 constructed types
6106 ########################################################################
6107
6108 def abs_decode_path(decode_path, rel_path):
6109     """Create an absolute decode path from current and relative ones
6110
6111     :param decode_path: current decode path, starting point. Tuple of strings
6112     :param rel_path: relative path to ``decode_path``. Tuple of strings.
6113                      If first tuple's element is "/", then treat it as
6114                      an absolute path, ignoring ``decode_path`` as
6115                      starting point. Also this tuple can contain ".."
6116                      elements, stripping the leading element from
6117                      ``decode_path``
6118
6119     >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
6120     ("foo", "bar", "baz", "whatever")
6121     >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
6122     ("foo", "whatever")
6123     >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
6124     ("baz", "whatever")
6125     """
6126     if rel_path[0] == "/":
6127         return rel_path[1:]
6128     if rel_path[0] == "..":
6129         return abs_decode_path(decode_path[:-1], rel_path[1:])
6130     return decode_path + rel_path
6131
6132
6133 SequenceState = namedtuple(
6134     "SequenceState",
6135     BasicState._fields + ("specs", "value",),
6136     **NAMEDTUPLE_KWARGS
6137 )
6138
6139
6140 class SequenceEncode1stMixing(object):
6141     def _encode1st(self, state):
6142         state.append(0)
6143         idx = len(state) - 1
6144         vlen = 0
6145         for v in self._values_for_encoding():
6146             l, _ = v.encode1st(state)
6147             vlen += l
6148         state[idx] = vlen
6149         return len(self.tag) + len_size(vlen) + vlen, state
6150
6151
6152 class Sequence(SequenceEncode1stMixing, Obj):
6153     """``SEQUENCE`` structure type
6154
6155     You have to make specification of sequence::
6156
6157         class Extension(Sequence):
6158             schema = (
6159                 ("extnID", ObjectIdentifier()),
6160                 ("critical", Boolean(default=False)),
6161                 ("extnValue", OctetString()),
6162             )
6163
6164     Then, you can work with it as with dictionary.
6165
6166     >>> ext = Extension()
6167     >>> Extension().specs
6168     OrderedDict([
6169         ('extnID', OBJECT IDENTIFIER),
6170         ('critical', BOOLEAN False OPTIONAL DEFAULT),
6171         ('extnValue', OCTET STRING),
6172     ])
6173     >>> ext["extnID"] = "1.2.3"
6174     Traceback (most recent call last):
6175     pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6176     >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6177
6178     You can determine if sequence is ready to be encoded:
6179
6180     >>> ext.ready
6181     False
6182     >>> ext.encode()
6183     Traceback (most recent call last):
6184     pyderasn.ObjNotReady: object is not ready: extnValue
6185     >>> ext["extnValue"] = OctetString(b"foobar")
6186     >>> ext.ready
6187     True
6188
6189     Value you want to assign, must have the same **type** as in
6190     corresponding specification, but it can have different tags,
6191     optional/default attributes -- they will be taken from specification
6192     automatically::
6193
6194         class TBSCertificate(Sequence):
6195             schema = (
6196                 ("version", Version(expl=tag_ctxc(0), default="v1")),
6197             [...]
6198
6199     >>> tbs = TBSCertificate()
6200     >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6201
6202     Assign ``None`` to remove value from sequence.
6203
6204     You can set values in Sequence during its initialization:
6205
6206     >>> AlgorithmIdentifier((
6207         ("algorithm", ObjectIdentifier("1.2.3")),
6208         ("parameters", Any(Null()))
6209     ))
6210     AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6211
6212     You can determine if value exists/set in the sequence and take its value:
6213
6214     >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6215     (True, True, False)
6216     >>> ext["extnID"]
6217     OBJECT IDENTIFIER 1.2.3
6218
6219     But pay attention that if value has default, then it won't be (not
6220     in) in the sequence (because ``DEFAULT`` must not be encoded in
6221     DER), but you can read its value:
6222
6223     >>> "critical" in ext, ext["critical"]
6224     (False, BOOLEAN False)
6225     >>> ext["critical"] = Boolean(True)
6226     >>> "critical" in ext, ext["critical"]
6227     (True, BOOLEAN True)
6228
6229     All defaulted values are always optional.
6230
6231     .. _allow_default_values_ctx:
6232
6233     DER prohibits default value encoding and will raise an error if
6234     default value is unexpectedly met during decode.
6235     If :ref:`bered <bered_ctx>` context option is set, then no error
6236     will be raised, but ``bered`` attribute set. You can disable strict
6237     defaulted values existence validation by setting
6238     ``"allow_default_values": True`` :ref:`context <ctx>` option.
6239
6240     All values with DEFAULT specified are decoded atomically in
6241     :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6242     SEQUENCE, then it will be yielded as a single element, not
6243     disassembled. That is required for DEFAULT existence check.
6244
6245     Two sequences are equal if they have equal specification (schema),
6246     implicit/explicit tagging and the same values.
6247     """
6248     __slots__ = ("specs",)
6249     tag_default = tag_encode(form=TagFormConstructed, num=16)
6250     asn1_type_name = "SEQUENCE"
6251
6252     def __init__(
6253             self,
6254             value=None,
6255             schema=None,
6256             impl=None,
6257             expl=None,
6258             default=None,
6259             optional=False,
6260             _decoded=(0, 0, 0),
6261     ):
6262         super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6263         if schema is None:
6264             schema = getattr(self, "schema", ())
6265         self.specs = (
6266             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6267         )
6268         self._value = {}
6269         if value is not None:
6270             if issubclass(value.__class__, Sequence):
6271                 self._value = value._value
6272             elif hasattr(value, "__iter__"):
6273                 for seq_key, seq_value in value:
6274                     self[seq_key] = seq_value
6275             else:
6276                 raise InvalidValueType((Sequence,))
6277         if default is not None:
6278             if not issubclass(default.__class__, Sequence):
6279                 raise InvalidValueType((Sequence,))
6280             default_value = default._value
6281             default_obj = self.__class__(impl=self.tag, expl=self._expl)
6282             default_obj.specs = self.specs
6283             default_obj._value = default_value
6284             self.default = default_obj
6285             if value is None:
6286                 self._value = copy(default_obj._value)
6287
6288     @property
6289     def ready(self):
6290         for name, spec in iteritems(self.specs):
6291             value = self._value.get(name)
6292             if value is None:
6293                 if spec.optional:
6294                     continue
6295                 return False
6296             if not value.ready:
6297                 return False
6298         return True
6299
6300     @property
6301     def bered(self):
6302         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6303             return True
6304         return any(value.bered for value in itervalues(self._value))
6305
6306     def __getstate__(self):
6307         return SequenceState(
6308             __version__,
6309             self.tag,
6310             self._tag_order,
6311             self._expl,
6312             self.default,
6313             self.optional,
6314             self.offset,
6315             self.llen,
6316             self.vlen,
6317             self.expl_lenindef,
6318             self.lenindef,
6319             self.ber_encoded,
6320             self.specs,
6321             {k: copy(v) for k, v in iteritems(self._value)},
6322         )
6323
6324     def __setstate__(self, state):
6325         super(Sequence, self).__setstate__(state)
6326         self.specs = state.specs
6327         self._value = state.value
6328
6329     def __eq__(self, their):
6330         if not isinstance(their, self.__class__):
6331             return False
6332         return (
6333             self.specs == their.specs and
6334             self.tag == their.tag and
6335             self._expl == their._expl and
6336             self._value == their._value
6337         )
6338
6339     def __call__(
6340             self,
6341             value=None,
6342             impl=None,
6343             expl=None,
6344             default=None,
6345             optional=None,
6346     ):
6347         return self.__class__(
6348             value=value,
6349             schema=self.specs,
6350             impl=self.tag if impl is None else impl,
6351             expl=self._expl if expl is None else expl,
6352             default=self.default if default is None else default,
6353             optional=self.optional if optional is None else optional,
6354         )
6355
6356     def __contains__(self, key):
6357         return key in self._value
6358
6359     def __setitem__(self, key, value):
6360         spec = self.specs.get(key)
6361         if spec is None:
6362             raise ObjUnknown(key)
6363         if value is None:
6364             self._value.pop(key, None)
6365             return
6366         if not isinstance(value, spec.__class__):
6367             raise InvalidValueType((spec.__class__,))
6368         value = spec(value=value)
6369         if spec.default is not None and value == spec.default:
6370             self._value.pop(key, None)
6371             return
6372         self._value[key] = value
6373
6374     def __getitem__(self, key):
6375         value = self._value.get(key)
6376         if value is not None:
6377             return value
6378         spec = self.specs.get(key)
6379         if spec is None:
6380             raise ObjUnknown(key)
6381         if spec.default is not None:
6382             return spec.default
6383         return None
6384
6385     def _values_for_encoding(self):
6386         for name, spec in iteritems(self.specs):
6387             value = self._value.get(name)
6388             if value is None:
6389                 if spec.optional:
6390                     continue
6391                 raise ObjNotReady(name)
6392             yield value
6393
6394     def _encode(self):
6395         v = b"".join(v.encode() for v in self._values_for_encoding())
6396         return b"".join((self.tag, len_encode(len(v)), v))
6397
6398     def _encode2nd(self, writer, state_iter):
6399         write_full(writer, self.tag + len_encode(next(state_iter)))
6400         for v in self._values_for_encoding():
6401             v.encode2nd(writer, state_iter)
6402
6403     def _encode_cer(self, writer):
6404         write_full(writer, self.tag + LENINDEF)
6405         for v in self._values_for_encoding():
6406             v.encode_cer(writer)
6407         write_full(writer, EOC)
6408
6409     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6410         try:
6411             t, tlen, lv = tag_strip(tlv)
6412         except DecodeError as err:
6413             raise err.__class__(
6414                 msg=err.msg,
6415                 klass=self.__class__,
6416                 decode_path=decode_path,
6417                 offset=offset,
6418             )
6419         if t != self.tag:
6420             raise TagMismatch(
6421                 klass=self.__class__,
6422                 decode_path=decode_path,
6423                 offset=offset,
6424             )
6425         if tag_only:  # pragma: no cover
6426             yield None
6427             return
6428         lenindef = False
6429         ctx_bered = ctx.get("bered", False)
6430         try:
6431             l, llen, v = len_decode(lv)
6432         except LenIndefForm as err:
6433             if not ctx_bered:
6434                 raise err.__class__(
6435                     msg=err.msg,
6436                     klass=self.__class__,
6437                     decode_path=decode_path,
6438                     offset=offset,
6439                 )
6440             l, llen, v = 0, 1, lv[1:]
6441             lenindef = True
6442         except DecodeError as err:
6443             raise err.__class__(
6444                 msg=err.msg,
6445                 klass=self.__class__,
6446                 decode_path=decode_path,
6447                 offset=offset,
6448             )
6449         if l > len(v):
6450             raise NotEnoughData(
6451                 "encoded length is longer than data",
6452                 klass=self.__class__,
6453                 decode_path=decode_path,
6454                 offset=offset,
6455             )
6456         if not lenindef:
6457             v, tail = v[:l], v[l:]
6458         vlen = 0
6459         sub_offset = offset + tlen + llen
6460         values = {}
6461         ber_encoded = False
6462         ctx_allow_default_values = ctx.get("allow_default_values", False)
6463         for name, spec in iteritems(self.specs):
6464             if spec.optional and (
6465                     (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6466                     len(v) == 0
6467             ):
6468                 continue
6469             spec_defaulted = spec.default is not None
6470             sub_decode_path = decode_path + (name,)
6471             try:
6472                 if evgen_mode and not spec_defaulted:
6473                     for _decode_path, value, v_tail in spec.decode_evgen(
6474                             v,
6475                             sub_offset,
6476                             leavemm=True,
6477                             decode_path=sub_decode_path,
6478                             ctx=ctx,
6479                             _ctx_immutable=False,
6480                     ):
6481                         yield _decode_path, value, v_tail
6482                 else:
6483                     _, value, v_tail = next(spec.decode_evgen(
6484                         v,
6485                         sub_offset,
6486                         leavemm=True,
6487                         decode_path=sub_decode_path,
6488                         ctx=ctx,
6489                         _ctx_immutable=False,
6490                         _evgen_mode=False,
6491                     ))
6492             except TagMismatch as err:
6493                 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6494                     continue
6495                 raise
6496
6497             defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6498             if not evgen_mode and defined is not None:
6499                 defined_by, defined_spec = defined
6500                 if issubclass(value.__class__, SequenceOf):
6501                     for i, _value in enumerate(value):
6502                         sub_sub_decode_path = sub_decode_path + (
6503                             str(i),
6504                             DecodePathDefBy(defined_by),
6505                         )
6506                         defined_value, defined_tail = defined_spec.decode(
6507                             memoryview(bytes(_value)),
6508                             sub_offset + (
6509                                 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6510                                 if value.expled else (value.tlen + value.llen)
6511                             ),
6512                             leavemm=True,
6513                             decode_path=sub_sub_decode_path,
6514                             ctx=ctx,
6515                             _ctx_immutable=False,
6516                         )
6517                         if len(defined_tail) > 0:
6518                             raise DecodeError(
6519                                 "remaining data",
6520                                 klass=self.__class__,
6521                                 decode_path=sub_sub_decode_path,
6522                                 offset=offset,
6523                             )
6524                         _value.defined = (defined_by, defined_value)
6525                 else:
6526                     defined_value, defined_tail = defined_spec.decode(
6527                         memoryview(bytes(value)),
6528                         sub_offset + (
6529                             (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6530                             if value.expled else (value.tlen + value.llen)
6531                         ),
6532                         leavemm=True,
6533                         decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6534                         ctx=ctx,
6535                         _ctx_immutable=False,
6536                     )
6537                     if len(defined_tail) > 0:
6538                         raise DecodeError(
6539                             "remaining data",
6540                             klass=self.__class__,
6541                             decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6542                             offset=offset,
6543                         )
6544                     value.defined = (defined_by, defined_value)
6545
6546             value_len = value.fulllen
6547             vlen += value_len
6548             sub_offset += value_len
6549             v = v_tail
6550             if spec_defaulted:
6551                 if evgen_mode:
6552                     yield sub_decode_path, value, v_tail
6553                 if value == spec.default:
6554                     if ctx_bered or ctx_allow_default_values:
6555                         ber_encoded = True
6556                     else:
6557                         raise DecodeError(
6558                             "DEFAULT value met",
6559                             klass=self.__class__,
6560                             decode_path=sub_decode_path,
6561                             offset=sub_offset,
6562                         )
6563             if not evgen_mode:
6564                 values[name] = value
6565                 spec_defines = getattr(spec, "defines", ())
6566                 if len(spec_defines) == 0:
6567                     defines_by_path = ctx.get("defines_by_path", ())
6568                     if len(defines_by_path) > 0:
6569                         spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6570                 if spec_defines is not None and len(spec_defines) > 0:
6571                     for rel_path, schema in spec_defines:
6572                         defined = schema.get(value, None)
6573                         if defined is not None:
6574                             ctx.setdefault("_defines", []).append((
6575                                 abs_decode_path(sub_decode_path[:-1], rel_path),
6576                                 (value, defined),
6577                             ))
6578         if lenindef:
6579             if v[:EOC_LEN].tobytes() != EOC:
6580                 raise DecodeError(
6581                     "no EOC",
6582                     klass=self.__class__,
6583                     decode_path=decode_path,
6584                     offset=offset,
6585                 )
6586             tail = v[EOC_LEN:]
6587             vlen += EOC_LEN
6588         elif len(v) > 0:
6589             raise DecodeError(
6590                 "remaining data",
6591                 klass=self.__class__,
6592                 decode_path=decode_path,
6593                 offset=offset,
6594             )
6595         obj = self.__class__(
6596             schema=self.specs,
6597             impl=self.tag,
6598             expl=self._expl,
6599             default=self.default,
6600             optional=self.optional,
6601             _decoded=(offset, llen, vlen),
6602         )
6603         obj._value = values
6604         obj.lenindef = lenindef
6605         obj.ber_encoded = ber_encoded
6606         yield decode_path, obj, tail
6607
6608     def __repr__(self):
6609         value = pp_console_row(next(self.pps()))
6610         cols = []
6611         for name in self.specs:
6612             _value = self._value.get(name)
6613             if _value is None:
6614                 continue
6615             cols.append("%s: %s" % (name, repr(_value)))
6616         return "%s[%s]" % (value, "; ".join(cols))
6617
6618     def pps(self, decode_path=()):
6619         yield _pp(
6620             obj=self,
6621             asn1_type_name=self.asn1_type_name,
6622             obj_name=self.__class__.__name__,
6623             decode_path=decode_path,
6624             optional=self.optional,
6625             default=self == self.default,
6626             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6627             expl=None if self._expl is None else tag_decode(self._expl),
6628             offset=self.offset,
6629             tlen=self.tlen,
6630             llen=self.llen,
6631             vlen=self.vlen,
6632             expl_offset=self.expl_offset if self.expled else None,
6633             expl_tlen=self.expl_tlen if self.expled else None,
6634             expl_llen=self.expl_llen if self.expled else None,
6635             expl_vlen=self.expl_vlen if self.expled else None,
6636             expl_lenindef=self.expl_lenindef,
6637             lenindef=self.lenindef,
6638             ber_encoded=self.ber_encoded,
6639             bered=self.bered,
6640         )
6641         for name in self.specs:
6642             value = self._value.get(name)
6643             if value is None:
6644                 continue
6645             yield value.pps(decode_path=decode_path + (name,))
6646         for pp in self.pps_lenindef(decode_path):
6647             yield pp
6648
6649
6650 class Set(Sequence, SequenceEncode1stMixing):
6651     """``SET`` structure type
6652
6653     Its usage is identical to :py:class:`pyderasn.Sequence`.
6654
6655     .. _allow_unordered_set_ctx:
6656
6657     DER prohibits unordered values encoding and will raise an error
6658     during decode. If :ref:`bered <bered_ctx>` context option is set,
6659     then no error will occur. Also you can disable strict values
6660     ordering check by setting ``"allow_unordered_set": True``
6661     :ref:`context <ctx>` option.
6662     """
6663     __slots__ = ()
6664     tag_default = tag_encode(form=TagFormConstructed, num=17)
6665     asn1_type_name = "SET"
6666
6667     def _values_for_encoding(self):
6668         return sorted(
6669             super(Set, self)._values_for_encoding(),
6670             key=attrgetter("tag_order"),
6671         )
6672
6673     def _encode_cer(self, writer):
6674         write_full(writer, self.tag + LENINDEF)
6675         for v in sorted(
6676                 super(Set, self)._values_for_encoding(),
6677                 key=attrgetter("tag_order_cer"),
6678         ):
6679             v.encode_cer(writer)
6680         write_full(writer, EOC)
6681
6682     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6683         try:
6684             t, tlen, lv = tag_strip(tlv)
6685         except DecodeError as err:
6686             raise err.__class__(
6687                 msg=err.msg,
6688                 klass=self.__class__,
6689                 decode_path=decode_path,
6690                 offset=offset,
6691             )
6692         if t != self.tag:
6693             raise TagMismatch(
6694                 klass=self.__class__,
6695                 decode_path=decode_path,
6696                 offset=offset,
6697             )
6698         if tag_only:
6699             yield None
6700             return
6701         lenindef = False
6702         ctx_bered = ctx.get("bered", False)
6703         try:
6704             l, llen, v = len_decode(lv)
6705         except LenIndefForm as err:
6706             if not ctx_bered:
6707                 raise err.__class__(
6708                     msg=err.msg,
6709                     klass=self.__class__,
6710                     decode_path=decode_path,
6711                     offset=offset,
6712                 )
6713             l, llen, v = 0, 1, lv[1:]
6714             lenindef = True
6715         except DecodeError as err:
6716             raise err.__class__(
6717                 msg=err.msg,
6718                 klass=self.__class__,
6719                 decode_path=decode_path,
6720                 offset=offset,
6721             )
6722         if l > len(v):
6723             raise NotEnoughData(
6724                 "encoded length is longer than data",
6725                 klass=self.__class__,
6726                 offset=offset,
6727             )
6728         if not lenindef:
6729             v, tail = v[:l], v[l:]
6730         vlen = 0
6731         sub_offset = offset + tlen + llen
6732         values = {}
6733         ber_encoded = False
6734         ctx_allow_default_values = ctx.get("allow_default_values", False)
6735         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6736         tag_order_prev = (0, 0)
6737         _specs_items = copy(self.specs)
6738
6739         while len(v) > 0:
6740             if lenindef and v[:EOC_LEN].tobytes() == EOC:
6741                 break
6742             for name, spec in iteritems(_specs_items):
6743                 sub_decode_path = decode_path + (name,)
6744                 try:
6745                     spec.decode(
6746                         v,
6747                         sub_offset,
6748                         leavemm=True,
6749                         decode_path=sub_decode_path,
6750                         ctx=ctx,
6751                         tag_only=True,
6752                         _ctx_immutable=False,
6753                     )
6754                 except TagMismatch:
6755                     continue
6756                 break
6757             else:
6758                 raise TagMismatch(
6759                     klass=self.__class__,
6760                     decode_path=decode_path,
6761                     offset=offset,
6762                 )
6763             spec_defaulted = spec.default is not None
6764             if evgen_mode and not spec_defaulted:
6765                 for _decode_path, value, v_tail in spec.decode_evgen(
6766                         v,
6767                         sub_offset,
6768                         leavemm=True,
6769                         decode_path=sub_decode_path,
6770                         ctx=ctx,
6771                         _ctx_immutable=False,
6772                 ):
6773                     yield _decode_path, value, v_tail
6774             else:
6775                 _, value, v_tail = next(spec.decode_evgen(
6776                     v,
6777                     sub_offset,
6778                     leavemm=True,
6779                     decode_path=sub_decode_path,
6780                     ctx=ctx,
6781                     _ctx_immutable=False,
6782                     _evgen_mode=False,
6783                 ))
6784             value_tag_order = value.tag_order
6785             value_len = value.fulllen
6786             if tag_order_prev >= value_tag_order:
6787                 if ctx_bered or ctx_allow_unordered_set:
6788                     ber_encoded = True
6789                 else:
6790                     raise DecodeError(
6791                         "unordered " + self.asn1_type_name,
6792                         klass=self.__class__,
6793                         decode_path=sub_decode_path,
6794                         offset=sub_offset,
6795                     )
6796             if spec_defaulted:
6797                 if evgen_mode:
6798                     yield sub_decode_path, value, v_tail
6799                 if value != spec.default:
6800                     pass
6801                 elif ctx_bered or ctx_allow_default_values:
6802                     ber_encoded = True
6803                 else:
6804                     raise DecodeError(
6805                         "DEFAULT value met",
6806                         klass=self.__class__,
6807                         decode_path=sub_decode_path,
6808                         offset=sub_offset,
6809                     )
6810             values[name] = value
6811             del _specs_items[name]
6812             tag_order_prev = value_tag_order
6813             sub_offset += value_len
6814             vlen += value_len
6815             v = v_tail
6816
6817         obj = self.__class__(
6818             schema=self.specs,
6819             impl=self.tag,
6820             expl=self._expl,
6821             default=self.default,
6822             optional=self.optional,
6823             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6824         )
6825         if lenindef:
6826             if v[:EOC_LEN].tobytes() != EOC:
6827                 raise DecodeError(
6828                     "no EOC",
6829                     klass=self.__class__,
6830                     decode_path=decode_path,
6831                     offset=offset,
6832                 )
6833             tail = v[EOC_LEN:]
6834             obj.lenindef = True
6835         for name, spec in iteritems(self.specs):
6836             if name not in values and not spec.optional:
6837                 raise DecodeError(
6838                     "%s value is not ready" % name,
6839                     klass=self.__class__,
6840                     decode_path=decode_path,
6841                     offset=offset,
6842                 )
6843         if not evgen_mode:
6844             obj._value = values
6845         obj.ber_encoded = ber_encoded
6846         yield decode_path, obj, tail
6847
6848
6849 SequenceOfState = namedtuple(
6850     "SequenceOfState",
6851     BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6852     **NAMEDTUPLE_KWARGS
6853 )
6854
6855
6856 class SequenceOf(SequenceEncode1stMixing, Obj):
6857     """``SEQUENCE OF`` sequence type
6858
6859     For that kind of type you must specify the object it will carry on
6860     (bounds are for example here, not required)::
6861
6862         class Ints(SequenceOf):
6863             schema = Integer()
6864             bounds = (0, 2)
6865
6866     >>> ints = Ints()
6867     >>> ints.append(Integer(123))
6868     >>> ints.append(Integer(234))
6869     >>> ints
6870     Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6871     >>> [int(i) for i in ints]
6872     [123, 234]
6873     >>> ints.append(Integer(345))
6874     Traceback (most recent call last):
6875     pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6876     >>> ints[1]
6877     INTEGER 234
6878     >>> ints[1] = Integer(345)
6879     >>> ints
6880     Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6881
6882     You can initialize sequence with preinitialized values:
6883
6884     >>> ints = Ints([Integer(123), Integer(234)])
6885
6886     Also you can use iterator as a value:
6887
6888     >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6889
6890     And it won't be iterated until encoding process. Pay attention that
6891     bounds and required schema checks are done only during the encoding
6892     process in that case! After encode was called, then value is zeroed
6893     back to empty list and you have to set it again. That mode is useful
6894     mainly with CER encoding mode, where all objects from the iterable
6895     will be streamed to the buffer, without copying all of them to
6896     memory first.
6897     """
6898     __slots__ = ("spec", "_bound_min", "_bound_max")
6899     tag_default = tag_encode(form=TagFormConstructed, num=16)
6900     asn1_type_name = "SEQUENCE OF"
6901
6902     def __init__(
6903             self,
6904             value=None,
6905             schema=None,
6906             bounds=None,
6907             impl=None,
6908             expl=None,
6909             default=None,
6910             optional=False,
6911             _decoded=(0, 0, 0),
6912     ):
6913         super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6914         if schema is None:
6915             schema = getattr(self, "schema", None)
6916         if schema is None:
6917             raise ValueError("schema must be specified")
6918         self.spec = schema
6919         self._bound_min, self._bound_max = getattr(
6920             self,
6921             "bounds",
6922             (0, float("+inf")),
6923         ) if bounds is None else bounds
6924         self._value = []
6925         if value is not None:
6926             self._value = self._value_sanitize(value)
6927         if default is not None:
6928             default_value = self._value_sanitize(default)
6929             default_obj = self.__class__(
6930                 schema=schema,
6931                 impl=self.tag,
6932                 expl=self._expl,
6933             )
6934             default_obj._value = default_value
6935             self.default = default_obj
6936             if value is None:
6937                 self._value = copy(default_obj._value)
6938
6939     def _value_sanitize(self, value):
6940         iterator = False
6941         if issubclass(value.__class__, SequenceOf):
6942             value = value._value
6943         elif hasattr(value, NEXT_ATTR_NAME):
6944             iterator = True
6945         elif hasattr(value, "__iter__"):
6946             value = list(value)
6947         else:
6948             raise InvalidValueType((self.__class__, iter, "iterator"))
6949         if not iterator:
6950             if not self._bound_min <= len(value) <= self._bound_max:
6951                 raise BoundsError(self._bound_min, len(value), self._bound_max)
6952             class_expected = self.spec.__class__
6953             for v in value:
6954                 if not isinstance(v, class_expected):
6955                     raise InvalidValueType((class_expected,))
6956         return value
6957
6958     @property
6959     def ready(self):
6960         if hasattr(self._value, NEXT_ATTR_NAME):
6961             return True
6962         if self._bound_min > 0 and len(self._value) == 0:
6963             return False
6964         return all(v.ready for v in self._value)
6965
6966     @property
6967     def bered(self):
6968         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6969             return True
6970         return any(v.bered for v in self._value)
6971
6972     def __getstate__(self):
6973         if hasattr(self._value, NEXT_ATTR_NAME):
6974             raise ValueError("can not pickle SequenceOf with iterator")
6975         return SequenceOfState(
6976             __version__,
6977             self.tag,
6978             self._tag_order,
6979             self._expl,
6980             self.default,
6981             self.optional,
6982             self.offset,
6983             self.llen,
6984             self.vlen,
6985             self.expl_lenindef,
6986             self.lenindef,
6987             self.ber_encoded,
6988             self.spec,
6989             [copy(v) for v in self._value],
6990             self._bound_min,
6991             self._bound_max,
6992         )
6993
6994     def __setstate__(self, state):
6995         super(SequenceOf, self).__setstate__(state)
6996         self.spec = state.spec
6997         self._value = state.value
6998         self._bound_min = state.bound_min
6999         self._bound_max = state.bound_max
7000
7001     def __eq__(self, their):
7002         if isinstance(their, self.__class__):
7003             return (
7004                 self.spec == their.spec and
7005                 self.tag == their.tag and
7006                 self._expl == their._expl and
7007                 self._value == their._value
7008             )
7009         if hasattr(their, "__iter__"):
7010             return self._value == list(their)
7011         return False
7012
7013     def __call__(
7014             self,
7015             value=None,
7016             bounds=None,
7017             impl=None,
7018             expl=None,
7019             default=None,
7020             optional=None,
7021     ):
7022         return self.__class__(
7023             value=value,
7024             schema=self.spec,
7025             bounds=(
7026                 (self._bound_min, self._bound_max)
7027                 if bounds is None else bounds
7028             ),
7029             impl=self.tag if impl is None else impl,
7030             expl=self._expl if expl is None else expl,
7031             default=self.default if default is None else default,
7032             optional=self.optional if optional is None else optional,
7033         )
7034
7035     def __contains__(self, key):
7036         return key in self._value
7037
7038     def append(self, value):
7039         if not isinstance(value, self.spec.__class__):
7040             raise InvalidValueType((self.spec.__class__,))
7041         if len(self._value) + 1 > self._bound_max:
7042             raise BoundsError(
7043                 self._bound_min,
7044                 len(self._value) + 1,
7045                 self._bound_max,
7046             )
7047         self._value.append(value)
7048
7049     def __iter__(self):
7050         return iter(self._value)
7051
7052     def __len__(self):
7053         return len(self._value)
7054
7055     def __setitem__(self, key, value):
7056         if not isinstance(value, self.spec.__class__):
7057             raise InvalidValueType((self.spec.__class__,))
7058         self._value[key] = self.spec(value=value)
7059
7060     def __getitem__(self, key):
7061         return self._value[key]
7062
7063     def _values_for_encoding(self):
7064         return iter(self._value)
7065
7066     def _encode(self):
7067         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7068         if iterator:
7069             values = []
7070             values_append = values.append
7071             class_expected = self.spec.__class__
7072             values_for_encoding = self._values_for_encoding()
7073             self._value = []
7074             for v in values_for_encoding:
7075                 if not isinstance(v, class_expected):
7076                     raise InvalidValueType((class_expected,))
7077                 values_append(v.encode())
7078             if not self._bound_min <= len(values) <= self._bound_max:
7079                 raise BoundsError(self._bound_min, len(values), self._bound_max)
7080             value = b"".join(values)
7081         else:
7082             value = b"".join(v.encode() for v in self._values_for_encoding())
7083         return b"".join((self.tag, len_encode(len(value)), value))
7084
7085     def _encode1st(self, state):
7086         state = super(SequenceOf, self)._encode1st(state)
7087         if hasattr(self._value, NEXT_ATTR_NAME):
7088             self._value = []
7089         return state
7090
7091     def _encode2nd(self, writer, state_iter):
7092         write_full(writer, self.tag + len_encode(next(state_iter)))
7093         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7094         if iterator:
7095             values_count = 0
7096             class_expected = self.spec.__class__
7097             values_for_encoding = self._values_for_encoding()
7098             self._value = []
7099             for v in values_for_encoding:
7100                 if not isinstance(v, class_expected):
7101                     raise InvalidValueType((class_expected,))
7102                 v.encode2nd(writer, state_iter)
7103                 values_count += 1
7104             if not self._bound_min <= values_count <= self._bound_max:
7105                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7106         else:
7107             for v in self._values_for_encoding():
7108                 v.encode2nd(writer, state_iter)
7109
7110     def _encode_cer(self, writer):
7111         write_full(writer, self.tag + LENINDEF)
7112         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7113         if iterator:
7114             class_expected = self.spec.__class__
7115             values_count = 0
7116             values_for_encoding = self._values_for_encoding()
7117             self._value = []
7118             for v in values_for_encoding:
7119                 if not isinstance(v, class_expected):
7120                     raise InvalidValueType((class_expected,))
7121                 v.encode_cer(writer)
7122                 values_count += 1
7123             if not self._bound_min <= values_count <= self._bound_max:
7124                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7125         else:
7126             for v in self._values_for_encoding():
7127                 v.encode_cer(writer)
7128         write_full(writer, EOC)
7129
7130     def _decode(
7131             self,
7132             tlv,
7133             offset,
7134             decode_path,
7135             ctx,
7136             tag_only,
7137             evgen_mode,
7138             ordering_check=False,
7139     ):
7140         try:
7141             t, tlen, lv = tag_strip(tlv)
7142         except DecodeError as err:
7143             raise err.__class__(
7144                 msg=err.msg,
7145                 klass=self.__class__,
7146                 decode_path=decode_path,
7147                 offset=offset,
7148             )
7149         if t != self.tag:
7150             raise TagMismatch(
7151                 klass=self.__class__,
7152                 decode_path=decode_path,
7153                 offset=offset,
7154             )
7155         if tag_only:
7156             yield None
7157             return
7158         lenindef = False
7159         ctx_bered = ctx.get("bered", False)
7160         try:
7161             l, llen, v = len_decode(lv)
7162         except LenIndefForm as err:
7163             if not ctx_bered:
7164                 raise err.__class__(
7165                     msg=err.msg,
7166                     klass=self.__class__,
7167                     decode_path=decode_path,
7168                     offset=offset,
7169                 )
7170             l, llen, v = 0, 1, lv[1:]
7171             lenindef = True
7172         except DecodeError as err:
7173             raise err.__class__(
7174                 msg=err.msg,
7175                 klass=self.__class__,
7176                 decode_path=decode_path,
7177                 offset=offset,
7178             )
7179         if l > len(v):
7180             raise NotEnoughData(
7181                 "encoded length is longer than data",
7182                 klass=self.__class__,
7183                 decode_path=decode_path,
7184                 offset=offset,
7185             )
7186         if not lenindef:
7187             v, tail = v[:l], v[l:]
7188         vlen = 0
7189         sub_offset = offset + tlen + llen
7190         _value = []
7191         _value_count = 0
7192         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7193         value_prev = memoryview(v[:0])
7194         ber_encoded = False
7195         spec = self.spec
7196         while len(v) > 0:
7197             if lenindef and v[:EOC_LEN].tobytes() == EOC:
7198                 break
7199             sub_decode_path = decode_path + (str(_value_count),)
7200             if evgen_mode:
7201                 for _decode_path, value, v_tail in spec.decode_evgen(
7202                         v,
7203                         sub_offset,
7204                         leavemm=True,
7205                         decode_path=sub_decode_path,
7206                         ctx=ctx,
7207                         _ctx_immutable=False,
7208                 ):
7209                     yield _decode_path, value, v_tail
7210             else:
7211                 _, value, v_tail = next(spec.decode_evgen(
7212                     v,
7213                     sub_offset,
7214                     leavemm=True,
7215                     decode_path=sub_decode_path,
7216                     ctx=ctx,
7217                     _ctx_immutable=False,
7218                     _evgen_mode=False,
7219                 ))
7220             value_len = value.fulllen
7221             if ordering_check:
7222                 if value_prev.tobytes() > v[:value_len].tobytes():
7223                     if ctx_bered or ctx_allow_unordered_set:
7224                         ber_encoded = True
7225                     else:
7226                         raise DecodeError(
7227                             "unordered " + self.asn1_type_name,
7228                             klass=self.__class__,
7229                             decode_path=sub_decode_path,
7230                             offset=sub_offset,
7231                         )
7232                 value_prev = v[:value_len]
7233             _value_count += 1
7234             if not evgen_mode:
7235                 _value.append(value)
7236             sub_offset += value_len
7237             vlen += value_len
7238             v = v_tail
7239         if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7240             raise DecodeError(
7241                 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7242                 klass=self.__class__,
7243                 decode_path=decode_path,
7244                 offset=offset,
7245             )
7246         try:
7247             obj = self.__class__(
7248                 value=None if evgen_mode else _value,
7249                 schema=spec,
7250                 bounds=(self._bound_min, self._bound_max),
7251                 impl=self.tag,
7252                 expl=self._expl,
7253                 default=self.default,
7254                 optional=self.optional,
7255                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7256             )
7257         except BoundsError as err:
7258             raise DecodeError(
7259                 msg=str(err),
7260                 klass=self.__class__,
7261                 decode_path=decode_path,
7262                 offset=offset,
7263             )
7264         if lenindef:
7265             if v[:EOC_LEN].tobytes() != EOC:
7266                 raise DecodeError(
7267                     "no EOC",
7268                     klass=self.__class__,
7269                     decode_path=decode_path,
7270                     offset=offset,
7271                 )
7272             obj.lenindef = True
7273             tail = v[EOC_LEN:]
7274         obj.ber_encoded = ber_encoded
7275         yield decode_path, obj, tail
7276
7277     def __repr__(self):
7278         return "%s[%s]" % (
7279             pp_console_row(next(self.pps())),
7280             ", ".join(repr(v) for v in self._value),
7281         )
7282
7283     def pps(self, decode_path=()):
7284         yield _pp(
7285             obj=self,
7286             asn1_type_name=self.asn1_type_name,
7287             obj_name=self.__class__.__name__,
7288             decode_path=decode_path,
7289             optional=self.optional,
7290             default=self == self.default,
7291             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7292             expl=None if self._expl is None else tag_decode(self._expl),
7293             offset=self.offset,
7294             tlen=self.tlen,
7295             llen=self.llen,
7296             vlen=self.vlen,
7297             expl_offset=self.expl_offset if self.expled else None,
7298             expl_tlen=self.expl_tlen if self.expled else None,
7299             expl_llen=self.expl_llen if self.expled else None,
7300             expl_vlen=self.expl_vlen if self.expled else None,
7301             expl_lenindef=self.expl_lenindef,
7302             lenindef=self.lenindef,
7303             ber_encoded=self.ber_encoded,
7304             bered=self.bered,
7305         )
7306         for i, value in enumerate(self._value):
7307             yield value.pps(decode_path=decode_path + (str(i),))
7308         for pp in self.pps_lenindef(decode_path):
7309             yield pp
7310
7311
7312 class SetOf(SequenceOf):
7313     """``SET OF`` sequence type
7314
7315     Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7316     """
7317     __slots__ = ()
7318     tag_default = tag_encode(form=TagFormConstructed, num=17)
7319     asn1_type_name = "SET OF"
7320
7321     def _value_sanitize(self, value):
7322         value = super(SetOf, self)._value_sanitize(value)
7323         if hasattr(value, NEXT_ATTR_NAME):
7324             raise ValueError(
7325                 "SetOf does not support iterator values, as no sense in them"
7326             )
7327         return value
7328
7329     def _encode(self):
7330         v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7331         return b"".join((self.tag, len_encode(len(v)), v))
7332
7333     def _encode2nd(self, writer, state_iter):
7334         write_full(writer, self.tag + len_encode(next(state_iter)))
7335         values = []
7336         for v in self._values_for_encoding():
7337             buf = BytesIO()
7338             v.encode2nd(buf.write, state_iter)
7339             values.append(buf.getvalue())
7340         values.sort()
7341         for v in values:
7342             write_full(writer, v)
7343
7344     def _encode_cer(self, writer):
7345         write_full(writer, self.tag + LENINDEF)
7346         for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7347             write_full(writer, v)
7348         write_full(writer, EOC)
7349
7350     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7351         return super(SetOf, self)._decode(
7352             tlv,
7353             offset,
7354             decode_path,
7355             ctx,
7356             tag_only,
7357             evgen_mode,
7358             ordering_check=True,
7359         )
7360
7361
7362 def obj_by_path(pypath):  # pragma: no cover
7363     """Import object specified as string Python path
7364
7365     Modules must be separated from classes/functions with ``:``.
7366
7367     >>> obj_by_path("foo.bar:Baz")
7368     <class 'foo.bar.Baz'>
7369     >>> obj_by_path("foo.bar:Baz.boo")
7370     <classmethod 'foo.bar.Baz.boo'>
7371     """
7372     mod, objs = pypath.rsplit(":", 1)
7373     from importlib import import_module
7374     obj = import_module(mod)
7375     for obj_name in objs.split("."):
7376         obj = getattr(obj, obj_name)
7377     return obj
7378
7379
7380 def generic_decoder():  # pragma: no cover
7381     # All of this below is a big hack with self references
7382     choice = PrimitiveTypes()
7383     choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7384     choice.specs["SetOf"] = SetOf(schema=choice)
7385     for i in six_xrange(31):
7386         choice.specs["SequenceOf%d" % i] = SequenceOf(
7387             schema=choice,
7388             expl=tag_ctxc(i),
7389         )
7390     choice.specs["Any"] = Any()
7391
7392     # Class name equals to type name, to omit it from output
7393     class SEQUENCEOF(SequenceOf):
7394         __slots__ = ()
7395         schema = choice
7396
7397     def pprint_any(
7398             obj,
7399             oid_maps=(),
7400             with_colours=False,
7401             with_decode_path=False,
7402             decode_path_only=(),
7403             decode_path=(),
7404     ):
7405         def _pprint_pps(pps):
7406             for pp in pps:
7407                 if hasattr(pp, "_fields"):
7408                     if (
7409                             decode_path_only != () and
7410                             pp.decode_path[:len(decode_path_only)] != decode_path_only
7411                     ):
7412                         continue
7413                     if pp.asn1_type_name == Choice.asn1_type_name:
7414                         continue
7415                     pp_kwargs = pp._asdict()
7416                     pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7417                     pp = _pp(**pp_kwargs)
7418                     yield pp_console_row(
7419                         pp,
7420                         oid_maps=oid_maps,
7421                         with_offsets=True,
7422                         with_blob=False,
7423                         with_colours=with_colours,
7424                         with_decode_path=with_decode_path,
7425                         decode_path_len_decrease=len(decode_path_only),
7426                     )
7427                     for row in pp_console_blob(
7428                             pp,
7429                             decode_path_len_decrease=len(decode_path_only),
7430                     ):
7431                         yield row
7432                 else:
7433                     for row in _pprint_pps(pp):
7434                         yield row
7435         return "\n".join(_pprint_pps(obj.pps(decode_path)))
7436     return SEQUENCEOF(), pprint_any
7437
7438
7439 def ascii_visualize(ba):
7440     """Output only ASCII printable characters, like in hexdump -C
7441
7442     Example output for given binary string (right part)::
7443
7444         92 2b 39 20 65 91 e6 8e  95 93 1a 58 df 02 78 ea  |.+9 e......X..x.|
7445                                                            ^^^^^^^^^^^^^^^^
7446     """
7447     return "".join((six_unichr(b) if 0x20 <= b <= 0x7E else ".") for b in ba)
7448
7449
7450 def hexdump(raw):
7451     """Generate ``hexdump -C`` like output
7452
7453     Rendered example::
7454
7455         00000000  30 80 30 80 a0 80 02 01  02 00 00 02 14 54 a5 18  |0.0..........T..|
7456         00000010  69 ef 8b 3f 15 fd ea ad  bd 47 e0 94 81 6b 06 6a  |i..?.....G...k.j|
7457
7458     Result of that function is a generator of lines, where each line is
7459     a list of columns::
7460
7461         [
7462             [...],
7463             ["00000010 ", " 69", " ef", " 8b", " 3f", " 15", " fd", " ea", " ad ",
7464                           " bd", " 47", " e0", " 94", " 81", " 6b", " 06", " 6a ",
7465                           " |i..?.....G...k.j|"]
7466             [...],
7467         ]
7468     """
7469     hexed = hexenc(raw).upper()
7470     addr, cols = 0, ["%08x " % 0]
7471     for i in six_xrange(0, len(hexed), 2):
7472         if i != 0 and i // 2 % 8 == 0:
7473             cols[-1] += " "
7474         if i != 0 and i // 2 % 16 == 0:
7475             cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:addr + 16])))
7476             yield cols
7477             addr += 16
7478             cols = ["%08x " % addr]
7479         cols.append(" " + hexed[i:i + 2])
7480     if len(cols) > 0:
7481         cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:])))
7482         yield cols
7483
7484
7485 def browse(raw, obj, oid_maps=()):
7486     """Interactive browser
7487
7488     :param bytes raw: binary data you decoded
7489     :param obj: decoded :py:class:`pyderasn.Obj`
7490     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
7491                      Its human readable form is printed when OID is met
7492
7493     .. note:: `urwid <http://urwid.org/>`__ dependency required
7494
7495     This browser is an interactive terminal application for browsing
7496     structures of your decoded ASN.1 objects. You can quit it with **q**
7497     key. It consists of three windows:
7498
7499     :tree:
7500      View of ASN.1 elements hierarchy. You can navigate it using **Up**,
7501      **Down**, **PageUp**, **PageDown**, **Home**, **End** keys.
7502      **Left** key goes to constructed element above. **Plus**/**Minus**
7503      keys collapse/uncollapse constructed elements. **Space** toggles it
7504     :info:
7505      window with various information about element. You can scroll it
7506      with **h**/**l** (down, up) (**H**/**L** for triple speed) keys
7507     :hexdump:
7508      window with raw data hexdump and highlighted current element's
7509      contents. It automatically focuses on element's data. You can
7510      scroll it with **j**/**k** (down, up) (**J**/**K** for triple
7511      speed) keys. If element has explicit tag, then it also will be
7512      highlighted with different colour
7513
7514     Window's header contains current decode path and progress bars with
7515     position in *info* and *hexdump* windows.
7516
7517     If you press **d**, then current element will be saved in the
7518     current directory under its decode path name (adding ".0", ".1", etc
7519     suffix if such file already exists). **D** will save it with explicit tag.
7520
7521     You can also invoke it with ``--browse`` command line argument.
7522     """
7523     from copy import deepcopy
7524     from os.path import exists as path_exists
7525     import urwid
7526
7527     class TW(urwid.TreeWidget):
7528         def __init__(self, state, *args, **kwargs):
7529             self.state = state
7530             self.scrolled = {"info": False, "hexdump": False}
7531             super(TW, self).__init__(*args, **kwargs)
7532
7533         def _get_pp(self):
7534             pp = self.get_node().get_value()
7535             constructed = len(pp) > 1
7536             return (pp if hasattr(pp, "_fields") else pp[0]), constructed
7537
7538         def _state_update(self):
7539             pp, _ = self._get_pp()
7540             self.state["decode_path"].set_text(
7541                 ":".join(str(p) for p in pp.decode_path)
7542             )
7543             lines = deepcopy(self.state["hexed"])
7544
7545             def attr_set(i, attr):
7546                 line = lines[i // 16]
7547                 idx = 1 + (i - 16 * (i // 16))
7548                 line[idx] = (attr, line[idx])
7549
7550             if pp.expl_offset is not None:
7551                 for i in six_xrange(
7552                         pp.expl_offset,
7553                         pp.expl_offset + pp.expl_tlen + pp.expl_llen,
7554                 ):
7555                     attr_set(i, "select-expl")
7556             for i in six_xrange(pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen):
7557                 attr_set(i, "select-value")
7558             self.state["hexdump"]._set_body([urwid.Text(line) for line in lines])
7559             self.state["hexdump"].set_focus(pp.offset // 16)
7560             self.state["hexdump"].set_focus_valign("middle")
7561             self.state["hexdump_bar"].set_completion(
7562                 (100 * pp.offset // 16) //
7563                 len(self.state["hexdump"]._body.positions())
7564             )
7565
7566             lines = [
7567                 [("header", "Name: "), pp.obj_name],
7568                 [("header", "Type: "), pp.asn1_type_name],
7569                 [("header", "Offset: "), "%d (0x%x)" % (pp.offset, pp.offset)],
7570                 [("header", "[TLV]len: "), "%d/%d/%d" % (
7571                     pp.tlen, pp.llen, pp.vlen,
7572                 )],
7573                 [("header", "TLVlen: "), "%d" % sum((
7574                     pp.tlen, pp.llen, pp.vlen,
7575                 ))],
7576                 [("header", "Slice: "), "[%d:%d]" % (
7577                     pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen,
7578                 )],
7579             ]
7580             if pp.lenindef:
7581                 lines.append([("warning", "LENINDEF")])
7582             if pp.ber_encoded:
7583                 lines.append([("warning", "BER encoded")])
7584             if pp.bered:
7585                 lines.append([("warning", "BERed")])
7586             if pp.expl is not None:
7587                 lines.append([("header", "EXPLICIT")])
7588                 klass, _, num = pp.expl
7589                 lines.append(["  Tag: %s%d" % (TagClassReprs[klass], num)])
7590                 if pp.expl_offset is not None:
7591                     lines.append(["  Offset: %d" % pp.expl_offset])
7592                     lines.append(["  [TLV]len: %d/%d/%d" % (
7593                         pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7594                     )])
7595                     lines.append(["  TLVlen: %d" % sum((
7596                         pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7597                     ))])
7598                     lines.append(["  Slice: [%d:%d]" % (
7599                         pp.expl_offset,
7600                         pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen,
7601                     )])
7602             if pp.impl is not None:
7603                 klass, _, num = pp.impl
7604                 lines.append([
7605                     ("header", "IMPLICIT: "), "%s%d" % (TagClassReprs[klass], num),
7606                 ])
7607             if pp.optional:
7608                 lines.append(["OPTIONAL"])
7609             if pp.default:
7610                 lines.append(["DEFAULT"])
7611             if len(pp.decode_path) > 0:
7612                 ent = pp.decode_path[-1]
7613                 if isinstance(ent, DecodePathDefBy):
7614                     lines.append([""])
7615                     value = str(ent.defined_by)
7616                     oid_name = find_oid_name(
7617                         ent.defined_by.asn1_type_name, oid_maps, value,
7618                     )
7619                     lines.append([("header", "DEFINED BY: "), "%s" % (
7620                         value if oid_name is None
7621                         else "%s (%s)" % (oid_name, value)
7622                     )])
7623             lines.append([""])
7624             if pp.value is not None:
7625                 lines.append([("header", "Value: "), pp.value])
7626                 if (
7627                         len(oid_maps) > 0 and
7628                         pp.asn1_type_name == ObjectIdentifier.asn1_type_name
7629                 ):
7630                     for oid_map in oid_maps:
7631                         oid_name = oid_map.get(pp.value)
7632                         if oid_name is not None:
7633                             lines.append([("header", "Human: "), oid_name])
7634                             break
7635                 if pp.asn1_type_name == Integer.asn1_type_name:
7636                     lines.append([
7637                         ("header", "Decimal: "), "%d" % int(pp.obj),
7638                     ])
7639                     lines.append([
7640                         ("header", "Hexadecimal: "), colonize_hex(pp.obj.tohex()),
7641                     ])
7642             if pp.blob.__class__ == binary_type:
7643                 blob = hexenc(pp.blob).upper()
7644                 for i in six_xrange(0, len(blob), 32):
7645                     lines.append([colonize_hex(blob[i:i + 32])])
7646             elif pp.blob.__class__ == tuple:
7647                 lines.append([", ".join(pp.blob)])
7648             self.state["info"]._set_body([urwid.Text(line) for line in lines])
7649             self.state["info_bar"].set_completion(0)
7650
7651         def selectable(self):
7652             if self.state["widget_current"] != self:
7653                 self.state["widget_current"] = self
7654                 self.scrolled["info"] = False
7655                 self.scrolled["hexdump"] = False
7656                 self._state_update()
7657             return super(TW, self).selectable()
7658
7659         def get_display_text(self):
7660             pp, constructed = self._get_pp()
7661             style = "constructed" if constructed else ""
7662             if len(pp.decode_path) == 0:
7663                 return (style, pp.obj_name)
7664             if pp.asn1_type_name == "EOC":
7665                 return ("eoc", "EOC")
7666             ent = pp.decode_path[-1]
7667             if isinstance(ent, DecodePathDefBy):
7668                 value = str(ent.defined_by)
7669                 oid_name = find_oid_name(
7670                     ent.defined_by.asn1_type_name, oid_maps, value,
7671                 )
7672                 return ("defby", "DEFBY:" + (
7673                     value if oid_name is None else oid_name
7674                 ))
7675             return (style, ent)
7676
7677         def _scroll(self, what, step):
7678             self.state[what]._invalidate()
7679             pos = self.state[what].focus_position
7680             if not self.scrolled[what]:
7681                 self.scrolled[what] = True
7682                 pos -= 2
7683             pos = max(0, pos + step)
7684             pos = min(pos, len(self.state[what]._body.positions()) - 1)
7685             self.state[what].set_focus(pos)
7686             self.state[what].set_focus_valign("top")
7687             self.state[what + "_bar"].set_completion(
7688                 (100 * pos) // len(self.state[what]._body.positions())
7689             )
7690
7691         def keypress(self, size, key):
7692             if key == "q":
7693                 raise urwid.ExitMainLoop()
7694
7695             if key == " ":
7696                 self.expanded = not self.expanded
7697                 self.update_expanded_icon()
7698                 return None
7699
7700             hexdump_steps = {"j": 1, "k": -1, "J": 5, "K": -5}
7701             if key in hexdump_steps:
7702                 self._scroll("hexdump", hexdump_steps[key])
7703                 return None
7704
7705             info_steps = {"h": 1, "l": -1, "H": 5, "L": -5}
7706             if key in info_steps:
7707                 self._scroll("info", info_steps[key])
7708                 return None
7709
7710             if key in ("d", "D"):
7711                 pp, _ = self._get_pp()
7712                 dp = ":".join(str(p) for p in pp.decode_path)
7713                 dp = dp.replace(" ", "_")
7714                 if dp == "":
7715                     dp = "root"
7716                 if key == "d" or pp.expl_offset is None:
7717                     data = self.state["raw"][pp.offset:(
7718                         pp.offset + pp.tlen + pp.llen + pp.vlen
7719                     )]
7720                 else:
7721                     data = self.state["raw"][pp.expl_offset:(
7722                         pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen
7723                     )]
7724                 ctr = 0
7725
7726                 def duplicate_path(dp, ctr):
7727                     if ctr == 0:
7728                         return dp
7729                     return "%s.%d" % (dp, ctr)
7730
7731                 while True:
7732                     if not path_exists(duplicate_path(dp, ctr)):
7733                         break
7734                     ctr += 1
7735                 dp = duplicate_path(dp, ctr)
7736                 with open(dp, "wb") as fd:
7737                     fd.write(data)
7738                 self.state["decode_path"].set_text(
7739                     ("warning", "Saved to: " + dp)
7740                 )
7741                 return None
7742             return super(TW, self).keypress(size, key)
7743
7744     class PN(urwid.ParentNode):
7745         def __init__(self, state, value, *args, **kwargs):
7746             self.state = state
7747             if not hasattr(value, "_fields"):
7748                 value = list(value)
7749             super(PN, self).__init__(value, *args, **kwargs)
7750
7751         def load_widget(self):
7752             return TW(self.state, self)
7753
7754         def load_child_keys(self):
7755             value = self.get_value()
7756             if hasattr(value, "_fields"):
7757                 return []
7758             return range(len(value[1:]))
7759
7760         def load_child_node(self, key):
7761             return PN(
7762                 self.state,
7763                 self.get_value()[key + 1],
7764                 parent=self,
7765                 key=key,
7766                 depth=self.get_depth() + 1,
7767             )
7768
7769     class LabeledPG(urwid.ProgressBar):
7770         def __init__(self, label, *args, **kwargs):
7771             self.label = label
7772             super(LabeledPG, self).__init__(*args, **kwargs)
7773
7774         def get_text(self):
7775             return "%s: %s" % (self.label, super(LabeledPG, self).get_text())
7776
7777     WinHexdump = urwid.ListBox([urwid.Text("")])
7778     WinInfo = urwid.ListBox([urwid.Text("")])
7779     WinDecodePath = urwid.Text("", "center")
7780     WinInfoBar = LabeledPG("info", "pg-normal", "pg-complete")
7781     WinHexdumpBar = LabeledPG("hexdump", "pg-normal", "pg-complete")
7782     WinTree = urwid.TreeListBox(urwid.TreeWalker(PN(
7783         {
7784             "raw": raw,
7785             "hexed": list(hexdump(raw)),
7786             "widget_current": None,
7787             "info": WinInfo,
7788             "info_bar": WinInfoBar,
7789             "hexdump": WinHexdump,
7790             "hexdump_bar": WinHexdumpBar,
7791             "decode_path": WinDecodePath,
7792         },
7793         list(obj.pps()),
7794     )))
7795     help_text = " ".join((
7796         "q:quit",
7797         "space:(un)collapse",
7798         "(pg)up/down/home/end:nav",
7799         "jkJK:hexdump hlHL:info",
7800         "dD:dump",
7801     ))
7802     urwid.MainLoop(
7803         urwid.Frame(
7804             urwid.Columns([
7805                 ("weight", 1, WinTree),
7806                 ("weight", 2, urwid.Pile([
7807                     urwid.LineBox(WinInfo),
7808                     urwid.LineBox(WinHexdump),
7809                 ])),
7810             ]),
7811             header=urwid.Columns([
7812                 ("weight", 2, urwid.AttrWrap(WinDecodePath, "header")),
7813                 ("weight", 1, WinInfoBar),
7814                 ("weight", 1, WinHexdumpBar),
7815             ]),
7816             footer=urwid.AttrWrap(urwid.Text(help_text), "help")
7817         ),
7818         [
7819             ("header", "bold", ""),
7820             ("constructed", "bold", ""),
7821             ("help", "light magenta", ""),
7822             ("warning", "light red", ""),
7823             ("defby", "light red", ""),
7824             ("eoc", "dark red", ""),
7825             ("select-value", "light green", ""),
7826             ("select-expl", "light red", ""),
7827             ("pg-normal", "", "light blue"),
7828             ("pg-complete", "black", "yellow"),
7829         ],
7830     ).run()
7831
7832
7833 def main():  # pragma: no cover
7834     import argparse
7835     parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7836     parser.add_argument(
7837         "--skip",
7838         type=int,
7839         default=0,
7840         help="Skip that number of bytes from the beginning",
7841     )
7842     parser.add_argument(
7843         "--oids",
7844         help="Python paths to dictionary with OIDs, comma separated",
7845     )
7846     parser.add_argument(
7847         "--schema",
7848         help="Python path to schema definition to use",
7849     )
7850     parser.add_argument(
7851         "--defines-by-path",
7852         help="Python path to decoder's defines_by_path",
7853     )
7854     parser.add_argument(
7855         "--nobered",
7856         action="store_true",
7857         help="Disallow BER encoding",
7858     )
7859     parser.add_argument(
7860         "--print-decode-path",
7861         action="store_true",
7862         help="Print decode paths",
7863     )
7864     parser.add_argument(
7865         "--decode-path-only",
7866         help="Print only specified decode path",
7867     )
7868     parser.add_argument(
7869         "--allow-expl-oob",
7870         action="store_true",
7871         help="Allow explicit tag out-of-bound",
7872     )
7873     parser.add_argument(
7874         "--evgen",
7875         action="store_true",
7876         help="Turn on event generation mode",
7877     )
7878     parser.add_argument(
7879         "--browse",
7880         action="store_true",
7881         help="Start ASN.1 browser",
7882     )
7883     parser.add_argument(
7884         "RAWFile",
7885         type=argparse.FileType("rb"),
7886         help="Path to BER/CER/DER file you want to decode",
7887     )
7888     args = parser.parse_args()
7889     if PY2:
7890         args.RAWFile.seek(args.skip)
7891         raw = memoryview(args.RAWFile.read())
7892         args.RAWFile.close()
7893     else:
7894         raw = file_mmaped(args.RAWFile)[args.skip:]
7895     oid_maps = (
7896         [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7897         if args.oids else ()
7898     )
7899     from functools import partial
7900     if args.schema:
7901         schema = obj_by_path(args.schema)
7902         pprinter = partial(pprint, big_blobs=True)
7903     else:
7904         schema, pprinter = generic_decoder()
7905     ctx = {
7906         "bered": not args.nobered,
7907         "allow_expl_oob": args.allow_expl_oob,
7908     }
7909     if args.defines_by_path is not None:
7910         ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7911     if args.browse:
7912         obj, _ = schema().decode(raw, ctx=ctx)
7913         browse(raw, obj, oid_maps)
7914         from sys import exit as sys_exit
7915         sys_exit(0)
7916     from os import environ
7917     pprinter = partial(
7918         pprinter,
7919         oid_maps=oid_maps,
7920         with_colours=environ.get("NO_COLOR") is None,
7921         with_decode_path=args.print_decode_path,
7922         decode_path_only=(
7923             () if args.decode_path_only is None else
7924             tuple(args.decode_path_only.split(":"))
7925         ),
7926     )
7927     if args.evgen:
7928         for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7929             print(pprinter(obj, decode_path=decode_path))
7930     else:
7931         obj, tail = schema().decode(raw, ctx=ctx)
7932         print(pprinter(obj))
7933     if tail != b"":
7934         print("\nTrailing data: %s" % hexenc(tail))
7935
7936
7937 if __name__ == "__main__":
7938     from pyderasn import *
7939     main()