]> Cypherpunks.ru repositories - pyderasn.git/blob - pyderasn.py
Check if tag's long form used in expected way
[pyderasn.git] / pyderasn.py
1 #!/usr/bin/env python
2 # coding: utf-8
3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU Lesser General Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
21
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal BER/CER/DER ones.
24
25     >>> i = Integer(123)
26     >>> raw = i.encode()
27     >>> Integer().decod(raw) == i
28     True
29
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
62
63 Common for most types
64 ---------------------
65
66 Tags
67 ____
68
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
75 tags simultaneously.
76
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
80 number.
81
82 .. note::
83
84    EXPLICIT tags always have **constructed** tag. PyDERASN does not
85    explicitly check correctness of schema input here.
86
87 .. note::
88
89    Implicit tags have **primitive** (``tag_ctxp``) encoding for
90    primitive values.
91
92 ::
93
94     >>> Integer(impl=tag_ctxp(1))
95     [1] INTEGER
96     >>> Integer(expl=tag_ctxc(2))
97     [2] EXPLICIT INTEGER
98
99 Implicit tag is not explicitly shown.
100
101 Two objects of the same type, but with different implicit/explicit tags
102 are **not** equal.
103
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
106 function::
107
108     >>> tag_decode(tag_ctxc(123))
109     (128, 32, 123)
110     >>> klass, form, num = tag_decode(tag_ctxc(123))
111     >>> klass == TagClassContext
112     True
113     >>> form == TagFormConstructed
114     True
115
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
118
119 Default/optional
120 ________________
121
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
125
126     >>> Integer(optional=True, default=123)
127     INTEGER 123 OPTIONAL DEFAULT
128
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
132 ``version`` field::
133
134     class Version(Integer):
135         schema = (
136             ("v1", 0),
137             ("v2", 1),
138             ("v3", 2),
139         )
140     class TBSCertificate(Sequence):
141         schema = (
142             ("version", Version(expl=tag_ctxc(0), default="v1")),
143         [...]
144
145 When default argument is used and value is not specified, then it equals
146 to default one.
147
148 .. _bounds:
149
150 Size constraints
151 ________________
152
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
156
157     class X(...):
158         bounds = (MIN, MAX)
159
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
161
162 For simplicity you can also set bounds the following way::
163
164     bounded_x = X(bounds=(MIN, MAX))
165
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
167 raised.
168
169 Common methods
170 ______________
171
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
175
176 All objects are friendly to ``copy.copy()`` and copied objects can be
177 safely mutated.
178
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
181
182 .. _decoding:
183
184 Decoding
185 --------
186
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
193
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
196
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
199
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
205
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
208
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
211 * ``expl_tlen``,
212 * ``expl_llen``
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
215   ``offset`` otherwise
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
217   ``tlvlen`` otherwise
218
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
220
221 .. _ctx:
222
223 Context
224 _______
225
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
229
230 Currently available context options:
231
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
238
239 .. _pprinting:
240
241 Pretty printing
242 ---------------
243
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
248
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
252
253     >>> from pyderasn import pprint
254     >>> encoded = Integer(-12345).encode()
255     >>> obj, tail = Integer().decode(encoded)
256     >>> print(pprint(obj))
257         0   [1,1,   2] INTEGER -12345
258
259 .. _pprint_example:
260
261 Example certificate::
262
263     >>> print(pprint(crt))
264         0   [1,3,1604] Certificate SEQUENCE
265         4   [1,3,1453]  . tbsCertificate: TBSCertificate SEQUENCE
266        10-2 [1,1,   1]  . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267        13   [1,1,   3]  . . serialNumber: CertificateSerialNumber INTEGER 61595
268        18   [1,1,  13]  . . signature: AlgorithmIdentifier SEQUENCE
269        20   [1,1,   9]  . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270        31   [0,0,   2]  . . . parameters: [UNIV 5] ANY OPTIONAL
271                         . . . . 05:00
272        33   [0,0, 278]  . . issuer: Name CHOICE rdnSequence
273        33   [1,3, 274]  . . . rdnSequence: RDNSequence SEQUENCE OF
274        37   [1,1,  11]  . . . . 0: RelativeDistinguishedName SET OF
275        39   [1,1,   9]  . . . . . 0: AttributeTypeAndValue SEQUENCE
276        41   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277        46   [0,0,   4]  . . . . . . value: [UNIV 19] AttributeValue ANY
278                         . . . . . . . 13:02:45:53
279     [...]
280      1461   [1,1,  13]  . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281      1463   [1,1,   9]  . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282      1474   [0,0,   2]  . . parameters: [UNIV 5] ANY OPTIONAL
283                         . . . 05:00
284      1476   [1,2, 129]  . signatureValue: BIT STRING 1024 bits
285                         . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286                         . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
287      [...]
288
289     Trailing data: 0a
290
291 Let's parse that output, human::
292
293        10-2 [1,1,   1]    . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294        ^  ^  ^ ^    ^     ^   ^        ^            ^       ^       ^  ^
295        0  1  2 3    4     5   6        7            8       9       10 11
296
297 ::
298
299        20   [1,1,   9]    . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
300        ^     ^ ^    ^     ^     ^          ^                 ^
301        0     2 3    4     5     6          9                 10
302
303 ::
304
305        33   [0,0, 278]    . . issuer: Name CHOICE rdnSequence
306        ^     ^ ^    ^     ^   ^       ^    ^      ^
307        0     2 3    4     5   6       8    9      10
308
309 ::
310
311        52-2∞ B [1,1,1054]∞  . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
312              ^           ^                                 ^   ^            ^
313             12          13                                14   9            10
314
315 :0:
316  Offset of the object, where its DER/BER encoding begins.
317  Pay attention that it does **not** include explicit tag.
318 :1:
319  If explicit tag exists, then this is its length (tag + encoded length).
320 :2:
321  Length of object's tag. For example CHOICE does not have its own tag,
322  so it is zero.
323 :3:
324  Length of encoded length.
325 :4:
326  Length of encoded value.
327 :5:
328  Visual indentation to show the depth of object in the hierarchy.
329 :6:
330  Object's name inside SEQUENCE/CHOICE.
331 :7:
332  If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333  here. "IMPLICIT" is omitted.
334 :8:
335  Object's class name, if set. Omitted if it is just an ordinary simple
336  value (like with ``algorithm`` in example above).
337 :9:
338  Object's ASN.1 type.
339 :10:
340  Object's value, if set. Can consist of multiple words (like OCTET/BIT
341  STRINGs above). We see ``v3`` value in Version, because it is named.
342  ``rdnSequence`` is the choice of CHOICE type.
343 :11:
344  Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345  default one, specified in the schema.
346 :12:
347  Shows does object contains any kind of BER encoded data (possibly
348  Sequence holding BER-encoded underlying value).
349 :13:
350  Only applicable to BER encoded data. Indefinite length encoding mark.
351 :14:
352  Only applicable to BER encoded data. If object has BER-specific
353  encoding, then ``BER`` will be shown. It does not depend on indefinite
354  length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355  (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
356  could be BERed.
357
358 .. _definedby:
359
360 DEFINED BY
361 ----------
362
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
367
368 .. _defines:
369
370 defines kwarg
371 _____________
372
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
376 container::
377
378     class ContentInfo(Sequence):
379         schema = (
380             ("contentType", ContentType(defines=((("content",), {
381                 id_digestedData: DigestedData(),
382                 id_signedData: SignedData(),
383             }),))),
384             ("content", Any(expl=tag_ctxc(0))),
385         )
386
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
391 done.
392
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
398
399         (
400             (("parameters",), {
401                 id_ecPublicKey: ECParameters(),
402                 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
403             }),
404             (("..", "subjectPublicKey"), {
405                 id_rsaEncryption: RSAPublicKey(),
406                 id_GostR3410_2001: OctetString(),
407             }),
408         ),
409
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
412 itself.
413
414 Following types can be automatically decoded (DEFINED BY):
415
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420   ``Any``/``BitString``/``OctetString``-s
421
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
426
427 .. _defines_by_path_ctx:
428
429 defines_by_path context option
430 ______________________________
431
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
435
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
438
439     (decode_path, defines)
440
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
445
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
449 of ``PKIResponse``::
450
451     content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
452         (
453             ("contentType",),
454             ((("content",), {id_signedData: SignedData()}),),
455         ),
456         (
457             (
458                 "content",
459                 DecodePathDefBy(id_signedData),
460                 "encapContentInfo",
461                 "eContentType",
462             ),
463             ((("eContent",), {
464                 id_cct_PKIData: PKIData(),
465                 id_cct_PKIResponse: PKIResponse(),
466             })),
467         ),
468         (
469             (
470                 "content",
471                 DecodePathDefBy(id_signedData),
472                 "encapContentInfo",
473                 "eContent",
474                 DecodePathDefBy(id_cct_PKIResponse),
475                 "controlSequence",
476                 any,
477                 "attrType",
478             ),
479             ((("attrValues",), {
480                 id_cmc_recipientNonce: RecipientNonce(),
481                 id_cmc_senderNonce: SenderNonce(),
482                 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483                 id_cmc_transactionId: TransactionId(),
484             })),
485         ),
486     )})
487
488 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 First function is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meet --
491 useful for SEQUENCE/SET OF-s.
492
493 .. _bered_ctx:
494
495 BER encoding
496 ------------
497
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
502
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504   attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505   STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506   ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508   attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509   ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
510   contain it.
511 * If object has an indefinite length encoded explicit tag, then
512   ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514   indefinite length, object's indefinite length, BER-encoding) or any
515   underlying component has that kind of encoding, then ``bered``
516   attribute is set to True. For example SignedData CMS can have
517   ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518   ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
519
520 EOC (end-of-contents) token's length is taken in advance in object's
521 value length.
522
523 .. _allow_expl_oob_ctx:
524
525 Allow explicit tag out-of-bound
526 -------------------------------
527
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
533
534 .. warning::
535
536    This option should be used only for skipping some decode errors, just
537    to see the decoded structure somehow.
538
539 .. _streaming:
540
541 Streaming and dealing with huge structures
542 ------------------------------------------
543
544 .. _evgen_mode:
545
546 evgen mode
547 __________
548
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
553 structure.
554
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
558
559     class TBSCertListFast(Sequence):
560         schema = (
561             [...]
562             ("revokedCertificates", OctetString(
563                 impl=SequenceOf.tag_default,
564                 optional=True,
565             )),
566             [...]
567         )
568
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
571
572 But how can you get all certificate's serial number from it, after you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
576
577     $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578          10     [1,1,   1]   . . version: Version INTEGER v2 (01) OPTIONAL
579          15     [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580          26     [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL
581          13     [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE
582          34     [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583          39     [0,0,   9]   . . . . . . value: [UNIV 19] AttributeValue ANY
584          32     [1,1,  14]   . . . . . 0: AttributeTypeAndValue SEQUENCE
585          30     [1,1,  16]   . . . . 0: RelativeDistinguishedName SET OF
586     [...]
587         188     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589         191     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
590         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591         186     [1,1,  18]   . . . 0: RevokedCertificate SEQUENCE
592         208     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594         211     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
595         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596         206     [1,1,  18]   . . . 1: RevokedCertificate SEQUENCE
597     [...]
598     9144992     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
599     9144992     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600     9144985     [1,1,  20]   . . . 415755: RevokedCertificate SEQUENCE
601       181     [1,4,9144821]   . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602         5     [1,4,9144997]   . tbsCertList: TBSCertList SEQUENCE
603     9145009     [1,1,   9]   . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604     9145020     [0,0,   2]   . . parameters: [UNIV 5] ANY OPTIONAL
605     9145007     [1,1,  13]   . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606     9145022     [1,3, 513]   . signatureValue: BIT STRING 4096 bits
607         0     [1,4,9145534]  CertificateList SEQUENCE
608
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as a successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
618 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
619
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
626
627     for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
628         if (
629             len(decode_path) == 4 and
630             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
631             decode_path[3] == "userCertificate"
632         ):
633             print("serial number:", int(obj))
634
635 Virtually it does not take any memory except at least needed for single
636 object storage. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
639 ``.tlvlen``.
640
641 .. _evgen_mode_upto_ctx:
642
643 evgen_mode_upto
644 _______________
645
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 do not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
655
656     for decode_path, obj, _ in CertificateList().decode_evgen(
657         crl_raw,
658         ctx={"evgen_mode_upto": (
659             (("tbsCertList", "revokedCertificates", any), True),
660         )},
661     ):
662         if (
663             len(decode_path) == 3 and
664             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
665         ):
666             print("serial number:", int(obj["userCertificate"]))
667
668 .. note::
669
670    SEQUENCE/SET values with DEFAULT specified are automatically decoded
671    without evgen mode.
672
673 .. _mmap:
674
675 mmap-ed file
676 ____________
677
678 POSIX compliant systems have ``mmap`` syscall, giving ability to work
679 the memory mapped file. You can deal with the file like it was an
680 ordinary binary string, allowing you not to load it to the memory first.
681 Also you can use them as an input for OCTET STRING, taking no Python
682 memory for their storage.
683
684 There is convenient :py:func:`pyderasn.file_mmaped` function that
685 creates read-only memoryview on the file contents::
686
687     with open("huge", "rb") as fd:
688         raw = file_mmaped(fd)
689         obj = Something.decode(raw)
690
691 .. warning::
692
693    mmap-ed files in Python2.7 does not implement buffer protocol, so
694    memoryview won't work on them.
695
696 .. warning::
697
698    mmap maps the **whole** file. So it plays no role if you seek-ed it
699    before. Take the slice of the resulting memoryview with required
700    offset instead.
701
702 .. note::
703
704    If you use ZFS as underlying storage, then pay attention that
705    currently most platforms does not deal good with ZFS ARC and ordinary
706    page cache used for mmaps. It can take twice the necessary size in
707    the memory: both in page cache and ZFS ARC.
708
709 CER encoding
710 ____________
711
712 We can parse any kind of data now, but how can we produce files
713 streamingly, without storing their encoded representation in memory?
714 SEQUENCE by default encodes in memory all its values, joins them in huge
715 binary string, just to know the exact size of SEQUENCE's value for
716 encoding it in TLV. DER requires you to know all exact sizes of the
717 objects.
718
719 You can use CER encoding mode, that slightly differs from the DER, but
720 does not require exact sizes knowledge, allowing streaming encoding
721 directly to some writer/buffer. Just use
722 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
723 encoded data will flow::
724
725     opener = io.open if PY2 else open
726     with opener("result", "wb") as fd:
727         obj.encode_cer(fd.write)
728
729 ::
730
731     buf = io.BytesIO()
732     obj.encode_cer(buf.write)
733
734 If you do not want to create in-memory buffer every time, then you can
735 use :py:func:`pyderasn.encode_cer` function::
736
737     data = encode_cer(obj)
738
739 Remember that CER is **not valid** DER in most cases, so you **have to**
740 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
741 decoding. Also currently there is **no** validation that provided CER is
742 valid one -- you are sure that it has only valid BER encoding.
743
744 .. warning::
745
746    SET OF values can not be streamingly encoded, because they are
747    required to be sorted byte-by-byte. Big SET OF values still will take
748    much memory. Use neither SET nor SET OF values, as modern ASN.1
749    also recommends too.
750
751 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
752 OCTET STRINGs! They will be streamingly copied from underlying file to
753 the buffer using 1 KB chunks.
754
755 Some structures require that some of the elements have to be forcefully
756 DER encoded. For example ``SignedData`` CMS requires you to encode
757 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
758 encode everything else in BER. You can tell any of the structures to be
759 forcefully encoded in DER during CER encoding, by specifying
760 ``der_forced=True`` attribute::
761
762     class Certificate(Sequence):
763         schema = (...)
764         der_forced = True
765
766     class SignedAttributes(SetOf):
767         schema = Attribute()
768         bounds = (1, 32)
769         der_forced = True
770
771 .. _agg_octet_string:
772
773 agg_octet_string
774 ________________
775
776 In most cases, huge quantity of binary data is stored as OCTET STRING.
777 CER encoding splits it on 1 KB chunks. BER allows splitting on various
778 levels of chunks inclusion::
779
780     SOME STRING[CONSTRUCTED]
781         OCTET STRING[CONSTRUCTED]
782             OCTET STRING[PRIMITIVE]
783                 DATA CHUNK
784             OCTET STRING[PRIMITIVE]
785                 DATA CHUNK
786             OCTET STRING[PRIMITIVE]
787                 DATA CHUNK
788         OCTET STRING[PRIMITIVE]
789             DATA CHUNK
790         OCTET STRING[CONSTRUCTED]
791             OCTET STRING[PRIMITIVE]
792                 DATA CHUNK
793             OCTET STRING[PRIMITIVE]
794                 DATA CHUNK
795         OCTET STRING[CONSTRUCTED]
796             OCTET STRING[CONSTRUCTED]
797                 OCTET STRING[PRIMITIVE]
798                     DATA CHUNK
799
800 You can not just take the offset and some ``.vlen`` of the STRING and
801 treat it as the payload. If you decode it without
802 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
803 and ``bytes()`` will give the whole payload contents.
804
805 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
806 small memory footprint. There is convenient
807 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
808 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
809 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
810 SHA512 digest of its ``eContent``::
811
812     fd = open("data.p7m", "rb")
813     raw = file_mmaped(fd)
814     ctx = {"bered": True}
815     for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
816         if decode_path == ("content",):
817             content = obj
818             break
819     else:
820         raise ValueError("no content found")
821     hasher_state = sha512()
822     def hasher(data):
823         hasher_state.update(data)
824         return len(data)
825     evgens = SignedData().decode_evgen(
826         raw[content.offset:],
827         offset=content.offset,
828         ctx=ctx,
829     )
830     agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
831     fd.close()
832     digest = hasher_state.digest()
833
834 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
835 copy the payload (without BER/CER encoding interleaved overhead) in it.
836 Virtually it won't take memory more than for keeping small structures
837 and 1 KB binary chunks.
838
839 .. _seqof-iterators:
840
841 SEQUENCE OF iterators
842 _____________________
843
844 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
845 classes. The only difference with providing the full list of objects, is
846 that type and bounds checking is done during encoding process. Also
847 sequence's value will be emptied after encoding, forcing you to set its
848 value again.
849
850 This is very useful when you have to create some huge objects, like
851 CRLs, with thousands and millions of entities inside. You can write the
852 generator taking necessary data from the database and giving the
853 ``RevokedCertificate`` objects. Only binary representation of that
854 objects will take memory during DER encoding.
855
856 2-pass DER encoding
857 -------------------
858
859 There is ability to do 2-pass encoding to DER, writing results directly
860 to specified writer (buffer, file, whatever). It could be 1.5+ times
861 slower than ordinary encoding, but it takes little memory for 1st pass
862 state storing. For example, 1st pass state for CACert.org's CRL with
863 ~416K of certificate entries takes nearly 3.5 MB of memory.
864 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
865 nearly 0.5 KB of memory.
866
867 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
868 iterators <seqof-iterators>` and write directly to opened file, then
869 there is very small memory footprint.
870
871 1st pass traverses through all the objects of the structure and returns
872 the size of DER encoded structure, together with 1st pass state object.
873 That state contains precalculated lengths for various objects inside the
874 structure.
875
876 ::
877
878     fulllen, state = obj.encode1st()
879
880 2nd pass takes the writer and 1st pass state. It traverses through all
881 the objects again, but writes their encoded representation to the writer.
882
883 ::
884
885     opener = io.open if PY2 else open
886     with opener("result", "wb") as fd:
887         obj.encode2nd(fd.write, iter(state))
888
889 .. warning::
890
891    You **MUST NOT** use 1st pass state if anything is changed in the
892    objects. It is intended to be used immediately after 1st pass is
893    done!
894
895 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
896 have to reinitialize the values after the 1st pass. And you **have to**
897 be sure that the iterator gives exactly the same values as previously.
898 Yes, you have to run your iterator twice -- because this is two pass
899 encoding mode.
900
901 If you want to encode to the memory, then you can use convenient
902 :py:func:`pyderasn.encode2pass` helper.
903
904 .. _browser:
905
906 ASN.1 browser
907 -------------
908 .. autofunction:: pyderasn.browse
909
910 Base Obj
911 --------
912 .. autoclass:: pyderasn.Obj
913    :members:
914
915 Primitive types
916 ---------------
917
918 Boolean
919 _______
920 .. autoclass:: pyderasn.Boolean
921    :members: __init__
922
923 Integer
924 _______
925 .. autoclass:: pyderasn.Integer
926    :members: __init__, named, tohex
927
928 BitString
929 _________
930 .. autoclass:: pyderasn.BitString
931    :members: __init__, bit_len, named
932
933 OctetString
934 ___________
935 .. autoclass:: pyderasn.OctetString
936    :members: __init__
937
938 Null
939 ____
940 .. autoclass:: pyderasn.Null
941    :members: __init__
942
943 ObjectIdentifier
944 ________________
945 .. autoclass:: pyderasn.ObjectIdentifier
946    :members: __init__
947
948 Enumerated
949 __________
950 .. autoclass:: pyderasn.Enumerated
951
952 CommonString
953 ____________
954 .. autoclass:: pyderasn.CommonString
955
956 NumericString
957 _____________
958 .. autoclass:: pyderasn.NumericString
959
960 PrintableString
961 _______________
962 .. autoclass:: pyderasn.PrintableString
963    :members: __init__, allow_asterisk, allow_ampersand
964
965 IA5String
966 _________
967 .. autoclass:: pyderasn.IA5String
968
969 VisibleString
970 _____________
971 .. autoclass:: pyderasn.VisibleString
972
973 UTCTime
974 _______
975 .. autoclass:: pyderasn.UTCTime
976    :members: __init__, todatetime
977
978 GeneralizedTime
979 _______________
980 .. autoclass:: pyderasn.GeneralizedTime
981    :members: __init__, todatetime
982
983 Special types
984 -------------
985
986 Choice
987 ______
988 .. autoclass:: pyderasn.Choice
989    :members: __init__, choice, value
990
991 PrimitiveTypes
992 ______________
993 .. autoclass:: PrimitiveTypes
994
995 Any
996 ___
997 .. autoclass:: pyderasn.Any
998    :members: __init__
999
1000 Constructed types
1001 -----------------
1002
1003 Sequence
1004 ________
1005 .. autoclass:: pyderasn.Sequence
1006    :members: __init__
1007
1008 Set
1009 ___
1010 .. autoclass:: pyderasn.Set
1011    :members: __init__
1012
1013 SequenceOf
1014 __________
1015 .. autoclass:: pyderasn.SequenceOf
1016    :members: __init__
1017
1018 SetOf
1019 _____
1020 .. autoclass:: pyderasn.SetOf
1021    :members: __init__
1022
1023 Various
1024 -------
1025
1026 .. autofunction:: pyderasn.abs_decode_path
1027 .. autofunction:: pyderasn.agg_octet_string
1028 .. autofunction:: pyderasn.ascii_visualize
1029 .. autofunction:: pyderasn.colonize_hex
1030 .. autofunction:: pyderasn.encode2pass
1031 .. autofunction:: pyderasn.encode_cer
1032 .. autofunction:: pyderasn.file_mmaped
1033 .. autofunction:: pyderasn.hexenc
1034 .. autofunction:: pyderasn.hexdec
1035 .. autofunction:: pyderasn.hexdump
1036 .. autofunction:: pyderasn.tag_encode
1037 .. autofunction:: pyderasn.tag_decode
1038 .. autofunction:: pyderasn.tag_ctxp
1039 .. autofunction:: pyderasn.tag_ctxc
1040 .. autoclass:: pyderasn.DecodeError
1041    :members: __init__
1042 .. autoclass:: pyderasn.NotEnoughData
1043 .. autoclass:: pyderasn.ExceedingData
1044 .. autoclass:: pyderasn.LenIndefForm
1045 .. autoclass:: pyderasn.TagMismatch
1046 .. autoclass:: pyderasn.InvalidLength
1047 .. autoclass:: pyderasn.InvalidOID
1048 .. autoclass:: pyderasn.ObjUnknown
1049 .. autoclass:: pyderasn.ObjNotReady
1050 .. autoclass:: pyderasn.InvalidValueType
1051 .. autoclass:: pyderasn.BoundsError
1052
1053 .. _cmdline:
1054
1055 Command-line usage
1056 ------------------
1057
1058 You can decode DER/BER files using command line abilities::
1059
1060     $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1061
1062 If there is no schema for your file, then you can try parsing it without,
1063 but of course IMPLICIT tags will often make it impossible. But result is
1064 good enough for the certificate above::
1065
1066     $ python -m pyderasn path/to/file
1067         0   [1,3,1604]  . >: SEQUENCE OF
1068         4   [1,3,1453]  . . >: SEQUENCE OF
1069         8   [0,0,   5]  . . . . >: [0] ANY
1070                         . . . . . A0:03:02:01:02
1071        13   [1,1,   3]  . . . . >: INTEGER 61595
1072        18   [1,1,  13]  . . . . >: SEQUENCE OF
1073        20   [1,1,   9]  . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1074        31   [1,1,   0]  . . . . . . >: NULL
1075        33   [1,3, 274]  . . . . >: SEQUENCE OF
1076        37   [1,1,  11]  . . . . . . >: SET OF
1077        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1078        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1079        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1080     [...]
1081      1409   [1,1,  50]  . . . . . . >: SEQUENCE OF
1082      1411   [1,1,   8]  . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1083      1421   [1,1,  38]  . . . . . . . . >: OCTET STRING 38 bytes
1084                         . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1085                         . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1086                         . . . . . . . . . 61:2E:63:6F:6D:2F
1087      1461   [1,1,  13]  . . >: SEQUENCE OF
1088      1463   [1,1,   9]  . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1089      1474   [1,1,   0]  . . . . >: NULL
1090      1476   [1,2, 129]  . . >: BIT STRING 1024 bits
1091                         . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1092                         . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1093     [...]
1094
1095 Human readable OIDs
1096 ___________________
1097
1098 If you have got dictionaries with ObjectIdentifiers, like example one
1099 from ``tests/test_crts.py``::
1100
1101     stroid2name = {
1102         "1.2.840.113549.1.1.1": "id-rsaEncryption",
1103         "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1104         [...]
1105         "2.5.4.10": "id-at-organizationName",
1106         "2.5.4.11": "id-at-organizationalUnitName",
1107     }
1108
1109 then you can pass it to pretty printer to see human readable OIDs::
1110
1111     $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1112     [...]
1113        37   [1,1,  11]  . . . . . . >: SET OF
1114        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1115        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1116        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1117        50   [1,1,  18]  . . . . . . >: SET OF
1118        52   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1119        54   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1120        59   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1121        70   [1,1,  18]  . . . . . . >: SET OF
1122        72   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1123        74   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1124        79   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1125     [...]
1126
1127 Decode paths
1128 ____________
1129
1130 Each decoded element has so-called decode path: sequence of structure
1131 names it is passing during the decode process. Each element has its own
1132 unique path inside the whole ASN.1 tree. You can print it out with
1133 ``--print-decode-path`` option::
1134
1135     $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1136        0    [1,3,1604]  Certificate SEQUENCE []
1137        4    [1,3,1453]   . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1138       10-2  [1,1,   1]   . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1139       13    [1,1,   3]   . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1140       18    [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1141       20    [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1142       31    [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1143                          . . . . 05:00
1144       33    [0,0, 278]   . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1145       33    [1,3, 274]   . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1146       37    [1,1,  11]   . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1147       39    [1,1,   9]   . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1148       41    [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1149       46    [0,0,   4]   . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1150                          . . . . . . . 13:02:45:53
1151       46    [1,1,   2]   . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1152     [...]
1153
1154 Now you can print only the specified tree, for example signature algorithm::
1155
1156     $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1157       18    [1,1,  13]  AlgorithmIdentifier SEQUENCE
1158       20    [1,1,   9]   . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1159       31    [0,0,   2]   . parameters: [UNIV 5] ANY OPTIONAL
1160                          . . 05:00
1161 """
1162
1163 from array import array
1164 from codecs import getdecoder
1165 from codecs import getencoder
1166 from collections import namedtuple
1167 from collections import OrderedDict
1168 from copy import copy
1169 from datetime import datetime
1170 from datetime import timedelta
1171 from io import BytesIO
1172 from math import ceil
1173 from mmap import mmap
1174 from mmap import PROT_READ
1175 from operator import attrgetter
1176 from string import ascii_letters
1177 from string import digits
1178 from sys import maxsize as sys_maxsize
1179 from sys import version_info
1180 from unicodedata import category as unicat
1181
1182 from six import add_metaclass
1183 from six import binary_type
1184 from six import byte2int
1185 from six import indexbytes
1186 from six import int2byte
1187 from six import integer_types
1188 from six import iterbytes
1189 from six import iteritems
1190 from six import itervalues
1191 from six import PY2
1192 from six import string_types
1193 from six import text_type
1194 from six import unichr as six_unichr
1195 from six.moves import xrange as six_xrange
1196
1197
1198 try:
1199     from termcolor import colored
1200 except ImportError:  # pragma: no cover
1201     def colored(what, *args, **kwargs):
1202         return what
1203
1204 __version__ = "7.7"
1205
1206 __all__ = (
1207     "agg_octet_string",
1208     "Any",
1209     "BitString",
1210     "BMPString",
1211     "Boolean",
1212     "BoundsError",
1213     "Choice",
1214     "colonize_hex",
1215     "DecodeError",
1216     "DecodePathDefBy",
1217     "encode2pass",
1218     "encode_cer",
1219     "Enumerated",
1220     "ExceedingData",
1221     "file_mmaped",
1222     "GeneralizedTime",
1223     "GeneralString",
1224     "GraphicString",
1225     "hexdec",
1226     "hexenc",
1227     "IA5String",
1228     "Integer",
1229     "InvalidLength",
1230     "InvalidOID",
1231     "InvalidValueType",
1232     "ISO646String",
1233     "LenIndefForm",
1234     "NotEnoughData",
1235     "Null",
1236     "NumericString",
1237     "obj_by_path",
1238     "ObjectIdentifier",
1239     "ObjNotReady",
1240     "ObjUnknown",
1241     "OctetString",
1242     "PrimitiveTypes",
1243     "PrintableString",
1244     "Sequence",
1245     "SequenceOf",
1246     "Set",
1247     "SetOf",
1248     "T61String",
1249     "tag_ctxc",
1250     "tag_ctxp",
1251     "tag_decode",
1252     "TagClassApplication",
1253     "TagClassContext",
1254     "TagClassPrivate",
1255     "TagClassUniversal",
1256     "TagFormConstructed",
1257     "TagFormPrimitive",
1258     "TagMismatch",
1259     "TeletexString",
1260     "UniversalString",
1261     "UTCTime",
1262     "UTF8String",
1263     "VideotexString",
1264     "VisibleString",
1265 )
1266
1267 TagClassUniversal = 0
1268 TagClassApplication = 1 << 6
1269 TagClassContext = 1 << 7
1270 TagClassPrivate = 1 << 6 | 1 << 7
1271 TagFormPrimitive = 0
1272 TagFormConstructed = 1 << 5
1273 TagClassReprs = {
1274     TagClassContext: "",
1275     TagClassApplication: "APPLICATION ",
1276     TagClassPrivate: "PRIVATE ",
1277     TagClassUniversal: "UNIV ",
1278 }
1279 EOC = b"\x00\x00"
1280 EOC_LEN = len(EOC)
1281 LENINDEF = b"\x80"  # length indefinite mark
1282 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1283 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1284 SET01 = frozenset("01")
1285 DECIMALS = frozenset(digits)
1286 DECIMAL_SIGNS = ".,"
1287 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1288
1289
1290 def file_mmaped(fd):
1291     """Make mmap-ed memoryview for reading from file
1292
1293     :param fd: file object
1294     :returns: memoryview over read-only mmap-ing of the whole file
1295     """
1296     return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
1297
1298
1299 def pureint(value):
1300     if not set(value) <= DECIMALS:
1301         raise ValueError("non-pure integer")
1302     return int(value)
1303
1304
1305 def fractions2float(fractions_raw):
1306     pureint(fractions_raw)
1307     return float("0." + fractions_raw)
1308
1309
1310 def get_def_by_path(defines_by_path, sub_decode_path):
1311     """Get define by decode path
1312     """
1313     for path, define in defines_by_path:
1314         if len(path) != len(sub_decode_path):
1315             continue
1316         for p1, p2 in zip(path, sub_decode_path):
1317             if (p1 is not any) and (p1 != p2):
1318                 break
1319         else:
1320             return define
1321
1322
1323 ########################################################################
1324 # Errors
1325 ########################################################################
1326
1327 class ASN1Error(ValueError):
1328     pass
1329
1330
1331 class DecodeError(ASN1Error):
1332     def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1333         """
1334         :param str msg: reason of decode failing
1335         :param klass: optional exact DecodeError inherited class (like
1336                       :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1337                       :py:exc:`InvalidLength`)
1338         :param decode_path: tuple of strings. It contains human
1339                             readable names of the fields through which
1340                             decoding process has passed
1341         :param int offset: binary offset where failure happened
1342         """
1343         super(DecodeError, self).__init__()
1344         self.msg = msg
1345         self.klass = klass
1346         self.decode_path = decode_path
1347         self.offset = offset
1348
1349     def __str__(self):
1350         return " ".join(
1351             c for c in (
1352                 "" if self.klass is None else self.klass.__name__,
1353                 (
1354                     ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1355                     if len(self.decode_path) > 0 else ""
1356                 ),
1357                 ("(at %d)" % self.offset) if self.offset > 0 else "",
1358                 self.msg,
1359             ) if c != ""
1360         )
1361
1362     def __repr__(self):
1363         return "%s(%s)" % (self.__class__.__name__, self)
1364
1365
1366 class NotEnoughData(DecodeError):
1367     pass
1368
1369
1370 class ExceedingData(ASN1Error):
1371     def __init__(self, nbytes):
1372         super(ExceedingData, self).__init__()
1373         self.nbytes = nbytes
1374
1375     def __str__(self):
1376         return "%d trailing bytes" % self.nbytes
1377
1378     def __repr__(self):
1379         return "%s(%s)" % (self.__class__.__name__, self)
1380
1381
1382 class LenIndefForm(DecodeError):
1383     pass
1384
1385
1386 class TagMismatch(DecodeError):
1387     pass
1388
1389
1390 class InvalidLength(DecodeError):
1391     pass
1392
1393
1394 class InvalidOID(DecodeError):
1395     pass
1396
1397
1398 class ObjUnknown(ASN1Error):
1399     def __init__(self, name):
1400         super(ObjUnknown, self).__init__()
1401         self.name = name
1402
1403     def __str__(self):
1404         return "object is unknown: %s" % self.name
1405
1406     def __repr__(self):
1407         return "%s(%s)" % (self.__class__.__name__, self)
1408
1409
1410 class ObjNotReady(ASN1Error):
1411     def __init__(self, name):
1412         super(ObjNotReady, self).__init__()
1413         self.name = name
1414
1415     def __str__(self):
1416         return "object is not ready: %s" % self.name
1417
1418     def __repr__(self):
1419         return "%s(%s)" % (self.__class__.__name__, self)
1420
1421
1422 class InvalidValueType(ASN1Error):
1423     def __init__(self, expected_types):
1424         super(InvalidValueType, self).__init__()
1425         self.expected_types = expected_types
1426
1427     def __str__(self):
1428         return "invalid value type, expected: %s" % ", ".join(
1429             [repr(t) for t in self.expected_types]
1430         )
1431
1432     def __repr__(self):
1433         return "%s(%s)" % (self.__class__.__name__, self)
1434
1435
1436 class BoundsError(ASN1Error):
1437     def __init__(self, bound_min, value, bound_max):
1438         super(BoundsError, self).__init__()
1439         self.bound_min = bound_min
1440         self.value = value
1441         self.bound_max = bound_max
1442
1443     def __str__(self):
1444         return "unsatisfied bounds: %s <= %s <= %s" % (
1445             self.bound_min,
1446             self.value,
1447             self.bound_max,
1448         )
1449
1450     def __repr__(self):
1451         return "%s(%s)" % (self.__class__.__name__, self)
1452
1453
1454 ########################################################################
1455 # Basic coders
1456 ########################################################################
1457
1458 _hexdecoder = getdecoder("hex")
1459 _hexencoder = getencoder("hex")
1460
1461
1462 def hexdec(data):
1463     """Binary data to hexadecimal string convert
1464     """
1465     return _hexdecoder(data)[0]
1466
1467
1468 def hexenc(data):
1469     """Hexadecimal string to binary data convert
1470     """
1471     return _hexencoder(data)[0].decode("ascii")
1472
1473
1474 def int_bytes_len(num, byte_len=8):
1475     if num == 0:
1476         return 1
1477     return int(ceil(float(num.bit_length()) / byte_len))
1478
1479
1480 def zero_ended_encode(num):
1481     octets = bytearray(int_bytes_len(num, 7))
1482     i = len(octets) - 1
1483     octets[i] = num & 0x7F
1484     num >>= 7
1485     i -= 1
1486     while num > 0:
1487         octets[i] = 0x80 | (num & 0x7F)
1488         num >>= 7
1489         i -= 1
1490     return bytes(octets)
1491
1492
1493 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1494     """Encode tag to binary form
1495
1496     :param int num: tag's number
1497     :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1498                       :py:data:`pyderasn.TagClassContext`,
1499                       :py:data:`pyderasn.TagClassApplication`,
1500                       :py:data:`pyderasn.TagClassPrivate`)
1501     :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1502                      :py:data:`pyderasn.TagFormConstructed`)
1503     """
1504     if num < 31:
1505         # [XX|X|.....]
1506         return int2byte(klass | form | num)
1507     # [XX|X|11111][1.......][1.......] ... [0.......]
1508     return int2byte(klass | form | 31) + zero_ended_encode(num)
1509
1510
1511 def tag_decode(tag):
1512     """Decode tag from binary form
1513
1514     .. warning::
1515
1516        No validation is performed, assuming that it has already passed.
1517
1518     It returns tuple with three integers, as
1519     :py:func:`pyderasn.tag_encode` accepts.
1520     """
1521     first_octet = byte2int(tag)
1522     klass = first_octet & 0xC0
1523     form = first_octet & 0x20
1524     if first_octet & 0x1F < 0x1F:
1525         return (klass, form, first_octet & 0x1F)
1526     num = 0
1527     for octet in iterbytes(tag[1:]):
1528         num <<= 7
1529         num |= octet & 0x7F
1530     return (klass, form, num)
1531
1532
1533 def tag_ctxp(num):
1534     """Create CONTEXT PRIMITIVE tag
1535     """
1536     return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1537
1538
1539 def tag_ctxc(num):
1540     """Create CONTEXT CONSTRUCTED tag
1541     """
1542     return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1543
1544
1545 def tag_strip(data):
1546     """Take off tag from the data
1547
1548     :returns: (encoded tag, tag length, remaining data)
1549     """
1550     if len(data) == 0:
1551         raise NotEnoughData("no data at all")
1552     if byte2int(data) & 0x1F < 31:
1553         return data[:1], 1, data[1:]
1554     i = 0
1555     while True:
1556         i += 1
1557         if i == len(data):
1558             raise DecodeError("unfinished tag")
1559         if indexbytes(data, i) & 0x80 == 0:
1560             break
1561     if i == 1 and indexbytes(data, 1) < 0x1F:
1562         raise DecodeError("unexpected long form")
1563     if i > 1 and indexbytes(data, 1) & 0x7F == 0:
1564         raise DecodeError("leading zero byte in tag value")
1565     i += 1
1566     return data[:i], i, data[i:]
1567
1568
1569 def len_encode(l):
1570     if l < 0x80:
1571         return int2byte(l)
1572     octets = bytearray(int_bytes_len(l) + 1)
1573     octets[0] = 0x80 | (len(octets) - 1)
1574     for i in six_xrange(len(octets) - 1, 0, -1):
1575         octets[i] = l & 0xFF
1576         l >>= 8
1577     return bytes(octets)
1578
1579
1580 def len_decode(data):
1581     """Decode length
1582
1583     :returns: (decoded length, length's length, remaining data)
1584     :raises LenIndefForm: if indefinite form encoding is met
1585     """
1586     if len(data) == 0:
1587         raise NotEnoughData("no data at all")
1588     first_octet = byte2int(data)
1589     if first_octet & 0x80 == 0:
1590         return first_octet, 1, data[1:]
1591     octets_num = first_octet & 0x7F
1592     if octets_num + 1 > len(data):
1593         raise NotEnoughData("encoded length is longer than data")
1594     if octets_num == 0:
1595         raise LenIndefForm()
1596     if byte2int(data[1:]) == 0:
1597         raise DecodeError("leading zeros")
1598     l = 0
1599     for v in iterbytes(data[1:1 + octets_num]):
1600         l = (l << 8) | v
1601     if l <= 127:
1602         raise DecodeError("long form instead of short one")
1603     return l, 1 + octets_num, data[1 + octets_num:]
1604
1605
1606 LEN0 = len_encode(0)
1607 LEN1 = len_encode(1)
1608 LEN1K = len_encode(1000)
1609
1610
1611 def len_size(l):
1612     """How many bytes length field will take
1613     """
1614     if l < 128:
1615         return 1
1616     if l < 256:  # 1 << 8
1617         return 2
1618     if l < 65536:  # 1 << 16
1619         return 3
1620     if l < 16777216:  # 1 << 24
1621         return 4
1622     if l < 4294967296:  # 1 << 32
1623         return 5
1624     if l < 1099511627776:  # 1 << 40
1625         return 6
1626     if l < 281474976710656:  # 1 << 48
1627         return 7
1628     if l < 72057594037927936:  # 1 << 56
1629         return 8
1630     raise OverflowError("too big length")
1631
1632
1633 def write_full(writer, data):
1634     """Fully write provided data
1635
1636     :param writer: must comply with ``io.RawIOBase.write`` behaviour
1637
1638     BytesIO does not guarantee that the whole data will be written at
1639     once. That function write everything provided, raising an error if
1640     ``writer`` returns None.
1641     """
1642     data = memoryview(data)
1643     written = 0
1644     while written != len(data):
1645         n = writer(data[written:])
1646         if n is None:
1647             raise ValueError("can not write to buf")
1648         written += n
1649
1650
1651 # If it is 64-bit system, then use compact 64-bit array of unsigned
1652 # longs. Use an ordinary list with universal integers otherwise, that
1653 # is slower.
1654 if sys_maxsize > 2 ** 32:
1655     def state_2pass_new():
1656         return array("L")
1657 else:
1658     def state_2pass_new():
1659         return []
1660
1661
1662 ########################################################################
1663 # Base class
1664 ########################################################################
1665
1666 class AutoAddSlots(type):
1667     def __new__(cls, name, bases, _dict):
1668         _dict["__slots__"] = _dict.get("__slots__", ())
1669         return type.__new__(cls, name, bases, _dict)
1670
1671
1672 BasicState = namedtuple("BasicState", (
1673     "version",
1674     "tag",
1675     "tag_order",
1676     "expl",
1677     "default",
1678     "optional",
1679     "offset",
1680     "llen",
1681     "vlen",
1682     "expl_lenindef",
1683     "lenindef",
1684     "ber_encoded",
1685 ), **NAMEDTUPLE_KWARGS)
1686
1687
1688 @add_metaclass(AutoAddSlots)
1689 class Obj(object):
1690     """Common ASN.1 object class
1691
1692     All ASN.1 types are inherited from it. It has metaclass that
1693     automatically adds ``__slots__`` to all inherited classes.
1694     """
1695     __slots__ = (
1696         "tag",
1697         "_tag_order",
1698         "_value",
1699         "_expl",
1700         "default",
1701         "optional",
1702         "offset",
1703         "llen",
1704         "vlen",
1705         "expl_lenindef",
1706         "lenindef",
1707         "ber_encoded",
1708     )
1709
1710     def __init__(
1711             self,
1712             impl=None,
1713             expl=None,
1714             default=None,
1715             optional=False,
1716             _decoded=(0, 0, 0),
1717     ):
1718         self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1719         self._expl = getattr(self, "expl", None) if expl is None else expl
1720         if self.tag != self.tag_default and self._expl is not None:
1721             raise ValueError("implicit and explicit tags can not be set simultaneously")
1722         if self.tag is None:
1723             self._tag_order = None
1724         else:
1725             tag_class, _, tag_num = tag_decode(
1726                 self.tag if self._expl is None else self._expl
1727             )
1728             self._tag_order = (tag_class, tag_num)
1729         if default is not None:
1730             optional = True
1731         self.optional = optional
1732         self.offset, self.llen, self.vlen = _decoded
1733         self.default = None
1734         self.expl_lenindef = False
1735         self.lenindef = False
1736         self.ber_encoded = False
1737
1738     @property
1739     def ready(self):  # pragma: no cover
1740         """Is object ready to be encoded?
1741         """
1742         raise NotImplementedError()
1743
1744     def _assert_ready(self):
1745         if not self.ready:
1746             raise ObjNotReady(self.__class__.__name__)
1747
1748     @property
1749     def bered(self):
1750         """Is either object or any elements inside is BER encoded?
1751         """
1752         return self.expl_lenindef or self.lenindef or self.ber_encoded
1753
1754     @property
1755     def decoded(self):
1756         """Is object decoded?
1757         """
1758         return (self.llen + self.vlen) > 0
1759
1760     def __getstate__(self):  # pragma: no cover
1761         """Used for making safe to be mutable pickleable copies
1762         """
1763         raise NotImplementedError()
1764
1765     def __setstate__(self, state):
1766         if state.version != __version__:
1767             raise ValueError("data is pickled by different PyDERASN version")
1768         self.tag = state.tag
1769         self._tag_order = state.tag_order
1770         self._expl = state.expl
1771         self.default = state.default
1772         self.optional = state.optional
1773         self.offset = state.offset
1774         self.llen = state.llen
1775         self.vlen = state.vlen
1776         self.expl_lenindef = state.expl_lenindef
1777         self.lenindef = state.lenindef
1778         self.ber_encoded = state.ber_encoded
1779
1780     @property
1781     def tag_order(self):
1782         """Tag's (class, number) used for DER/CER sorting
1783         """
1784         return self._tag_order
1785
1786     @property
1787     def tag_order_cer(self):
1788         return self.tag_order
1789
1790     @property
1791     def tlen(self):
1792         """.. seealso:: :ref:`decoding`
1793         """
1794         return len(self.tag)
1795
1796     @property
1797     def tlvlen(self):
1798         """.. seealso:: :ref:`decoding`
1799         """
1800         return self.tlen + self.llen + self.vlen
1801
1802     def __str__(self):  # pragma: no cover
1803         return self.__bytes__() if PY2 else self.__unicode__()
1804
1805     def __ne__(self, their):
1806         return not(self == their)
1807
1808     def __gt__(self, their):  # pragma: no cover
1809         return not(self < their)
1810
1811     def __le__(self, their):  # pragma: no cover
1812         return (self == their) or (self < their)
1813
1814     def __ge__(self, their):  # pragma: no cover
1815         return (self == their) or (self > their)
1816
1817     def _encode(self):  # pragma: no cover
1818         raise NotImplementedError()
1819
1820     def _encode_cer(self, writer):
1821         write_full(writer, self._encode())
1822
1823     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):  # pragma: no cover
1824         yield NotImplemented
1825
1826     def _encode1st(self, state):
1827         raise NotImplementedError()
1828
1829     def _encode2nd(self, writer, state_iter):
1830         raise NotImplementedError()
1831
1832     def encode(self):
1833         """DER encode the structure
1834
1835         :returns: DER representation
1836         """
1837         raw = self._encode()
1838         if self._expl is None:
1839             return raw
1840         return b"".join((self._expl, len_encode(len(raw)), raw))
1841
1842     def encode1st(self, state=None):
1843         """Do the 1st pass of 2-pass encoding
1844
1845         :rtype: (int, array("L"))
1846         :returns: full length of encoded data and precalculated various
1847                   objects lengths
1848         """
1849         if state is None:
1850             state = state_2pass_new()
1851         if self._expl is None:
1852             return self._encode1st(state)
1853         state.append(0)
1854         idx = len(state) - 1
1855         vlen, _ = self._encode1st(state)
1856         state[idx] = vlen
1857         fulllen = len(self._expl) + len_size(vlen) + vlen
1858         return fulllen, state
1859
1860     def encode2nd(self, writer, state_iter):
1861         """Do the 2nd pass of 2-pass encoding
1862
1863         :param writer: must comply with ``io.RawIOBase.write`` behaviour
1864         :param state_iter: iterator over the 1st pass state (``iter(state)``)
1865         """
1866         if self._expl is None:
1867             self._encode2nd(writer, state_iter)
1868         else:
1869             write_full(writer, self._expl + len_encode(next(state_iter)))
1870             self._encode2nd(writer, state_iter)
1871
1872     def encode_cer(self, writer):
1873         """CER encode the structure to specified writer
1874
1875         :param writer: must comply with ``io.RawIOBase.write``
1876                        behaviour. It takes slice to be written and
1877                        returns number of bytes processed. If it returns
1878                        None, then exception will be raised
1879         """
1880         if self._expl is not None:
1881             write_full(writer, self._expl + LENINDEF)
1882         if getattr(self, "der_forced", False):
1883             write_full(writer, self._encode())
1884         else:
1885             self._encode_cer(writer)
1886         if self._expl is not None:
1887             write_full(writer, EOC)
1888
1889     def hexencode(self):
1890         """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1891         """
1892         return hexenc(self.encode())
1893
1894     def decode(
1895             self,
1896             data,
1897             offset=0,
1898             leavemm=False,
1899             decode_path=(),
1900             ctx=None,
1901             tag_only=False,
1902             _ctx_immutable=True,
1903     ):
1904         """Decode the data
1905
1906         :param data: either binary or memoryview
1907         :param int offset: initial data's offset
1908         :param bool leavemm: do we need to leave memoryview of remaining
1909                     data as is, or convert it to bytes otherwise
1910         :param decode_path: current decode path (tuples of strings,
1911                             possibly with DecodePathDefBy) with will be
1912                             the root for all underlying objects
1913         :param ctx: optional :ref:`context <ctx>` governing decoding process
1914         :param bool tag_only: decode only the tag, without length and
1915                               contents (used only in Choice and Set
1916                               structures, trying to determine if tag satisfies
1917                               the schema)
1918         :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1919                                     before using it?
1920         :returns: (Obj, remaining data)
1921
1922         .. seealso:: :ref:`decoding`
1923         """
1924         result = next(self.decode_evgen(
1925             data,
1926             offset,
1927             leavemm,
1928             decode_path,
1929             ctx,
1930             tag_only,
1931             _ctx_immutable,
1932             _evgen_mode=False,
1933         ))
1934         if result is None:
1935             return None
1936         _, obj, tail = result
1937         return obj, tail
1938
1939     def decode_evgen(
1940             self,
1941             data,
1942             offset=0,
1943             leavemm=False,
1944             decode_path=(),
1945             ctx=None,
1946             tag_only=False,
1947             _ctx_immutable=True,
1948             _evgen_mode=True,
1949     ):
1950         """Decode with evgen mode on
1951
1952         That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1953         it returns the generator producing ``(decode_path, obj, tail)``
1954         values.
1955         .. seealso:: :ref:`evgen mode <evgen_mode>`.
1956         """
1957         if ctx is None:
1958             ctx = {}
1959         elif _ctx_immutable:
1960             ctx = copy(ctx)
1961         tlv = memoryview(data)
1962         if (
1963                 _evgen_mode and
1964                 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1965         ):
1966             _evgen_mode = False
1967         if self._expl is None:
1968             for result in self._decode(
1969                     tlv,
1970                     offset=offset,
1971                     decode_path=decode_path,
1972                     ctx=ctx,
1973                     tag_only=tag_only,
1974                     evgen_mode=_evgen_mode,
1975             ):
1976                 if tag_only:
1977                     yield None
1978                     return
1979                 _decode_path, obj, tail = result
1980                 if _decode_path is not decode_path:
1981                     yield result
1982         else:
1983             try:
1984                 t, tlen, lv = tag_strip(tlv)
1985             except DecodeError as err:
1986                 raise err.__class__(
1987                     msg=err.msg,
1988                     klass=self.__class__,
1989                     decode_path=decode_path,
1990                     offset=offset,
1991                 )
1992             if t != self._expl:
1993                 raise TagMismatch(
1994                     klass=self.__class__,
1995                     decode_path=decode_path,
1996                     offset=offset,
1997                 )
1998             try:
1999                 l, llen, v = len_decode(lv)
2000             except LenIndefForm as err:
2001                 if not ctx.get("bered", False):
2002                     raise err.__class__(
2003                         msg=err.msg,
2004                         klass=self.__class__,
2005                         decode_path=decode_path,
2006                         offset=offset,
2007                     )
2008                 llen, v = 1, lv[1:]
2009                 offset += tlen + llen
2010                 for result in self._decode(
2011                         v,
2012                         offset=offset,
2013                         decode_path=decode_path,
2014                         ctx=ctx,
2015                         tag_only=tag_only,
2016                         evgen_mode=_evgen_mode,
2017                 ):
2018                     if tag_only:  # pragma: no cover
2019                         yield None
2020                         return
2021                     _decode_path, obj, tail = result
2022                     if _decode_path is not decode_path:
2023                         yield result
2024                 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2025                 if eoc_expected.tobytes() != EOC:
2026                     raise DecodeError(
2027                         "no EOC",
2028                         klass=self.__class__,
2029                         decode_path=decode_path,
2030                         offset=offset,
2031                     )
2032                 obj.vlen += EOC_LEN
2033                 obj.expl_lenindef = True
2034             except DecodeError as err:
2035                 raise err.__class__(
2036                     msg=err.msg,
2037                     klass=self.__class__,
2038                     decode_path=decode_path,
2039                     offset=offset,
2040                 )
2041             else:
2042                 if l > len(v):
2043                     raise NotEnoughData(
2044                         "encoded length is longer than data",
2045                         klass=self.__class__,
2046                         decode_path=decode_path,
2047                         offset=offset,
2048                     )
2049                 for result in self._decode(
2050                         v,
2051                         offset=offset + tlen + llen,
2052                         decode_path=decode_path,
2053                         ctx=ctx,
2054                         tag_only=tag_only,
2055                         evgen_mode=_evgen_mode,
2056                 ):
2057                     if tag_only:  # pragma: no cover
2058                         yield None
2059                         return
2060                     _decode_path, obj, tail = result
2061                     if _decode_path is not decode_path:
2062                         yield result
2063                 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2064                     raise DecodeError(
2065                         "explicit tag out-of-bound, longer than data",
2066                         klass=self.__class__,
2067                         decode_path=decode_path,
2068                         offset=offset,
2069                     )
2070         yield decode_path, obj, (tail if leavemm else tail.tobytes())
2071
2072     def decod(self, data, offset=0, decode_path=(), ctx=None):
2073         """Decode the data, check that tail is empty
2074
2075         :raises ExceedingData: if tail is not empty
2076
2077         This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2078         (decode without tail) that also checks that there is no
2079         trailing data left.
2080         """
2081         obj, tail = self.decode(
2082             data,
2083             offset=offset,
2084             decode_path=decode_path,
2085             ctx=ctx,
2086             leavemm=True,
2087         )
2088         if len(tail) > 0:
2089             raise ExceedingData(len(tail))
2090         return obj
2091
2092     def hexdecode(self, data, *args, **kwargs):
2093         """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2094         """
2095         return self.decode(hexdec(data), *args, **kwargs)
2096
2097     def hexdecod(self, data, *args, **kwargs):
2098         """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2099         """
2100         return self.decod(hexdec(data), *args, **kwargs)
2101
2102     @property
2103     def expled(self):
2104         """.. seealso:: :ref:`decoding`
2105         """
2106         return self._expl is not None
2107
2108     @property
2109     def expl_tag(self):
2110         """.. seealso:: :ref:`decoding`
2111         """
2112         return self._expl
2113
2114     @property
2115     def expl_tlen(self):
2116         """.. seealso:: :ref:`decoding`
2117         """
2118         return len(self._expl)
2119
2120     @property
2121     def expl_llen(self):
2122         """.. seealso:: :ref:`decoding`
2123         """
2124         if self.expl_lenindef:
2125             return 1
2126         return len(len_encode(self.tlvlen))
2127
2128     @property
2129     def expl_offset(self):
2130         """.. seealso:: :ref:`decoding`
2131         """
2132         return self.offset - self.expl_tlen - self.expl_llen
2133
2134     @property
2135     def expl_vlen(self):
2136         """.. seealso:: :ref:`decoding`
2137         """
2138         return self.tlvlen
2139
2140     @property
2141     def expl_tlvlen(self):
2142         """.. seealso:: :ref:`decoding`
2143         """
2144         return self.expl_tlen + self.expl_llen + self.expl_vlen
2145
2146     @property
2147     def fulloffset(self):
2148         """.. seealso:: :ref:`decoding`
2149         """
2150         return self.expl_offset if self.expled else self.offset
2151
2152     @property
2153     def fulllen(self):
2154         """.. seealso:: :ref:`decoding`
2155         """
2156         return self.expl_tlvlen if self.expled else self.tlvlen
2157
2158     def pps_lenindef(self, decode_path):
2159         if self.lenindef and not (
2160                 getattr(self, "defined", None) is not None and
2161                 self.defined[1].lenindef
2162         ):
2163             yield _pp(
2164                 asn1_type_name="EOC",
2165                 obj_name="",
2166                 decode_path=decode_path,
2167                 offset=(
2168                     self.offset + self.tlvlen -
2169                     (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2170                 ),
2171                 tlen=1,
2172                 llen=1,
2173                 vlen=0,
2174                 ber_encoded=True,
2175                 bered=True,
2176             )
2177         if self.expl_lenindef:
2178             yield _pp(
2179                 asn1_type_name="EOC",
2180                 obj_name="EXPLICIT",
2181                 decode_path=decode_path,
2182                 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2183                 tlen=1,
2184                 llen=1,
2185                 vlen=0,
2186                 ber_encoded=True,
2187                 bered=True,
2188             )
2189
2190
2191 def encode_cer(obj):
2192     """Encode to CER in memory buffer
2193
2194     :returns bytes: memory buffer contents
2195     """
2196     buf = BytesIO()
2197     obj.encode_cer(buf.write)
2198     return buf.getvalue()
2199
2200
2201 def encode2pass(obj):
2202     """Encode (2-pass mode) to DER in memory buffer
2203
2204     :returns bytes: memory buffer contents
2205     """
2206     buf = BytesIO()
2207     _, state = obj.encode1st()
2208     obj.encode2nd(buf.write, iter(state))
2209     return buf.getvalue()
2210
2211
2212 class DecodePathDefBy(object):
2213     """DEFINED BY representation inside decode path
2214     """
2215     __slots__ = ("defined_by",)
2216
2217     def __init__(self, defined_by):
2218         self.defined_by = defined_by
2219
2220     def __ne__(self, their):
2221         return not(self == their)
2222
2223     def __eq__(self, their):
2224         if not isinstance(their, self.__class__):
2225             return False
2226         return self.defined_by == their.defined_by
2227
2228     def __str__(self):
2229         return "DEFINED BY " + str(self.defined_by)
2230
2231     def __repr__(self):
2232         return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2233
2234
2235 ########################################################################
2236 # Pretty printing
2237 ########################################################################
2238
2239 PP = namedtuple("PP", (
2240     "obj",
2241     "asn1_type_name",
2242     "obj_name",
2243     "decode_path",
2244     "value",
2245     "blob",
2246     "optional",
2247     "default",
2248     "impl",
2249     "expl",
2250     "offset",
2251     "tlen",
2252     "llen",
2253     "vlen",
2254     "expl_offset",
2255     "expl_tlen",
2256     "expl_llen",
2257     "expl_vlen",
2258     "expl_lenindef",
2259     "lenindef",
2260     "ber_encoded",
2261     "bered",
2262 ), **NAMEDTUPLE_KWARGS)
2263
2264
2265 def _pp(
2266         obj=None,
2267         asn1_type_name="unknown",
2268         obj_name="unknown",
2269         decode_path=(),
2270         value=None,
2271         blob=None,
2272         optional=False,
2273         default=False,
2274         impl=None,
2275         expl=None,
2276         offset=0,
2277         tlen=0,
2278         llen=0,
2279         vlen=0,
2280         expl_offset=None,
2281         expl_tlen=None,
2282         expl_llen=None,
2283         expl_vlen=None,
2284         expl_lenindef=False,
2285         lenindef=False,
2286         ber_encoded=False,
2287         bered=False,
2288 ):
2289     return PP(
2290         obj,
2291         asn1_type_name,
2292         obj_name,
2293         decode_path,
2294         value,
2295         blob,
2296         optional,
2297         default,
2298         impl,
2299         expl,
2300         offset,
2301         tlen,
2302         llen,
2303         vlen,
2304         expl_offset,
2305         expl_tlen,
2306         expl_llen,
2307         expl_vlen,
2308         expl_lenindef,
2309         lenindef,
2310         ber_encoded,
2311         bered,
2312     )
2313
2314
2315 def _colourize(what, colour, with_colours, attrs=("bold",)):
2316     return colored(what, colour, attrs=attrs) if with_colours else what
2317
2318
2319 def colonize_hex(hexed):
2320     """Separate hexadecimal string with colons
2321     """
2322     return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2323
2324
2325 def find_oid_name(asn1_type_name, oid_maps, value):
2326     if len(oid_maps) > 0 and asn1_type_name == ObjectIdentifier.asn1_type_name:
2327         for oid_map in oid_maps:
2328             oid_name = oid_map.get(value)
2329             if oid_name is not None:
2330                 return oid_name
2331     return None
2332
2333
2334 def pp_console_row(
2335         pp,
2336         oid_maps=(),
2337         with_offsets=False,
2338         with_blob=True,
2339         with_colours=False,
2340         with_decode_path=False,
2341         decode_path_len_decrease=0,
2342 ):
2343     cols = []
2344     if with_offsets:
2345         col = "%5d%s%s" % (
2346             pp.offset,
2347             (
2348                 "  " if pp.expl_offset is None else
2349                 ("-%d" % (pp.offset - pp.expl_offset))
2350             ),
2351             LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2352         )
2353         col = _colourize(col, "red", with_colours, ())
2354         col += _colourize("B", "red", with_colours) if pp.bered else " "
2355         cols.append(col)
2356         col = "[%d,%d,%4d]%s" % (
2357             pp.tlen, pp.llen, pp.vlen,
2358             LENINDEF_PP_CHAR if pp.lenindef else " "
2359         )
2360         col = _colourize(col, "green", with_colours, ())
2361         cols.append(col)
2362     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2363     if decode_path_len > 0:
2364         cols.append(" ." * decode_path_len)
2365         ent = pp.decode_path[-1]
2366         if isinstance(ent, DecodePathDefBy):
2367             cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2368             value = str(ent.defined_by)
2369             oid_name = find_oid_name(ent.defined_by.asn1_type_name, oid_maps, value)
2370             if oid_name is None:
2371                 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2372             else:
2373                 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2374         else:
2375             cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2376     if pp.expl is not None:
2377         klass, _, num = pp.expl
2378         col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2379         cols.append(_colourize(col, "blue", with_colours))
2380     if pp.impl is not None:
2381         klass, _, num = pp.impl
2382         col = "[%s%d]" % (TagClassReprs[klass], num)
2383         cols.append(_colourize(col, "blue", with_colours))
2384     if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2385         cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2386     if pp.ber_encoded:
2387         cols.append(_colourize("BER", "red", with_colours))
2388     cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2389     if pp.value is not None:
2390         value = pp.value
2391         cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2392         oid_name = find_oid_name(pp.asn1_type_name, oid_maps, pp.value)
2393         if oid_name is not None:
2394             cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2395         if pp.asn1_type_name == Integer.asn1_type_name:
2396             cols.append(_colourize(
2397                 "(%s)" % colonize_hex(pp.obj.tohex()), "green", with_colours,
2398             ))
2399     if with_blob:
2400         if pp.blob.__class__ == binary_type:
2401             cols.append(hexenc(pp.blob))
2402         elif pp.blob.__class__ == tuple:
2403             cols.append(", ".join(pp.blob))
2404     if pp.optional:
2405         cols.append(_colourize("OPTIONAL", "red", with_colours))
2406     if pp.default:
2407         cols.append(_colourize("DEFAULT", "red", with_colours))
2408     if with_decode_path:
2409         cols.append(_colourize(
2410             "[%s]" % ":".join(str(p) for p in pp.decode_path),
2411             "grey",
2412             with_colours,
2413         ))
2414     return " ".join(cols)
2415
2416
2417 def pp_console_blob(pp, decode_path_len_decrease=0):
2418     cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2419     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2420     if decode_path_len > 0:
2421         cols.append(" ." * (decode_path_len + 1))
2422     if pp.blob.__class__ == binary_type:
2423         blob = hexenc(pp.blob).upper()
2424         for i in six_xrange(0, len(blob), 32):
2425             chunk = blob[i:i + 32]
2426             yield " ".join(cols + [colonize_hex(chunk)])
2427     elif pp.blob.__class__ == tuple:
2428         yield " ".join(cols + [", ".join(pp.blob)])
2429
2430
2431 def pprint(
2432         obj,
2433         oid_maps=(),
2434         big_blobs=False,
2435         with_colours=False,
2436         with_decode_path=False,
2437         decode_path_only=(),
2438         decode_path=(),
2439 ):
2440     """Pretty print object
2441
2442     :param Obj obj: object you want to pretty print
2443     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
2444                      Its human readable form is printed when OID is met
2445     :param big_blobs: if large binary objects are met (like OctetString
2446                       values), do we need to print them too, on separate
2447                       lines
2448     :param with_colours: colourize output, if ``termcolor`` library
2449                          is available
2450     :param with_decode_path: print decode path
2451     :param decode_path_only: print only that specified decode path
2452     """
2453     def _pprint_pps(pps):
2454         for pp in pps:
2455             if hasattr(pp, "_fields"):
2456                 if (
2457                         decode_path_only != () and
2458                         tuple(
2459                             str(p) for p in pp.decode_path[:len(decode_path_only)]
2460                         ) != decode_path_only
2461                 ):
2462                     continue
2463                 if big_blobs:
2464                     yield pp_console_row(
2465                         pp,
2466                         oid_maps=oid_maps,
2467                         with_offsets=True,
2468                         with_blob=False,
2469                         with_colours=with_colours,
2470                         with_decode_path=with_decode_path,
2471                         decode_path_len_decrease=len(decode_path_only),
2472                     )
2473                     for row in pp_console_blob(
2474                             pp,
2475                             decode_path_len_decrease=len(decode_path_only),
2476                     ):
2477                         yield row
2478                 else:
2479                     yield pp_console_row(
2480                         pp,
2481                         oid_maps=oid_maps,
2482                         with_offsets=True,
2483                         with_blob=True,
2484                         with_colours=with_colours,
2485                         with_decode_path=with_decode_path,
2486                         decode_path_len_decrease=len(decode_path_only),
2487                     )
2488             else:
2489                 for row in _pprint_pps(pp):
2490                     yield row
2491     return "\n".join(_pprint_pps(obj.pps(decode_path)))
2492
2493
2494 ########################################################################
2495 # ASN.1 primitive types
2496 ########################################################################
2497
2498 BooleanState = namedtuple(
2499     "BooleanState",
2500     BasicState._fields + ("value",),
2501     **NAMEDTUPLE_KWARGS
2502 )
2503
2504
2505 class Boolean(Obj):
2506     """``BOOLEAN`` boolean type
2507
2508     >>> b = Boolean(True)
2509     BOOLEAN True
2510     >>> b == Boolean(True)
2511     True
2512     >>> bool(b)
2513     True
2514     """
2515     __slots__ = ()
2516     tag_default = tag_encode(1)
2517     asn1_type_name = "BOOLEAN"
2518
2519     def __init__(
2520             self,
2521             value=None,
2522             impl=None,
2523             expl=None,
2524             default=None,
2525             optional=False,
2526             _decoded=(0, 0, 0),
2527     ):
2528         """
2529         :param value: set the value. Either boolean type, or
2530                       :py:class:`pyderasn.Boolean` object
2531         :param bytes impl: override default tag with ``IMPLICIT`` one
2532         :param bytes expl: override default tag with ``EXPLICIT`` one
2533         :param default: set default value. Type same as in ``value``
2534         :param bool optional: is object ``OPTIONAL`` in sequence
2535         """
2536         super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2537         self._value = None if value is None else self._value_sanitize(value)
2538         if default is not None:
2539             default = self._value_sanitize(default)
2540             self.default = self.__class__(
2541                 value=default,
2542                 impl=self.tag,
2543                 expl=self._expl,
2544             )
2545             if value is None:
2546                 self._value = default
2547
2548     def _value_sanitize(self, value):
2549         if value.__class__ == bool:
2550             return value
2551         if issubclass(value.__class__, Boolean):
2552             return value._value
2553         raise InvalidValueType((self.__class__, bool))
2554
2555     @property
2556     def ready(self):
2557         return self._value is not None
2558
2559     def __getstate__(self):
2560         return BooleanState(
2561             __version__,
2562             self.tag,
2563             self._tag_order,
2564             self._expl,
2565             self.default,
2566             self.optional,
2567             self.offset,
2568             self.llen,
2569             self.vlen,
2570             self.expl_lenindef,
2571             self.lenindef,
2572             self.ber_encoded,
2573             self._value,
2574         )
2575
2576     def __setstate__(self, state):
2577         super(Boolean, self).__setstate__(state)
2578         self._value = state.value
2579
2580     def __nonzero__(self):
2581         self._assert_ready()
2582         return self._value
2583
2584     def __bool__(self):
2585         self._assert_ready()
2586         return self._value
2587
2588     def __eq__(self, their):
2589         if their.__class__ == bool:
2590             return self._value == their
2591         if not issubclass(their.__class__, Boolean):
2592             return False
2593         return (
2594             self._value == their._value and
2595             self.tag == their.tag and
2596             self._expl == their._expl
2597         )
2598
2599     def __call__(
2600             self,
2601             value=None,
2602             impl=None,
2603             expl=None,
2604             default=None,
2605             optional=None,
2606     ):
2607         return self.__class__(
2608             value=value,
2609             impl=self.tag if impl is None else impl,
2610             expl=self._expl if expl is None else expl,
2611             default=self.default if default is None else default,
2612             optional=self.optional if optional is None else optional,
2613         )
2614
2615     def _encode(self):
2616         self._assert_ready()
2617         return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2618
2619     def _encode1st(self, state):
2620         return len(self.tag) + 2, state
2621
2622     def _encode2nd(self, writer, state_iter):
2623         self._assert_ready()
2624         write_full(writer, self._encode())
2625
2626     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2627         try:
2628             t, _, lv = tag_strip(tlv)
2629         except DecodeError as err:
2630             raise err.__class__(
2631                 msg=err.msg,
2632                 klass=self.__class__,
2633                 decode_path=decode_path,
2634                 offset=offset,
2635             )
2636         if t != self.tag:
2637             raise TagMismatch(
2638                 klass=self.__class__,
2639                 decode_path=decode_path,
2640                 offset=offset,
2641             )
2642         if tag_only:
2643             yield None
2644             return
2645         try:
2646             l, _, v = len_decode(lv)
2647         except DecodeError as err:
2648             raise err.__class__(
2649                 msg=err.msg,
2650                 klass=self.__class__,
2651                 decode_path=decode_path,
2652                 offset=offset,
2653             )
2654         if l != 1:
2655             raise InvalidLength(
2656                 "Boolean's length must be equal to 1",
2657                 klass=self.__class__,
2658                 decode_path=decode_path,
2659                 offset=offset,
2660             )
2661         if l > len(v):
2662             raise NotEnoughData(
2663                 "encoded length is longer than data",
2664                 klass=self.__class__,
2665                 decode_path=decode_path,
2666                 offset=offset,
2667             )
2668         first_octet = byte2int(v)
2669         ber_encoded = False
2670         if first_octet == 0:
2671             value = False
2672         elif first_octet == 0xFF:
2673             value = True
2674         elif ctx.get("bered", False):
2675             value = True
2676             ber_encoded = True
2677         else:
2678             raise DecodeError(
2679                 "unacceptable Boolean value",
2680                 klass=self.__class__,
2681                 decode_path=decode_path,
2682                 offset=offset,
2683             )
2684         obj = self.__class__(
2685             value=value,
2686             impl=self.tag,
2687             expl=self._expl,
2688             default=self.default,
2689             optional=self.optional,
2690             _decoded=(offset, 1, 1),
2691         )
2692         obj.ber_encoded = ber_encoded
2693         yield decode_path, obj, v[1:]
2694
2695     def __repr__(self):
2696         return pp_console_row(next(self.pps()))
2697
2698     def pps(self, decode_path=()):
2699         yield _pp(
2700             obj=self,
2701             asn1_type_name=self.asn1_type_name,
2702             obj_name=self.__class__.__name__,
2703             decode_path=decode_path,
2704             value=str(self._value) if self.ready else None,
2705             optional=self.optional,
2706             default=self == self.default,
2707             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2708             expl=None if self._expl is None else tag_decode(self._expl),
2709             offset=self.offset,
2710             tlen=self.tlen,
2711             llen=self.llen,
2712             vlen=self.vlen,
2713             expl_offset=self.expl_offset if self.expled else None,
2714             expl_tlen=self.expl_tlen if self.expled else None,
2715             expl_llen=self.expl_llen if self.expled else None,
2716             expl_vlen=self.expl_vlen if self.expled else None,
2717             expl_lenindef=self.expl_lenindef,
2718             ber_encoded=self.ber_encoded,
2719             bered=self.bered,
2720         )
2721         for pp in self.pps_lenindef(decode_path):
2722             yield pp
2723
2724
2725 IntegerState = namedtuple(
2726     "IntegerState",
2727     BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2728     **NAMEDTUPLE_KWARGS
2729 )
2730
2731
2732 class Integer(Obj):
2733     """``INTEGER`` integer type
2734
2735     >>> b = Integer(-123)
2736     INTEGER -123
2737     >>> b == Integer(-123)
2738     True
2739     >>> int(b)
2740     -123
2741
2742     >>> Integer(2, bounds=(1, 3))
2743     INTEGER 2
2744     >>> Integer(5, bounds=(1, 3))
2745     Traceback (most recent call last):
2746     pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2747
2748     ::
2749
2750         class Version(Integer):
2751             schema = (
2752                 ("v1", 0),
2753                 ("v2", 1),
2754                 ("v3", 2),
2755             )
2756
2757     >>> v = Version("v1")
2758     Version INTEGER v1
2759     >>> int(v)
2760     0
2761     >>> v.named
2762     'v1'
2763     >>> v.specs
2764     {'v3': 2, 'v1': 0, 'v2': 1}
2765     """
2766     __slots__ = ("specs", "_bound_min", "_bound_max")
2767     tag_default = tag_encode(2)
2768     asn1_type_name = "INTEGER"
2769
2770     def __init__(
2771             self,
2772             value=None,
2773             bounds=None,
2774             impl=None,
2775             expl=None,
2776             default=None,
2777             optional=False,
2778             _specs=None,
2779             _decoded=(0, 0, 0),
2780     ):
2781         """
2782         :param value: set the value. Either integer type, named value
2783                       (if ``schema`` is specified in the class), or
2784                       :py:class:`pyderasn.Integer` object
2785         :param bounds: set ``(MIN, MAX)`` value constraint.
2786                        (-inf, +inf) by default
2787         :param bytes impl: override default tag with ``IMPLICIT`` one
2788         :param bytes expl: override default tag with ``EXPLICIT`` one
2789         :param default: set default value. Type same as in ``value``
2790         :param bool optional: is object ``OPTIONAL`` in sequence
2791         """
2792         super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2793         self._value = value
2794         specs = getattr(self, "schema", {}) if _specs is None else _specs
2795         self.specs = specs if specs.__class__ == dict else dict(specs)
2796         self._bound_min, self._bound_max = getattr(
2797             self,
2798             "bounds",
2799             (float("-inf"), float("+inf")),
2800         ) if bounds is None else bounds
2801         if value is not None:
2802             self._value = self._value_sanitize(value)
2803         if default is not None:
2804             default = self._value_sanitize(default)
2805             self.default = self.__class__(
2806                 value=default,
2807                 impl=self.tag,
2808                 expl=self._expl,
2809                 _specs=self.specs,
2810             )
2811             if self._value is None:
2812                 self._value = default
2813
2814     def _value_sanitize(self, value):
2815         if isinstance(value, integer_types):
2816             pass
2817         elif issubclass(value.__class__, Integer):
2818             value = value._value
2819         elif value.__class__ == str:
2820             value = self.specs.get(value)
2821             if value is None:
2822                 raise ObjUnknown("integer value: %s" % value)
2823         else:
2824             raise InvalidValueType((self.__class__, int, str))
2825         if not self._bound_min <= value <= self._bound_max:
2826             raise BoundsError(self._bound_min, value, self._bound_max)
2827         return value
2828
2829     @property
2830     def ready(self):
2831         return self._value is not None
2832
2833     def __getstate__(self):
2834         return IntegerState(
2835             __version__,
2836             self.tag,
2837             self._tag_order,
2838             self._expl,
2839             self.default,
2840             self.optional,
2841             self.offset,
2842             self.llen,
2843             self.vlen,
2844             self.expl_lenindef,
2845             self.lenindef,
2846             self.ber_encoded,
2847             self.specs,
2848             self._value,
2849             self._bound_min,
2850             self._bound_max,
2851         )
2852
2853     def __setstate__(self, state):
2854         super(Integer, self).__setstate__(state)
2855         self.specs = state.specs
2856         self._value = state.value
2857         self._bound_min = state.bound_min
2858         self._bound_max = state.bound_max
2859
2860     def __int__(self):
2861         self._assert_ready()
2862         return int(self._value)
2863
2864     def tohex(self):
2865         """Hexadecimal representation
2866
2867         Use :py:func:`pyderasn.colonize_hex` for colonizing it.
2868         """
2869         hex_repr = hex(int(self))[2:].upper()
2870         if len(hex_repr) % 2 != 0:
2871             hex_repr = "0" + hex_repr
2872         return hex_repr
2873
2874     def __hash__(self):
2875         self._assert_ready()
2876         return hash(b"".join((
2877             self.tag,
2878             bytes(self._expl or b""),
2879             str(self._value).encode("ascii"),
2880         )))
2881
2882     def __eq__(self, their):
2883         if isinstance(their, integer_types):
2884             return self._value == their
2885         if not issubclass(their.__class__, Integer):
2886             return False
2887         return (
2888             self._value == their._value and
2889             self.tag == their.tag and
2890             self._expl == their._expl
2891         )
2892
2893     def __lt__(self, their):
2894         return self._value < their._value
2895
2896     @property
2897     def named(self):
2898         """Return named representation (if exists) of the value
2899         """
2900         for name, value in iteritems(self.specs):
2901             if value == self._value:
2902                 return name
2903         return None
2904
2905     def __call__(
2906             self,
2907             value=None,
2908             bounds=None,
2909             impl=None,
2910             expl=None,
2911             default=None,
2912             optional=None,
2913     ):
2914         return self.__class__(
2915             value=value,
2916             bounds=(
2917                 (self._bound_min, self._bound_max)
2918                 if bounds is None else bounds
2919             ),
2920             impl=self.tag if impl is None else impl,
2921             expl=self._expl if expl is None else expl,
2922             default=self.default if default is None else default,
2923             optional=self.optional if optional is None else optional,
2924             _specs=self.specs,
2925         )
2926
2927     def _encode_payload(self):
2928         self._assert_ready()
2929         value = self._value
2930         if PY2:
2931             if value == 0:
2932                 octets = bytearray([0])
2933             elif value < 0:
2934                 value = -value
2935                 value -= 1
2936                 octets = bytearray()
2937                 while value > 0:
2938                     octets.append((value & 0xFF) ^ 0xFF)
2939                     value >>= 8
2940                 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2941                     octets.append(0xFF)
2942             else:
2943                 octets = bytearray()
2944                 while value > 0:
2945                     octets.append(value & 0xFF)
2946                     value >>= 8
2947                 if octets[-1] & 0x80 > 0:
2948                     octets.append(0x00)
2949             octets.reverse()
2950             octets = bytes(octets)
2951         else:
2952             bytes_len = ceil(value.bit_length() / 8) or 1
2953             while True:
2954                 try:
2955                     octets = value.to_bytes(
2956                         bytes_len,
2957                         byteorder="big",
2958                         signed=True,
2959                     )
2960                 except OverflowError:
2961                     bytes_len += 1
2962                 else:
2963                     break
2964         return octets
2965
2966     def _encode(self):
2967         octets = self._encode_payload()
2968         return b"".join((self.tag, len_encode(len(octets)), octets))
2969
2970     def _encode1st(self, state):
2971         l = len(self._encode_payload())
2972         return len(self.tag) + len_size(l) + l, state
2973
2974     def _encode2nd(self, writer, state_iter):
2975         write_full(writer, self._encode())
2976
2977     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2978         try:
2979             t, _, lv = tag_strip(tlv)
2980         except DecodeError as err:
2981             raise err.__class__(
2982                 msg=err.msg,
2983                 klass=self.__class__,
2984                 decode_path=decode_path,
2985                 offset=offset,
2986             )
2987         if t != self.tag:
2988             raise TagMismatch(
2989                 klass=self.__class__,
2990                 decode_path=decode_path,
2991                 offset=offset,
2992             )
2993         if tag_only:
2994             yield None
2995             return
2996         try:
2997             l, llen, v = len_decode(lv)
2998         except DecodeError as err:
2999             raise err.__class__(
3000                 msg=err.msg,
3001                 klass=self.__class__,
3002                 decode_path=decode_path,
3003                 offset=offset,
3004             )
3005         if l > len(v):
3006             raise NotEnoughData(
3007                 "encoded length is longer than data",
3008                 klass=self.__class__,
3009                 decode_path=decode_path,
3010                 offset=offset,
3011             )
3012         if l == 0:
3013             raise NotEnoughData(
3014                 "zero length",
3015                 klass=self.__class__,
3016                 decode_path=decode_path,
3017                 offset=offset,
3018             )
3019         v, tail = v[:l], v[l:]
3020         first_octet = byte2int(v)
3021         if l > 1:
3022             second_octet = byte2int(v[1:])
3023             if (
3024                     ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3025                     ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3026             ):
3027                 raise DecodeError(
3028                     "non normalized integer",
3029                     klass=self.__class__,
3030                     decode_path=decode_path,
3031                     offset=offset,
3032                 )
3033         if PY2:
3034             value = 0
3035             if first_octet & 0x80 > 0:
3036                 octets = bytearray()
3037                 for octet in bytearray(v):
3038                     octets.append(octet ^ 0xFF)
3039                 for octet in octets:
3040                     value = (value << 8) | octet
3041                 value += 1
3042                 value = -value
3043             else:
3044                 for octet in bytearray(v):
3045                     value = (value << 8) | octet
3046         else:
3047             value = int.from_bytes(v, byteorder="big", signed=True)
3048         try:
3049             obj = self.__class__(
3050                 value=value,
3051                 bounds=(self._bound_min, self._bound_max),
3052                 impl=self.tag,
3053                 expl=self._expl,
3054                 default=self.default,
3055                 optional=self.optional,
3056                 _specs=self.specs,
3057                 _decoded=(offset, llen, l),
3058             )
3059         except BoundsError as err:
3060             raise DecodeError(
3061                 msg=str(err),
3062                 klass=self.__class__,
3063                 decode_path=decode_path,
3064                 offset=offset,
3065             )
3066         yield decode_path, obj, tail
3067
3068     def __repr__(self):
3069         return pp_console_row(next(self.pps()))
3070
3071     def pps(self, decode_path=()):
3072         yield _pp(
3073             obj=self,
3074             asn1_type_name=self.asn1_type_name,
3075             obj_name=self.__class__.__name__,
3076             decode_path=decode_path,
3077             value=(self.named or str(self._value)) if self.ready else None,
3078             optional=self.optional,
3079             default=self == self.default,
3080             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3081             expl=None if self._expl is None else tag_decode(self._expl),
3082             offset=self.offset,
3083             tlen=self.tlen,
3084             llen=self.llen,
3085             vlen=self.vlen,
3086             expl_offset=self.expl_offset if self.expled else None,
3087             expl_tlen=self.expl_tlen if self.expled else None,
3088             expl_llen=self.expl_llen if self.expled else None,
3089             expl_vlen=self.expl_vlen if self.expled else None,
3090             expl_lenindef=self.expl_lenindef,
3091             bered=self.bered,
3092         )
3093         for pp in self.pps_lenindef(decode_path):
3094             yield pp
3095
3096
3097 BitStringState = namedtuple(
3098     "BitStringState",
3099     BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3100     **NAMEDTUPLE_KWARGS
3101 )
3102
3103
3104 class BitString(Obj):
3105     """``BIT STRING`` bit string type
3106
3107     >>> BitString(b"hello world")
3108     BIT STRING 88 bits 68656c6c6f20776f726c64
3109     >>> bytes(b)
3110     b'hello world'
3111     >>> b == b"hello world"
3112     True
3113     >>> b.bit_len
3114     88
3115
3116     >>> BitString("'0A3B5F291CD'H")
3117     BIT STRING 44 bits 0a3b5f291cd0
3118     >>> b = BitString("'010110000000'B")
3119     BIT STRING 12 bits 5800
3120     >>> b.bit_len
3121     12
3122     >>> b[0], b[1], b[2], b[3]
3123     (False, True, False, True)
3124     >>> b[1000]
3125     False
3126     >>> [v for v in b]
3127     [False, True, False, True, True, False, False, False, False, False, False, False]
3128
3129     ::
3130
3131         class KeyUsage(BitString):
3132             schema = (
3133                 ("digitalSignature", 0),
3134                 ("nonRepudiation", 1),
3135                 ("keyEncipherment", 2),
3136             )
3137
3138     >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3139     KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3140     >>> b.named
3141     ['nonRepudiation', 'keyEncipherment']
3142     >>> b.specs
3143     {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3144
3145     .. note::
3146
3147        Pay attention that BIT STRING can be encoded both in primitive
3148        and constructed forms. Decoder always checks constructed form tag
3149        additionally to specified primitive one. If BER decoding is
3150        :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3151        of DER restrictions.
3152     """
3153     __slots__ = ("tag_constructed", "specs", "defined")
3154     tag_default = tag_encode(3)
3155     asn1_type_name = "BIT STRING"
3156
3157     def __init__(
3158             self,
3159             value=None,
3160             impl=None,
3161             expl=None,
3162             default=None,
3163             optional=False,
3164             _specs=None,
3165             _decoded=(0, 0, 0),
3166     ):
3167         """
3168         :param value: set the value. Either binary type, tuple of named
3169                       values (if ``schema`` is specified in the class),
3170                       string in ``'XXX...'B`` form, or
3171                       :py:class:`pyderasn.BitString` object
3172         :param bytes impl: override default tag with ``IMPLICIT`` one
3173         :param bytes expl: override default tag with ``EXPLICIT`` one
3174         :param default: set default value. Type same as in ``value``
3175         :param bool optional: is object ``OPTIONAL`` in sequence
3176         """
3177         super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3178         specs = getattr(self, "schema", {}) if _specs is None else _specs
3179         self.specs = specs if specs.__class__ == dict else dict(specs)
3180         self._value = None if value is None else self._value_sanitize(value)
3181         if default is not None:
3182             default = self._value_sanitize(default)
3183             self.default = self.__class__(
3184                 value=default,
3185                 impl=self.tag,
3186                 expl=self._expl,
3187             )
3188             if value is None:
3189                 self._value = default
3190         self.defined = None
3191         tag_klass, _, tag_num = tag_decode(self.tag)
3192         self.tag_constructed = tag_encode(
3193             klass=tag_klass,
3194             form=TagFormConstructed,
3195             num=tag_num,
3196         )
3197
3198     def _bits2octets(self, bits):
3199         if len(self.specs) > 0:
3200             bits = bits.rstrip("0")
3201         bit_len = len(bits)
3202         bits += "0" * ((8 - (bit_len % 8)) % 8)
3203         octets = bytearray(len(bits) // 8)
3204         for i in six_xrange(len(octets)):
3205             octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3206         return bit_len, bytes(octets)
3207
3208     def _value_sanitize(self, value):
3209         if isinstance(value, (string_types, binary_type)):
3210             if (
3211                     isinstance(value, string_types) and
3212                     value.startswith("'")
3213             ):
3214                 if value.endswith("'B"):
3215                     value = value[1:-2]
3216                     if not frozenset(value) <= SET01:
3217                         raise ValueError("B's coding contains unacceptable chars")
3218                     return self._bits2octets(value)
3219                 if value.endswith("'H"):
3220                     value = value[1:-2]
3221                     return (
3222                         len(value) * 4,
3223                         hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3224                     )
3225             if value.__class__ == binary_type:
3226                 return (len(value) * 8, value)
3227             raise InvalidValueType((self.__class__, string_types, binary_type))
3228         if value.__class__ == tuple:
3229             if (
3230                     len(value) == 2 and
3231                     isinstance(value[0], integer_types) and
3232                     value[1].__class__ == binary_type
3233             ):
3234                 return value
3235             bits = []
3236             for name in value:
3237                 bit = self.specs.get(name)
3238                 if bit is None:
3239                     raise ObjUnknown("BitString value: %s" % name)
3240                 bits.append(bit)
3241             if len(bits) == 0:
3242                 return self._bits2octets("")
3243             bits = frozenset(bits)
3244             return self._bits2octets("".join(
3245                 ("1" if bit in bits else "0")
3246                 for bit in six_xrange(max(bits) + 1)
3247             ))
3248         if issubclass(value.__class__, BitString):
3249             return value._value
3250         raise InvalidValueType((self.__class__, binary_type, string_types))
3251
3252     @property
3253     def ready(self):
3254         return self._value is not None
3255
3256     def __getstate__(self):
3257         return BitStringState(
3258             __version__,
3259             self.tag,
3260             self._tag_order,
3261             self._expl,
3262             self.default,
3263             self.optional,
3264             self.offset,
3265             self.llen,
3266             self.vlen,
3267             self.expl_lenindef,
3268             self.lenindef,
3269             self.ber_encoded,
3270             self.specs,
3271             self._value,
3272             self.tag_constructed,
3273             self.defined,
3274         )
3275
3276     def __setstate__(self, state):
3277         super(BitString, self).__setstate__(state)
3278         self.specs = state.specs
3279         self._value = state.value
3280         self.tag_constructed = state.tag_constructed
3281         self.defined = state.defined
3282
3283     def __iter__(self):
3284         self._assert_ready()
3285         for i in six_xrange(self._value[0]):
3286             yield self[i]
3287
3288     @property
3289     def bit_len(self):
3290         """Returns number of bits in the string
3291         """
3292         self._assert_ready()
3293         return self._value[0]
3294
3295     def __bytes__(self):
3296         self._assert_ready()
3297         return self._value[1]
3298
3299     def __eq__(self, their):
3300         if their.__class__ == bytes:
3301             return self._value[1] == their
3302         if not issubclass(their.__class__, BitString):
3303             return False
3304         return (
3305             self._value == their._value and
3306             self.tag == their.tag and
3307             self._expl == their._expl
3308         )
3309
3310     @property
3311     def named(self):
3312         """Named representation (if exists) of the bits
3313
3314         :returns: [str(name), ...]
3315         """
3316         return [name for name, bit in iteritems(self.specs) if self[bit]]
3317
3318     def __call__(
3319             self,
3320             value=None,
3321             impl=None,
3322             expl=None,
3323             default=None,
3324             optional=None,
3325     ):
3326         return self.__class__(
3327             value=value,
3328             impl=self.tag if impl is None else impl,
3329             expl=self._expl if expl is None else expl,
3330             default=self.default if default is None else default,
3331             optional=self.optional if optional is None else optional,
3332             _specs=self.specs,
3333         )
3334
3335     def __getitem__(self, key):
3336         if key.__class__ == int:
3337             bit_len, octets = self._value
3338             if key >= bit_len:
3339                 return False
3340             return (
3341                 byte2int(memoryview(octets)[key // 8:]) >>
3342                 (7 - (key % 8))
3343             ) & 1 == 1
3344         if isinstance(key, string_types):
3345             value = self.specs.get(key)
3346             if value is None:
3347                 raise ObjUnknown("BitString value: %s" % key)
3348             return self[value]
3349         raise InvalidValueType((int, str))
3350
3351     def _encode(self):
3352         self._assert_ready()
3353         bit_len, octets = self._value
3354         return b"".join((
3355             self.tag,
3356             len_encode(len(octets) + 1),
3357             int2byte((8 - bit_len % 8) % 8),
3358             octets,
3359         ))
3360
3361     def _encode1st(self, state):
3362         self._assert_ready()
3363         _, octets = self._value
3364         l = len(octets) + 1
3365         return len(self.tag) + len_size(l) + l, state
3366
3367     def _encode2nd(self, writer, state_iter):
3368         bit_len, octets = self._value
3369         write_full(writer, b"".join((
3370             self.tag,
3371             len_encode(len(octets) + 1),
3372             int2byte((8 - bit_len % 8) % 8),
3373         )))
3374         write_full(writer, octets)
3375
3376     def _encode_cer(self, writer):
3377         bit_len, octets = self._value
3378         if len(octets) + 1 <= 1000:
3379             write_full(writer, self._encode())
3380             return
3381         write_full(writer, self.tag_constructed)
3382         write_full(writer, LENINDEF)
3383         for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3384             write_full(writer, b"".join((
3385                 BitString.tag_default,
3386                 LEN1K,
3387                 int2byte(0),
3388                 octets[offset:offset + 999],
3389             )))
3390         tail = octets[offset + 999:]
3391         if len(tail) > 0:
3392             tail = int2byte((8 - bit_len % 8) % 8) + tail
3393             write_full(writer, b"".join((
3394                 BitString.tag_default,
3395                 len_encode(len(tail)),
3396                 tail,
3397             )))
3398         write_full(writer, EOC)
3399
3400     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3401         try:
3402             t, tlen, lv = tag_strip(tlv)
3403         except DecodeError as err:
3404             raise err.__class__(
3405                 msg=err.msg,
3406                 klass=self.__class__,
3407                 decode_path=decode_path,
3408                 offset=offset,
3409             )
3410         if t == self.tag:
3411             if tag_only:  # pragma: no cover
3412                 yield None
3413                 return
3414             try:
3415                 l, llen, v = len_decode(lv)
3416             except DecodeError as err:
3417                 raise err.__class__(
3418                     msg=err.msg,
3419                     klass=self.__class__,
3420                     decode_path=decode_path,
3421                     offset=offset,
3422                 )
3423             if l > len(v):
3424                 raise NotEnoughData(
3425                     "encoded length is longer than data",
3426                     klass=self.__class__,
3427                     decode_path=decode_path,
3428                     offset=offset,
3429                 )
3430             if l == 0:
3431                 raise NotEnoughData(
3432                     "zero length",
3433                     klass=self.__class__,
3434                     decode_path=decode_path,
3435                     offset=offset,
3436                 )
3437             pad_size = byte2int(v)
3438             if l == 1 and pad_size != 0:
3439                 raise DecodeError(
3440                     "invalid empty value",
3441                     klass=self.__class__,
3442                     decode_path=decode_path,
3443                     offset=offset,
3444                 )
3445             if pad_size > 7:
3446                 raise DecodeError(
3447                     "too big pad",
3448                     klass=self.__class__,
3449                     decode_path=decode_path,
3450                     offset=offset,
3451                 )
3452             if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3453                 raise DecodeError(
3454                     "invalid pad",
3455                     klass=self.__class__,
3456                     decode_path=decode_path,
3457                     offset=offset,
3458                 )
3459             v, tail = v[:l], v[l:]
3460             bit_len = (len(v) - 1) * 8 - pad_size
3461             obj = self.__class__(
3462                 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3463                 impl=self.tag,
3464                 expl=self._expl,
3465                 default=self.default,
3466                 optional=self.optional,
3467                 _specs=self.specs,
3468                 _decoded=(offset, llen, l),
3469             )
3470             if evgen_mode:
3471                 obj._value = (bit_len, None)
3472             yield decode_path, obj, tail
3473             return
3474         if t != self.tag_constructed:
3475             raise TagMismatch(
3476                 klass=self.__class__,
3477                 decode_path=decode_path,
3478                 offset=offset,
3479             )
3480         if not ctx.get("bered", False):
3481             raise DecodeError(
3482                 "unallowed BER constructed encoding",
3483                 klass=self.__class__,
3484                 decode_path=decode_path,
3485                 offset=offset,
3486             )
3487         if tag_only:  # pragma: no cover
3488             yield None
3489             return
3490         lenindef = False
3491         try:
3492             l, llen, v = len_decode(lv)
3493         except LenIndefForm:
3494             llen, l, v = 1, 0, lv[1:]
3495             lenindef = True
3496         except DecodeError as err:
3497             raise err.__class__(
3498                 msg=err.msg,
3499                 klass=self.__class__,
3500                 decode_path=decode_path,
3501                 offset=offset,
3502             )
3503         if l > len(v):
3504             raise NotEnoughData(
3505                 "encoded length is longer than data",
3506                 klass=self.__class__,
3507                 decode_path=decode_path,
3508                 offset=offset,
3509             )
3510         if not lenindef and l == 0:
3511             raise NotEnoughData(
3512                 "zero length",
3513                 klass=self.__class__,
3514                 decode_path=decode_path,
3515                 offset=offset,
3516             )
3517         chunks = []
3518         sub_offset = offset + tlen + llen
3519         vlen = 0
3520         while True:
3521             if lenindef:
3522                 if v[:EOC_LEN].tobytes() == EOC:
3523                     break
3524             else:
3525                 if vlen == l:
3526                     break
3527                 if vlen > l:
3528                     raise DecodeError(
3529                         "chunk out of bounds",
3530                         klass=self.__class__,
3531                         decode_path=decode_path + (str(len(chunks) - 1),),
3532                         offset=chunks[-1].offset,
3533                     )
3534             sub_decode_path = decode_path + (str(len(chunks)),)
3535             try:
3536                 if evgen_mode:
3537                     for _decode_path, chunk, v_tail in BitString().decode_evgen(
3538                             v,
3539                             offset=sub_offset,
3540                             decode_path=sub_decode_path,
3541                             leavemm=True,
3542                             ctx=ctx,
3543                             _ctx_immutable=False,
3544                     ):
3545                         yield _decode_path, chunk, v_tail
3546                 else:
3547                     _, chunk, v_tail = next(BitString().decode_evgen(
3548                         v,
3549                         offset=sub_offset,
3550                         decode_path=sub_decode_path,
3551                         leavemm=True,
3552                         ctx=ctx,
3553                         _ctx_immutable=False,
3554                         _evgen_mode=False,
3555                     ))
3556             except TagMismatch:
3557                 raise DecodeError(
3558                     "expected BitString encoded chunk",
3559                     klass=self.__class__,
3560                     decode_path=sub_decode_path,
3561                     offset=sub_offset,
3562                 )
3563             chunks.append(chunk)
3564             sub_offset += chunk.tlvlen
3565             vlen += chunk.tlvlen
3566             v = v_tail
3567         if len(chunks) == 0:
3568             raise DecodeError(
3569                 "no chunks",
3570                 klass=self.__class__,
3571                 decode_path=decode_path,
3572                 offset=offset,
3573             )
3574         values = []
3575         bit_len = 0
3576         for chunk_i, chunk in enumerate(chunks[:-1]):
3577             if chunk.bit_len % 8 != 0:
3578                 raise DecodeError(
3579                     "BitString chunk is not multiple of 8 bits",
3580                     klass=self.__class__,
3581                     decode_path=decode_path + (str(chunk_i),),
3582                     offset=chunk.offset,
3583                 )
3584             if not evgen_mode:
3585                 values.append(bytes(chunk))
3586             bit_len += chunk.bit_len
3587         chunk_last = chunks[-1]
3588         if not evgen_mode:
3589             values.append(bytes(chunk_last))
3590         bit_len += chunk_last.bit_len
3591         obj = self.__class__(
3592             value=None if evgen_mode else (bit_len, b"".join(values)),
3593             impl=self.tag,
3594             expl=self._expl,
3595             default=self.default,
3596             optional=self.optional,
3597             _specs=self.specs,
3598             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3599         )
3600         if evgen_mode:
3601             obj._value = (bit_len, None)
3602         obj.lenindef = lenindef
3603         obj.ber_encoded = True
3604         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3605
3606     def __repr__(self):
3607         return pp_console_row(next(self.pps()))
3608
3609     def pps(self, decode_path=()):
3610         value = None
3611         blob = None
3612         if self.ready:
3613             bit_len, blob = self._value
3614             value = "%d bits" % bit_len
3615             if len(self.specs) > 0 and blob is not None:
3616                 blob = tuple(self.named)
3617         yield _pp(
3618             obj=self,
3619             asn1_type_name=self.asn1_type_name,
3620             obj_name=self.__class__.__name__,
3621             decode_path=decode_path,
3622             value=value,
3623             blob=blob,
3624             optional=self.optional,
3625             default=self == self.default,
3626             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3627             expl=None if self._expl is None else tag_decode(self._expl),
3628             offset=self.offset,
3629             tlen=self.tlen,
3630             llen=self.llen,
3631             vlen=self.vlen,
3632             expl_offset=self.expl_offset if self.expled else None,
3633             expl_tlen=self.expl_tlen if self.expled else None,
3634             expl_llen=self.expl_llen if self.expled else None,
3635             expl_vlen=self.expl_vlen if self.expled else None,
3636             expl_lenindef=self.expl_lenindef,
3637             lenindef=self.lenindef,
3638             ber_encoded=self.ber_encoded,
3639             bered=self.bered,
3640         )
3641         defined_by, defined = self.defined or (None, None)
3642         if defined_by is not None:
3643             yield defined.pps(
3644                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3645             )
3646         for pp in self.pps_lenindef(decode_path):
3647             yield pp
3648
3649
3650 OctetStringState = namedtuple(
3651     "OctetStringState",
3652     BasicState._fields + (
3653         "value",
3654         "bound_min",
3655         "bound_max",
3656         "tag_constructed",
3657         "defined",
3658     ),
3659     **NAMEDTUPLE_KWARGS
3660 )
3661
3662
3663 class OctetString(Obj):
3664     """``OCTET STRING`` binary string type
3665
3666     >>> s = OctetString(b"hello world")
3667     OCTET STRING 11 bytes 68656c6c6f20776f726c64
3668     >>> s == OctetString(b"hello world")
3669     True
3670     >>> bytes(s)
3671     b'hello world'
3672
3673     >>> OctetString(b"hello", bounds=(4, 4))
3674     Traceback (most recent call last):
3675     pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3676     >>> OctetString(b"hell", bounds=(4, 4))
3677     OCTET STRING 4 bytes 68656c6c
3678
3679     Memoryviews can be used as a values. If memoryview is made on
3680     mmap-ed file, then it does not take storage inside OctetString
3681     itself. In CER encoding mode it will be streamed to the specified
3682     writer, copying 1 KB chunks.
3683     """
3684     __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3685     tag_default = tag_encode(4)
3686     asn1_type_name = "OCTET STRING"
3687     evgen_mode_skip_value = True
3688
3689     def __init__(
3690             self,
3691             value=None,
3692             bounds=None,
3693             impl=None,
3694             expl=None,
3695             default=None,
3696             optional=False,
3697             _decoded=(0, 0, 0),
3698             ctx=None,
3699     ):
3700         """
3701         :param value: set the value. Either binary type, or
3702                       :py:class:`pyderasn.OctetString` object
3703         :param bounds: set ``(MIN, MAX)`` value size constraint.
3704                        (-inf, +inf) by default
3705         :param bytes impl: override default tag with ``IMPLICIT`` one
3706         :param bytes expl: override default tag with ``EXPLICIT`` one
3707         :param default: set default value. Type same as in ``value``
3708         :param bool optional: is object ``OPTIONAL`` in sequence
3709         """
3710         super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3711         self._value = value
3712         self._bound_min, self._bound_max = getattr(
3713             self,
3714             "bounds",
3715             (0, float("+inf")),
3716         ) if bounds is None else bounds
3717         if value is not None:
3718             self._value = self._value_sanitize(value)
3719         if default is not None:
3720             default = self._value_sanitize(default)
3721             self.default = self.__class__(
3722                 value=default,
3723                 impl=self.tag,
3724                 expl=self._expl,
3725             )
3726             if self._value is None:
3727                 self._value = default
3728         self.defined = None
3729         tag_klass, _, tag_num = tag_decode(self.tag)
3730         self.tag_constructed = tag_encode(
3731             klass=tag_klass,
3732             form=TagFormConstructed,
3733             num=tag_num,
3734         )
3735
3736     def _value_sanitize(self, value):
3737         if value.__class__ == binary_type or value.__class__ == memoryview:
3738             pass
3739         elif issubclass(value.__class__, OctetString):
3740             value = value._value
3741         else:
3742             raise InvalidValueType((self.__class__, bytes, memoryview))
3743         if not self._bound_min <= len(value) <= self._bound_max:
3744             raise BoundsError(self._bound_min, len(value), self._bound_max)
3745         return value
3746
3747     @property
3748     def ready(self):
3749         return self._value is not None
3750
3751     def __getstate__(self):
3752         return OctetStringState(
3753             __version__,
3754             self.tag,
3755             self._tag_order,
3756             self._expl,
3757             self.default,
3758             self.optional,
3759             self.offset,
3760             self.llen,
3761             self.vlen,
3762             self.expl_lenindef,
3763             self.lenindef,
3764             self.ber_encoded,
3765             self._value,
3766             self._bound_min,
3767             self._bound_max,
3768             self.tag_constructed,
3769             self.defined,
3770         )
3771
3772     def __setstate__(self, state):
3773         super(OctetString, self).__setstate__(state)
3774         self._value = state.value
3775         self._bound_min = state.bound_min
3776         self._bound_max = state.bound_max
3777         self.tag_constructed = state.tag_constructed
3778         self.defined = state.defined
3779
3780     def __bytes__(self):
3781         self._assert_ready()
3782         return bytes(self._value)
3783
3784     def __eq__(self, their):
3785         if their.__class__ == binary_type:
3786             return self._value == their
3787         if not issubclass(their.__class__, OctetString):
3788             return False
3789         return (
3790             self._value == their._value and
3791             self.tag == their.tag and
3792             self._expl == their._expl
3793         )
3794
3795     def __lt__(self, their):
3796         return self._value < their._value
3797
3798     def __call__(
3799             self,
3800             value=None,
3801             bounds=None,
3802             impl=None,
3803             expl=None,
3804             default=None,
3805             optional=None,
3806     ):
3807         return self.__class__(
3808             value=value,
3809             bounds=(
3810                 (self._bound_min, self._bound_max)
3811                 if bounds is None else bounds
3812             ),
3813             impl=self.tag if impl is None else impl,
3814             expl=self._expl if expl is None else expl,
3815             default=self.default if default is None else default,
3816             optional=self.optional if optional is None else optional,
3817         )
3818
3819     def _encode(self):
3820         self._assert_ready()
3821         return b"".join((
3822             self.tag,
3823             len_encode(len(self._value)),
3824             self._value,
3825         ))
3826
3827     def _encode1st(self, state):
3828         self._assert_ready()
3829         l = len(self._value)
3830         return len(self.tag) + len_size(l) + l, state
3831
3832     def _encode2nd(self, writer, state_iter):
3833         value = self._value
3834         write_full(writer, self.tag + len_encode(len(value)))
3835         write_full(writer, value)
3836
3837     def _encode_cer(self, writer):
3838         octets = self._value
3839         if len(octets) <= 1000:
3840             write_full(writer, self._encode())
3841             return
3842         write_full(writer, self.tag_constructed)
3843         write_full(writer, LENINDEF)
3844         for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3845             write_full(writer, b"".join((
3846                 OctetString.tag_default,
3847                 LEN1K,
3848                 octets[offset:offset + 1000],
3849             )))
3850         tail = octets[offset + 1000:]
3851         if len(tail) > 0:
3852             write_full(writer, b"".join((
3853                 OctetString.tag_default,
3854                 len_encode(len(tail)),
3855                 tail,
3856             )))
3857         write_full(writer, EOC)
3858
3859     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3860         try:
3861             t, tlen, lv = tag_strip(tlv)
3862         except DecodeError as err:
3863             raise err.__class__(
3864                 msg=err.msg,
3865                 klass=self.__class__,
3866                 decode_path=decode_path,
3867                 offset=offset,
3868             )
3869         if t == self.tag:
3870             if tag_only:
3871                 yield None
3872                 return
3873             try:
3874                 l, llen, v = len_decode(lv)
3875             except DecodeError as err:
3876                 raise err.__class__(
3877                     msg=err.msg,
3878                     klass=self.__class__,
3879                     decode_path=decode_path,
3880                     offset=offset,
3881                 )
3882             if l > len(v):
3883                 raise NotEnoughData(
3884                     "encoded length is longer than data",
3885                     klass=self.__class__,
3886                     decode_path=decode_path,
3887                     offset=offset,
3888                 )
3889             v, tail = v[:l], v[l:]
3890             if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3891                 raise DecodeError(
3892                     msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3893                     klass=self.__class__,
3894                     decode_path=decode_path,
3895                     offset=offset,
3896                 )
3897             try:
3898                 obj = self.__class__(
3899                     value=(
3900                         None if (evgen_mode and self.evgen_mode_skip_value)
3901                         else v.tobytes()
3902                     ),
3903                     bounds=(self._bound_min, self._bound_max),
3904                     impl=self.tag,
3905                     expl=self._expl,
3906                     default=self.default,
3907                     optional=self.optional,
3908                     _decoded=(offset, llen, l),
3909                     ctx=ctx,
3910                 )
3911             except DecodeError as err:
3912                 raise DecodeError(
3913                     msg=err.msg,
3914                     klass=self.__class__,
3915                     decode_path=decode_path,
3916                     offset=offset,
3917                 )
3918             except BoundsError as err:
3919                 raise DecodeError(
3920                     msg=str(err),
3921                     klass=self.__class__,
3922                     decode_path=decode_path,
3923                     offset=offset,
3924                 )
3925             yield decode_path, obj, tail
3926             return
3927         if t != self.tag_constructed:
3928             raise TagMismatch(
3929                 klass=self.__class__,
3930                 decode_path=decode_path,
3931                 offset=offset,
3932             )
3933         if not ctx.get("bered", False):
3934             raise DecodeError(
3935                 "unallowed BER constructed encoding",
3936                 klass=self.__class__,
3937                 decode_path=decode_path,
3938                 offset=offset,
3939             )
3940         if tag_only:
3941             yield None
3942             return
3943         lenindef = False
3944         try:
3945             l, llen, v = len_decode(lv)
3946         except LenIndefForm:
3947             llen, l, v = 1, 0, lv[1:]
3948             lenindef = True
3949         except DecodeError as err:
3950             raise err.__class__(
3951                 msg=err.msg,
3952                 klass=self.__class__,
3953                 decode_path=decode_path,
3954                 offset=offset,
3955             )
3956         if l > len(v):
3957             raise NotEnoughData(
3958                 "encoded length is longer than data",
3959                 klass=self.__class__,
3960                 decode_path=decode_path,
3961                 offset=offset,
3962             )
3963         chunks = []
3964         chunks_count = 0
3965         sub_offset = offset + tlen + llen
3966         vlen = 0
3967         payload_len = 0
3968         while True:
3969             if lenindef:
3970                 if v[:EOC_LEN].tobytes() == EOC:
3971                     break
3972             else:
3973                 if vlen == l:
3974                     break
3975                 if vlen > l:
3976                     raise DecodeError(
3977                         "chunk out of bounds",
3978                         klass=self.__class__,
3979                         decode_path=decode_path + (str(len(chunks) - 1),),
3980                         offset=chunks[-1].offset,
3981                     )
3982             try:
3983                 if evgen_mode:
3984                     sub_decode_path = decode_path + (str(chunks_count),)
3985                     for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3986                             v,
3987                             offset=sub_offset,
3988                             decode_path=sub_decode_path,
3989                             leavemm=True,
3990                             ctx=ctx,
3991                             _ctx_immutable=False,
3992                     ):
3993                         yield _decode_path, chunk, v_tail
3994                         if not chunk.ber_encoded:
3995                             payload_len += chunk.vlen
3996                     chunks_count += 1
3997                 else:
3998                     sub_decode_path = decode_path + (str(len(chunks)),)
3999                     _, chunk, v_tail = next(OctetString().decode_evgen(
4000                         v,
4001                         offset=sub_offset,
4002                         decode_path=sub_decode_path,
4003                         leavemm=True,
4004                         ctx=ctx,
4005                         _ctx_immutable=False,
4006                         _evgen_mode=False,
4007                     ))
4008                     chunks.append(chunk)
4009             except TagMismatch:
4010                 raise DecodeError(
4011                     "expected OctetString encoded chunk",
4012                     klass=self.__class__,
4013                     decode_path=sub_decode_path,
4014                     offset=sub_offset,
4015                 )
4016             sub_offset += chunk.tlvlen
4017             vlen += chunk.tlvlen
4018             v = v_tail
4019         if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
4020             raise DecodeError(
4021                 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
4022                 klass=self.__class__,
4023                 decode_path=decode_path,
4024                 offset=offset,
4025             )
4026         try:
4027             obj = self.__class__(
4028                 value=(
4029                     None if evgen_mode else
4030                     b"".join(bytes(chunk) for chunk in chunks)
4031                 ),
4032                 bounds=(self._bound_min, self._bound_max),
4033                 impl=self.tag,
4034                 expl=self._expl,
4035                 default=self.default,
4036                 optional=self.optional,
4037                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4038                 ctx=ctx,
4039             )
4040         except DecodeError as err:
4041             raise DecodeError(
4042                 msg=err.msg,
4043                 klass=self.__class__,
4044                 decode_path=decode_path,
4045                 offset=offset,
4046             )
4047         except BoundsError as err:
4048             raise DecodeError(
4049                 msg=str(err),
4050                 klass=self.__class__,
4051                 decode_path=decode_path,
4052                 offset=offset,
4053             )
4054         obj.lenindef = lenindef
4055         obj.ber_encoded = True
4056         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4057
4058     def __repr__(self):
4059         return pp_console_row(next(self.pps()))
4060
4061     def pps(self, decode_path=()):
4062         yield _pp(
4063             obj=self,
4064             asn1_type_name=self.asn1_type_name,
4065             obj_name=self.__class__.__name__,
4066             decode_path=decode_path,
4067             value=("%d bytes" % len(self._value)) if self.ready else None,
4068             blob=self._value if self.ready else None,
4069             optional=self.optional,
4070             default=self == self.default,
4071             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4072             expl=None if self._expl is None else tag_decode(self._expl),
4073             offset=self.offset,
4074             tlen=self.tlen,
4075             llen=self.llen,
4076             vlen=self.vlen,
4077             expl_offset=self.expl_offset if self.expled else None,
4078             expl_tlen=self.expl_tlen if self.expled else None,
4079             expl_llen=self.expl_llen if self.expled else None,
4080             expl_vlen=self.expl_vlen if self.expled else None,
4081             expl_lenindef=self.expl_lenindef,
4082             lenindef=self.lenindef,
4083             ber_encoded=self.ber_encoded,
4084             bered=self.bered,
4085         )
4086         defined_by, defined = self.defined or (None, None)
4087         if defined_by is not None:
4088             yield defined.pps(
4089                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4090             )
4091         for pp in self.pps_lenindef(decode_path):
4092             yield pp
4093
4094
4095 def agg_octet_string(evgens, decode_path, raw, writer):
4096     """Aggregate constructed string (OctetString and its derivatives)
4097
4098     :param evgens: iterator of generated events
4099     :param decode_path: points to the string we want to decode
4100     :param raw: slicebable (memoryview, bytearray, etc) with
4101                 the data evgens are generated on
4102     :param writer: buffer.write where string is going to be saved
4103     :param writer: where string is going to be saved. Must comply
4104                    with ``io.RawIOBase.write`` behaviour
4105
4106     .. seealso:: :ref:`agg_octet_string`
4107     """
4108     decode_path_len = len(decode_path)
4109     for dp, obj, _ in evgens:
4110         if dp[:decode_path_len] != decode_path:
4111             continue
4112         if not obj.ber_encoded:
4113             write_full(writer, raw[
4114                 obj.offset + obj.tlen + obj.llen:
4115                 obj.offset + obj.tlen + obj.llen + obj.vlen -
4116                 (EOC_LEN if obj.expl_lenindef else 0)
4117             ])
4118         if len(dp) == decode_path_len:
4119             break
4120
4121
4122 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4123
4124
4125 class Null(Obj):
4126     """``NULL`` null object
4127
4128     >>> n = Null()
4129     NULL
4130     >>> n.ready
4131     True
4132     """
4133     __slots__ = ()
4134     tag_default = tag_encode(5)
4135     asn1_type_name = "NULL"
4136
4137     def __init__(
4138             self,
4139             value=None,  # unused, but Sequence passes it
4140             impl=None,
4141             expl=None,
4142             optional=False,
4143             _decoded=(0, 0, 0),
4144     ):
4145         """
4146         :param bytes impl: override default tag with ``IMPLICIT`` one
4147         :param bytes expl: override default tag with ``EXPLICIT`` one
4148         :param bool optional: is object ``OPTIONAL`` in sequence
4149         """
4150         super(Null, self).__init__(impl, expl, None, optional, _decoded)
4151         self.default = None
4152
4153     @property
4154     def ready(self):
4155         return True
4156
4157     def __getstate__(self):
4158         return NullState(
4159             __version__,
4160             self.tag,
4161             self._tag_order,
4162             self._expl,
4163             self.default,
4164             self.optional,
4165             self.offset,
4166             self.llen,
4167             self.vlen,
4168             self.expl_lenindef,
4169             self.lenindef,
4170             self.ber_encoded,
4171         )
4172
4173     def __eq__(self, their):
4174         if not issubclass(their.__class__, Null):
4175             return False
4176         return (
4177             self.tag == their.tag and
4178             self._expl == their._expl
4179         )
4180
4181     def __call__(
4182             self,
4183             value=None,
4184             impl=None,
4185             expl=None,
4186             optional=None,
4187     ):
4188         return self.__class__(
4189             impl=self.tag if impl is None else impl,
4190             expl=self._expl if expl is None else expl,
4191             optional=self.optional if optional is None else optional,
4192         )
4193
4194     def _encode(self):
4195         return self.tag + LEN0
4196
4197     def _encode1st(self, state):
4198         return len(self.tag) + 1, state
4199
4200     def _encode2nd(self, writer, state_iter):
4201         write_full(writer, self.tag + LEN0)
4202
4203     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4204         try:
4205             t, _, lv = tag_strip(tlv)
4206         except DecodeError as err:
4207             raise err.__class__(
4208                 msg=err.msg,
4209                 klass=self.__class__,
4210                 decode_path=decode_path,
4211                 offset=offset,
4212             )
4213         if t != self.tag:
4214             raise TagMismatch(
4215                 klass=self.__class__,
4216                 decode_path=decode_path,
4217                 offset=offset,
4218             )
4219         if tag_only:  # pragma: no cover
4220             yield None
4221             return
4222         try:
4223             l, _, v = len_decode(lv)
4224         except DecodeError as err:
4225             raise err.__class__(
4226                 msg=err.msg,
4227                 klass=self.__class__,
4228                 decode_path=decode_path,
4229                 offset=offset,
4230             )
4231         if l != 0:
4232             raise InvalidLength(
4233                 "Null must have zero length",
4234                 klass=self.__class__,
4235                 decode_path=decode_path,
4236                 offset=offset,
4237             )
4238         obj = self.__class__(
4239             impl=self.tag,
4240             expl=self._expl,
4241             optional=self.optional,
4242             _decoded=(offset, 1, 0),
4243         )
4244         yield decode_path, obj, v
4245
4246     def __repr__(self):
4247         return pp_console_row(next(self.pps()))
4248
4249     def pps(self, decode_path=()):
4250         yield _pp(
4251             obj=self,
4252             asn1_type_name=self.asn1_type_name,
4253             obj_name=self.__class__.__name__,
4254             decode_path=decode_path,
4255             optional=self.optional,
4256             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4257             expl=None if self._expl is None else tag_decode(self._expl),
4258             offset=self.offset,
4259             tlen=self.tlen,
4260             llen=self.llen,
4261             vlen=self.vlen,
4262             expl_offset=self.expl_offset if self.expled else None,
4263             expl_tlen=self.expl_tlen if self.expled else None,
4264             expl_llen=self.expl_llen if self.expled else None,
4265             expl_vlen=self.expl_vlen if self.expled else None,
4266             expl_lenindef=self.expl_lenindef,
4267             bered=self.bered,
4268         )
4269         for pp in self.pps_lenindef(decode_path):
4270             yield pp
4271
4272
4273 ObjectIdentifierState = namedtuple(
4274     "ObjectIdentifierState",
4275     BasicState._fields + ("value", "defines"),
4276     **NAMEDTUPLE_KWARGS
4277 )
4278
4279
4280 class ObjectIdentifier(Obj):
4281     """``OBJECT IDENTIFIER`` OID type
4282
4283     >>> oid = ObjectIdentifier((1, 2, 3))
4284     OBJECT IDENTIFIER 1.2.3
4285     >>> oid == ObjectIdentifier("1.2.3")
4286     True
4287     >>> tuple(oid)
4288     (1, 2, 3)
4289     >>> str(oid)
4290     '1.2.3'
4291     >>> oid + (4, 5) + ObjectIdentifier("1.7")
4292     OBJECT IDENTIFIER 1.2.3.4.5.1.7
4293
4294     >>> str(ObjectIdentifier((3, 1)))
4295     Traceback (most recent call last):
4296     pyderasn.InvalidOID: unacceptable first arc value
4297     """
4298     __slots__ = ("defines",)
4299     tag_default = tag_encode(6)
4300     asn1_type_name = "OBJECT IDENTIFIER"
4301
4302     def __init__(
4303             self,
4304             value=None,
4305             defines=(),
4306             impl=None,
4307             expl=None,
4308             default=None,
4309             optional=False,
4310             _decoded=(0, 0, 0),
4311     ):
4312         """
4313         :param value: set the value. Either tuples of integers,
4314                       string of "."-concatenated integers, or
4315                       :py:class:`pyderasn.ObjectIdentifier` object
4316         :param defines: sequence of tuples. Each tuple has two elements.
4317                         First one is relative to current one decode
4318                         path, aiming to the field defined by that OID.
4319                         Read about relative path in
4320                         :py:func:`pyderasn.abs_decode_path`. Second
4321                         tuple element is ``{OID: pyderasn.Obj()}``
4322                         dictionary, mapping between current OID value
4323                         and structure applied to defined field.
4324
4325                         .. seealso:: :ref:`definedby`
4326
4327         :param bytes impl: override default tag with ``IMPLICIT`` one
4328         :param bytes expl: override default tag with ``EXPLICIT`` one
4329         :param default: set default value. Type same as in ``value``
4330         :param bool optional: is object ``OPTIONAL`` in sequence
4331         """
4332         super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4333         self._value = value
4334         if value is not None:
4335             self._value = self._value_sanitize(value)
4336         if default is not None:
4337             default = self._value_sanitize(default)
4338             self.default = self.__class__(
4339                 value=default,
4340                 impl=self.tag,
4341                 expl=self._expl,
4342             )
4343             if self._value is None:
4344                 self._value = default
4345         self.defines = defines
4346
4347     def __add__(self, their):
4348         if their.__class__ == tuple:
4349             return self.__class__(self._value + array("L", their))
4350         if isinstance(their, self.__class__):
4351             return self.__class__(self._value + their._value)
4352         raise InvalidValueType((self.__class__, tuple))
4353
4354     def _value_sanitize(self, value):
4355         if issubclass(value.__class__, ObjectIdentifier):
4356             return value._value
4357         if isinstance(value, string_types):
4358             try:
4359                 value = array("L", (pureint(arc) for arc in value.split(".")))
4360             except ValueError:
4361                 raise InvalidOID("unacceptable arcs values")
4362         if value.__class__ == tuple:
4363             try:
4364                 value = array("L", value)
4365             except OverflowError as err:
4366                 raise InvalidOID(repr(err))
4367         if value.__class__ is array:
4368             if len(value) < 2:
4369                 raise InvalidOID("less than 2 arcs")
4370             first_arc = value[0]
4371             if first_arc in (0, 1):
4372                 if not (0 <= value[1] <= 39):
4373                     raise InvalidOID("second arc is too wide")
4374             elif first_arc == 2:
4375                 pass
4376             else:
4377                 raise InvalidOID("unacceptable first arc value")
4378             if not all(arc >= 0 for arc in value):
4379                 raise InvalidOID("negative arc value")
4380             return value
4381         raise InvalidValueType((self.__class__, str, tuple))
4382
4383     @property
4384     def ready(self):
4385         return self._value is not None
4386
4387     def __getstate__(self):
4388         return ObjectIdentifierState(
4389             __version__,
4390             self.tag,
4391             self._tag_order,
4392             self._expl,
4393             self.default,
4394             self.optional,
4395             self.offset,
4396             self.llen,
4397             self.vlen,
4398             self.expl_lenindef,
4399             self.lenindef,
4400             self.ber_encoded,
4401             self._value,
4402             self.defines,
4403         )
4404
4405     def __setstate__(self, state):
4406         super(ObjectIdentifier, self).__setstate__(state)
4407         self._value = state.value
4408         self.defines = state.defines
4409
4410     def __iter__(self):
4411         self._assert_ready()
4412         return iter(self._value)
4413
4414     def __str__(self):
4415         return ".".join(str(arc) for arc in self._value or ())
4416
4417     def __hash__(self):
4418         self._assert_ready()
4419         return hash(b"".join((
4420             self.tag,
4421             bytes(self._expl or b""),
4422             str(self._value).encode("ascii"),
4423         )))
4424
4425     def __eq__(self, their):
4426         if their.__class__ == tuple:
4427             return self._value == array("L", their)
4428         if not issubclass(their.__class__, ObjectIdentifier):
4429             return False
4430         return (
4431             self.tag == their.tag and
4432             self._expl == their._expl and
4433             self._value == their._value
4434         )
4435
4436     def __lt__(self, their):
4437         return self._value < their._value
4438
4439     def __call__(
4440             self,
4441             value=None,
4442             defines=None,
4443             impl=None,
4444             expl=None,
4445             default=None,
4446             optional=None,
4447     ):
4448         return self.__class__(
4449             value=value,
4450             defines=self.defines if defines is None else defines,
4451             impl=self.tag if impl is None else impl,
4452             expl=self._expl if expl is None else expl,
4453             default=self.default if default is None else default,
4454             optional=self.optional if optional is None else optional,
4455         )
4456
4457     def _encode_octets(self):
4458         self._assert_ready()
4459         value = self._value
4460         first_value = value[1]
4461         first_arc = value[0]
4462         if first_arc == 0:
4463             pass
4464         elif first_arc == 1:
4465             first_value += 40
4466         elif first_arc == 2:
4467             first_value += 80
4468         else:  # pragma: no cover
4469             raise RuntimeError("invalid arc is stored")
4470         octets = [zero_ended_encode(first_value)]
4471         for arc in value[2:]:
4472             octets.append(zero_ended_encode(arc))
4473         return b"".join(octets)
4474
4475     def _encode(self):
4476         v = self._encode_octets()
4477         return b"".join((self.tag, len_encode(len(v)), v))
4478
4479     def _encode1st(self, state):
4480         l = len(self._encode_octets())
4481         return len(self.tag) + len_size(l) + l, state
4482
4483     def _encode2nd(self, writer, state_iter):
4484         write_full(writer, self._encode())
4485
4486     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4487         try:
4488             t, _, lv = tag_strip(tlv)
4489         except DecodeError as err:
4490             raise err.__class__(
4491                 msg=err.msg,
4492                 klass=self.__class__,
4493                 decode_path=decode_path,
4494                 offset=offset,
4495             )
4496         if t != self.tag:
4497             raise TagMismatch(
4498                 klass=self.__class__,
4499                 decode_path=decode_path,
4500                 offset=offset,
4501             )
4502         if tag_only:  # pragma: no cover
4503             yield None
4504             return
4505         try:
4506             l, llen, v = len_decode(lv)
4507         except DecodeError as err:
4508             raise err.__class__(
4509                 msg=err.msg,
4510                 klass=self.__class__,
4511                 decode_path=decode_path,
4512                 offset=offset,
4513             )
4514         if l > len(v):
4515             raise NotEnoughData(
4516                 "encoded length is longer than data",
4517                 klass=self.__class__,
4518                 decode_path=decode_path,
4519                 offset=offset,
4520             )
4521         if l == 0:
4522             raise NotEnoughData(
4523                 "zero length",
4524                 klass=self.__class__,
4525                 decode_path=decode_path,
4526                 offset=offset,
4527             )
4528         v, tail = v[:l], v[l:]
4529         arcs = array("L")
4530         ber_encoded = False
4531         while len(v) > 0:
4532             i = 0
4533             arc = 0
4534             while True:
4535                 octet = indexbytes(v, i)
4536                 if i == 0 and octet == 0x80:
4537                     if ctx.get("bered", False):
4538                         ber_encoded = True
4539                     else:
4540                         raise DecodeError(
4541                             "non normalized arc encoding",
4542                             klass=self.__class__,
4543                             decode_path=decode_path,
4544                             offset=offset,
4545                         )
4546                 arc = (arc << 7) | (octet & 0x7F)
4547                 if octet & 0x80 == 0:
4548                     try:
4549                         arcs.append(arc)
4550                     except OverflowError:
4551                         raise DecodeError(
4552                             "too huge value for local unsigned long",
4553                             klass=self.__class__,
4554                             decode_path=decode_path,
4555                             offset=offset,
4556                         )
4557                     v = v[i + 1:]
4558                     break
4559                 i += 1
4560                 if i == len(v):
4561                     raise DecodeError(
4562                         "unfinished OID",
4563                         klass=self.__class__,
4564                         decode_path=decode_path,
4565                         offset=offset,
4566                     )
4567         first_arc = 0
4568         second_arc = arcs[0]
4569         if 0 <= second_arc <= 39:
4570             first_arc = 0
4571         elif 40 <= second_arc <= 79:
4572             first_arc = 1
4573             second_arc -= 40
4574         else:
4575             first_arc = 2
4576             second_arc -= 80
4577         obj = self.__class__(
4578             value=array("L", (first_arc, second_arc)) + arcs[1:],
4579             impl=self.tag,
4580             expl=self._expl,
4581             default=self.default,
4582             optional=self.optional,
4583             _decoded=(offset, llen, l),
4584         )
4585         if ber_encoded:
4586             obj.ber_encoded = True
4587         yield decode_path, obj, tail
4588
4589     def __repr__(self):
4590         return pp_console_row(next(self.pps()))
4591
4592     def pps(self, decode_path=()):
4593         yield _pp(
4594             obj=self,
4595             asn1_type_name=self.asn1_type_name,
4596             obj_name=self.__class__.__name__,
4597             decode_path=decode_path,
4598             value=str(self) if self.ready else None,
4599             optional=self.optional,
4600             default=self == self.default,
4601             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4602             expl=None if self._expl is None else tag_decode(self._expl),
4603             offset=self.offset,
4604             tlen=self.tlen,
4605             llen=self.llen,
4606             vlen=self.vlen,
4607             expl_offset=self.expl_offset if self.expled else None,
4608             expl_tlen=self.expl_tlen if self.expled else None,
4609             expl_llen=self.expl_llen if self.expled else None,
4610             expl_vlen=self.expl_vlen if self.expled else None,
4611             expl_lenindef=self.expl_lenindef,
4612             ber_encoded=self.ber_encoded,
4613             bered=self.bered,
4614         )
4615         for pp in self.pps_lenindef(decode_path):
4616             yield pp
4617
4618
4619 class Enumerated(Integer):
4620     """``ENUMERATED`` integer type
4621
4622     This type is identical to :py:class:`pyderasn.Integer`, but requires
4623     schema to be specified and does not accept values missing from it.
4624     """
4625     __slots__ = ()
4626     tag_default = tag_encode(10)
4627     asn1_type_name = "ENUMERATED"
4628
4629     def __init__(
4630             self,
4631             value=None,
4632             impl=None,
4633             expl=None,
4634             default=None,
4635             optional=False,
4636             _specs=None,
4637             _decoded=(0, 0, 0),
4638             bounds=None,  # dummy argument, workability for Integer.decode
4639     ):
4640         super(Enumerated, self).__init__(
4641             value, bounds, impl, expl, default, optional, _specs, _decoded,
4642         )
4643         if len(self.specs) == 0:
4644             raise ValueError("schema must be specified")
4645
4646     def _value_sanitize(self, value):
4647         if isinstance(value, self.__class__):
4648             value = value._value
4649         elif isinstance(value, integer_types):
4650             for _value in itervalues(self.specs):
4651                 if _value == value:
4652                     break
4653             else:
4654                 raise DecodeError(
4655                     "unknown integer value: %s" % value,
4656                     klass=self.__class__,
4657                 )
4658         elif isinstance(value, string_types):
4659             value = self.specs.get(value)
4660             if value is None:
4661                 raise ObjUnknown("integer value: %s" % value)
4662         else:
4663             raise InvalidValueType((self.__class__, int, str))
4664         return value
4665
4666     def __call__(
4667             self,
4668             value=None,
4669             impl=None,
4670             expl=None,
4671             default=None,
4672             optional=None,
4673             _specs=None,
4674     ):
4675         return self.__class__(
4676             value=value,
4677             impl=self.tag if impl is None else impl,
4678             expl=self._expl if expl is None else expl,
4679             default=self.default if default is None else default,
4680             optional=self.optional if optional is None else optional,
4681             _specs=self.specs,
4682         )
4683
4684
4685 def escape_control_unicode(c):
4686     if unicat(c)[0] == "C":
4687         c = repr(c).lstrip("u").strip("'")
4688     return c
4689
4690
4691 class CommonString(OctetString):
4692     """Common class for all strings
4693
4694     Everything resembles :py:class:`pyderasn.OctetString`, except
4695     ability to deal with unicode text strings.
4696
4697     >>> hexenc("привет Ð¼Ð¸Ñ€".encode("utf-8"))
4698     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4699     >>> UTF8String("привет Ð¼Ð¸Ñ€") == UTF8String(hexdec("d0...80"))
4700     True
4701     >>> s = UTF8String("привет Ð¼Ð¸Ñ€")
4702     UTF8String UTF8String Ð¿Ñ€Ð¸Ð²ÐµÑ‚ Ð¼Ð¸Ñ€
4703     >>> str(s)
4704     'привет Ð¼Ð¸Ñ€'
4705     >>> hexenc(bytes(s))
4706     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4707
4708     >>> PrintableString("привет Ð¼Ð¸Ñ€")
4709     Traceback (most recent call last):
4710     pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4711
4712     >>> BMPString("ада", bounds=(2, 2))
4713     Traceback (most recent call last):
4714     pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4715     >>> s = BMPString("ад", bounds=(2, 2))
4716     >>> s.encoding
4717     'utf-16-be'
4718     >>> hexenc(bytes(s))
4719     '04300434'
4720
4721     .. list-table::
4722        :header-rows: 1
4723
4724        * - Class
4725          - Text Encoding, validation
4726        * - :py:class:`pyderasn.UTF8String`
4727          - utf-8
4728        * - :py:class:`pyderasn.NumericString`
4729          - proper alphabet validation
4730        * - :py:class:`pyderasn.PrintableString`
4731          - proper alphabet validation
4732        * - :py:class:`pyderasn.TeletexString`
4733          - iso-8859-1
4734        * - :py:class:`pyderasn.T61String`
4735          - iso-8859-1
4736        * - :py:class:`pyderasn.VideotexString`
4737          - iso-8859-1
4738        * - :py:class:`pyderasn.IA5String`
4739          - proper alphabet validation
4740        * - :py:class:`pyderasn.GraphicString`
4741          - iso-8859-1
4742        * - :py:class:`pyderasn.VisibleString`, :py:class:`pyderasn.ISO646String`
4743          - proper alphabet validation
4744        * - :py:class:`pyderasn.GeneralString`
4745          - iso-8859-1
4746        * - :py:class:`pyderasn.UniversalString`
4747          - utf-32-be
4748        * - :py:class:`pyderasn.BMPString`
4749          - utf-16-be
4750     """
4751     __slots__ = ()
4752
4753     def _value_sanitize(self, value):
4754         value_raw = None
4755         value_decoded = None
4756         if isinstance(value, self.__class__):
4757             value_raw = value._value
4758         elif value.__class__ == text_type:
4759             value_decoded = value
4760         elif value.__class__ == binary_type:
4761             value_raw = value
4762         else:
4763             raise InvalidValueType((self.__class__, text_type, binary_type))
4764         try:
4765             value_raw = (
4766                 value_decoded.encode(self.encoding)
4767                 if value_raw is None else value_raw
4768             )
4769             value_decoded = (
4770                 value_raw.decode(self.encoding)
4771                 if value_decoded is None else value_decoded
4772             )
4773         except (UnicodeEncodeError, UnicodeDecodeError) as err:
4774             raise DecodeError(str(err))
4775         if not self._bound_min <= len(value_decoded) <= self._bound_max:
4776             raise BoundsError(
4777                 self._bound_min,
4778                 len(value_decoded),
4779                 self._bound_max,
4780             )
4781         return value_raw
4782
4783     def __eq__(self, their):
4784         if their.__class__ == binary_type:
4785             return self._value == their
4786         if their.__class__ == text_type:
4787             return self._value == their.encode(self.encoding)
4788         if not isinstance(their, self.__class__):
4789             return False
4790         return (
4791             self._value == their._value and
4792             self.tag == their.tag and
4793             self._expl == their._expl
4794         )
4795
4796     def __unicode__(self):
4797         if self.ready:
4798             return self._value.decode(self.encoding)
4799         return text_type(self._value)
4800
4801     def __repr__(self):
4802         return pp_console_row(next(self.pps(no_unicode=PY2)))
4803
4804     def pps(self, decode_path=(), no_unicode=False):
4805         value = None
4806         if self.ready:
4807             value = (
4808                 hexenc(bytes(self)) if no_unicode else
4809                 "".join(escape_control_unicode(c) for c in self.__unicode__())
4810             )
4811         yield _pp(
4812             obj=self,
4813             asn1_type_name=self.asn1_type_name,
4814             obj_name=self.__class__.__name__,
4815             decode_path=decode_path,
4816             value=value,
4817             optional=self.optional,
4818             default=self == self.default,
4819             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4820             expl=None if self._expl is None else tag_decode(self._expl),
4821             offset=self.offset,
4822             tlen=self.tlen,
4823             llen=self.llen,
4824             vlen=self.vlen,
4825             expl_offset=self.expl_offset if self.expled else None,
4826             expl_tlen=self.expl_tlen if self.expled else None,
4827             expl_llen=self.expl_llen if self.expled else None,
4828             expl_vlen=self.expl_vlen if self.expled else None,
4829             expl_lenindef=self.expl_lenindef,
4830             ber_encoded=self.ber_encoded,
4831             bered=self.bered,
4832         )
4833         for pp in self.pps_lenindef(decode_path):
4834             yield pp
4835
4836
4837 class UTF8String(CommonString):
4838     __slots__ = ()
4839     tag_default = tag_encode(12)
4840     encoding = "utf-8"
4841     asn1_type_name = "UTF8String"
4842
4843
4844 class AllowableCharsMixin(object):
4845     @property
4846     def allowable_chars(self):
4847         if PY2:
4848             return self._allowable_chars
4849         return frozenset(six_unichr(c) for c in self._allowable_chars)
4850
4851     def _value_sanitize(self, value):
4852         value = super(AllowableCharsMixin, self)._value_sanitize(value)
4853         if not frozenset(value) <= self._allowable_chars:
4854             raise DecodeError("non satisfying alphabet value")
4855         return value
4856
4857
4858 class NumericString(AllowableCharsMixin, CommonString):
4859     """Numeric string
4860
4861     Its value is properly sanitized: only ASCII digits with spaces can
4862     be stored.
4863
4864     >>> NumericString().allowable_chars
4865     frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4866     """
4867     __slots__ = ()
4868     tag_default = tag_encode(18)
4869     encoding = "ascii"
4870     asn1_type_name = "NumericString"
4871     _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4872
4873
4874 PrintableStringState = namedtuple(
4875     "PrintableStringState",
4876     OctetStringState._fields + ("allowable_chars",),
4877     **NAMEDTUPLE_KWARGS
4878 )
4879
4880
4881 class PrintableString(AllowableCharsMixin, CommonString):
4882     """Printable string
4883
4884     Its value is properly sanitized: see X.680 41.4 table 10.
4885
4886     >>> PrintableString().allowable_chars
4887     frozenset([' ', "'", ..., 'z'])
4888     >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4889     PrintableString PrintableString foo*bar
4890     >>> obj.allow_asterisk, obj.allow_ampersand
4891     (True, False)
4892     """
4893     __slots__ = ()
4894     tag_default = tag_encode(19)
4895     encoding = "ascii"
4896     asn1_type_name = "PrintableString"
4897     _allowable_chars = frozenset(
4898         (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4899     )
4900     _asterisk = frozenset("*".encode("ascii"))
4901     _ampersand = frozenset("&".encode("ascii"))
4902
4903     def __init__(
4904             self,
4905             value=None,
4906             bounds=None,
4907             impl=None,
4908             expl=None,
4909             default=None,
4910             optional=False,
4911             _decoded=(0, 0, 0),
4912             ctx=None,
4913             allow_asterisk=False,
4914             allow_ampersand=False,
4915     ):
4916         """
4917         :param allow_asterisk: allow asterisk character
4918         :param allow_ampersand: allow ampersand character
4919         """
4920         if allow_asterisk:
4921             self._allowable_chars |= self._asterisk
4922         if allow_ampersand:
4923             self._allowable_chars |= self._ampersand
4924         super(PrintableString, self).__init__(
4925             value, bounds, impl, expl, default, optional, _decoded, ctx,
4926         )
4927
4928     @property
4929     def allow_asterisk(self):
4930         """Is asterisk character allowed?
4931         """
4932         return self._asterisk <= self._allowable_chars
4933
4934     @property
4935     def allow_ampersand(self):
4936         """Is ampersand character allowed?
4937         """
4938         return self._ampersand <= self._allowable_chars
4939
4940     def __getstate__(self):
4941         return PrintableStringState(
4942             *super(PrintableString, self).__getstate__(),
4943             **{"allowable_chars": self._allowable_chars}
4944         )
4945
4946     def __setstate__(self, state):
4947         super(PrintableString, self).__setstate__(state)
4948         self._allowable_chars = state.allowable_chars
4949
4950     def __call__(
4951             self,
4952             value=None,
4953             bounds=None,
4954             impl=None,
4955             expl=None,
4956             default=None,
4957             optional=None,
4958     ):
4959         return self.__class__(
4960             value=value,
4961             bounds=(
4962                 (self._bound_min, self._bound_max)
4963                 if bounds is None else bounds
4964             ),
4965             impl=self.tag if impl is None else impl,
4966             expl=self._expl if expl is None else expl,
4967             default=self.default if default is None else default,
4968             optional=self.optional if optional is None else optional,
4969             allow_asterisk=self.allow_asterisk,
4970             allow_ampersand=self.allow_ampersand,
4971         )
4972
4973
4974 class TeletexString(CommonString):
4975     __slots__ = ()
4976     tag_default = tag_encode(20)
4977     encoding = "iso-8859-1"
4978     asn1_type_name = "TeletexString"
4979
4980
4981 class T61String(TeletexString):
4982     __slots__ = ()
4983     asn1_type_name = "T61String"
4984
4985
4986 class VideotexString(CommonString):
4987     __slots__ = ()
4988     tag_default = tag_encode(21)
4989     encoding = "iso-8859-1"
4990     asn1_type_name = "VideotexString"
4991
4992
4993 class IA5String(AllowableCharsMixin, CommonString):
4994     """IA5 string
4995
4996     Its value is properly sanitized: it is a mix of
4997
4998     * http://www.itscj.ipsj.or.jp/iso-ir/006.pdf (G)
4999     * http://www.itscj.ipsj.or.jp/iso-ir/001.pdf (C0)
5000     * DEL character (0x7F)
5001
5002     It is just 7-bit ASCII.
5003
5004     >>> IA5String().allowable_chars
5005     frozenset(["NUL", ... "DEL"])
5006     """
5007     __slots__ = ()
5008     tag_default = tag_encode(22)
5009     encoding = "ascii"
5010     asn1_type_name = "IA5"
5011     _allowable_chars = frozenset(b"".join(
5012         six_unichr(c).encode("ascii") for c in six_xrange(128)
5013     ))
5014
5015
5016 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
5017 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
5018 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
5019 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
5020 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
5021 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
5022
5023
5024 class VisibleString(AllowableCharsMixin, CommonString):
5025     """Visible string
5026
5027     Its value is properly sanitized. ASCII subset from space to tilde is
5028     allowed: http://www.itscj.ipsj.or.jp/iso-ir/006.pdf
5029
5030     >>> VisibleString().allowable_chars
5031     frozenset([" ", ... "~"])
5032     """
5033     __slots__ = ()
5034     tag_default = tag_encode(26)
5035     encoding = "ascii"
5036     asn1_type_name = "VisibleString"
5037     _allowable_chars = frozenset(b"".join(
5038         six_unichr(c).encode("ascii") for c in six_xrange(ord(" "), ord("~") + 1)
5039     ))
5040
5041
5042 class ISO646String(VisibleString):
5043     __slots__ = ()
5044     asn1_type_name = "ISO646String"
5045
5046
5047 UTCTimeState = namedtuple(
5048     "UTCTimeState",
5049     OctetStringState._fields + ("ber_raw",),
5050     **NAMEDTUPLE_KWARGS
5051 )
5052
5053
5054 def str_to_time_fractions(value):
5055     v = pureint(value)
5056     year, v = (v // 10**10), (v % 10**10)
5057     month, v = (v // 10**8), (v % 10**8)
5058     day, v = (v // 10**6), (v % 10**6)
5059     hour, v = (v // 10**4), (v % 10**4)
5060     minute, second = (v // 100), (v % 100)
5061     return year, month, day, hour, minute, second
5062
5063
5064 class UTCTime(VisibleString):
5065     """``UTCTime`` datetime type
5066
5067     >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5068     UTCTime UTCTime 2017-09-30T22:07:50
5069     >>> str(t)
5070     '170930220750Z'
5071     >>> bytes(t)
5072     b'170930220750Z'
5073     >>> t.todatetime()
5074     datetime.datetime(2017, 9, 30, 22, 7, 50)
5075     >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5076     datetime.datetime(1957, 9, 30, 22, 7, 50)
5077
5078     If BER encoded value was met, then ``ber_raw`` attribute will hold
5079     its raw representation.
5080
5081     .. warning::
5082
5083        Pay attention that UTCTime can not hold full year, so all years
5084        having < 50 years are treated as 20xx, 19xx otherwise, according
5085        to X.509 recommendation.
5086
5087     .. warning::
5088
5089        No strict validation of UTC offsets are made, but very crude:
5090
5091        * minutes are not exceeding 60
5092        * offset value is not exceeding 14 hours
5093     """
5094     __slots__ = ("ber_raw",)
5095     tag_default = tag_encode(23)
5096     encoding = "ascii"
5097     asn1_type_name = "UTCTime"
5098     evgen_mode_skip_value = False
5099
5100     def __init__(
5101             self,
5102             value=None,
5103             impl=None,
5104             expl=None,
5105             default=None,
5106             optional=False,
5107             _decoded=(0, 0, 0),
5108             bounds=None,  # dummy argument, workability for OctetString.decode
5109             ctx=None,
5110     ):
5111         """
5112         :param value: set the value. Either datetime type, or
5113                       :py:class:`pyderasn.UTCTime` object
5114         :param bytes impl: override default tag with ``IMPLICIT`` one
5115         :param bytes expl: override default tag with ``EXPLICIT`` one
5116         :param default: set default value. Type same as in ``value``
5117         :param bool optional: is object ``OPTIONAL`` in sequence
5118         """
5119         super(UTCTime, self).__init__(
5120             None, None, impl, expl, None, optional, _decoded, ctx,
5121         )
5122         self._value = value
5123         self.ber_raw = None
5124         if value is not None:
5125             self._value, self.ber_raw = self._value_sanitize(value, ctx)
5126             self.ber_encoded = self.ber_raw is not None
5127         if default is not None:
5128             default, _ = self._value_sanitize(default)
5129             self.default = self.__class__(
5130                 value=default,
5131                 impl=self.tag,
5132                 expl=self._expl,
5133             )
5134             if self._value is None:
5135                 self._value = default
5136             optional = True
5137         self.optional = optional
5138
5139     def _strptime_bered(self, value):
5140         year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5141         value = value[10:]
5142         if len(value) == 0:
5143             raise ValueError("no timezone")
5144         year += 2000 if year < 50 else 1900
5145         decoded = datetime(year, month, day, hour, minute)
5146         offset = 0
5147         if value[-1] == "Z":
5148             value = value[:-1]
5149         else:
5150             if len(value) < 5:
5151                 raise ValueError("invalid UTC offset")
5152             if value[-5] == "-":
5153                 sign = -1
5154             elif value[-5] == "+":
5155                 sign = 1
5156             else:
5157                 raise ValueError("invalid UTC offset")
5158             v = pureint(value[-4:])
5159             offset, v = (60 * (v % 100)), v // 100
5160             if offset >= 3600:
5161                 raise ValueError("invalid UTC offset minutes")
5162             offset += 3600 * v
5163             if offset > 14 * 3600:
5164                 raise ValueError("too big UTC offset")
5165             offset *= sign
5166             value = value[:-5]
5167         if len(value) == 0:
5168             return offset, decoded
5169         if len(value) != 2:
5170             raise ValueError("invalid UTC offset seconds")
5171         seconds = pureint(value)
5172         if seconds >= 60:
5173             raise ValueError("invalid seconds value")
5174         return offset, decoded + timedelta(seconds=seconds)
5175
5176     def _strptime(self, value):
5177         # datetime.strptime's format: %y%m%d%H%M%SZ
5178         if len(value) != LEN_YYMMDDHHMMSSZ:
5179             raise ValueError("invalid UTCTime length")
5180         if value[-1] != "Z":
5181             raise ValueError("non UTC timezone")
5182         year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
5183         year += 2000 if year < 50 else 1900
5184         return datetime(year, month, day, hour, minute, second)
5185
5186     def _dt_sanitize(self, value):
5187         if value.year < 1950 or value.year > 2049:
5188             raise ValueError("UTCTime can hold only 1950-2049 years")
5189         return value.replace(microsecond=0)
5190
5191     def _value_sanitize(self, value, ctx=None):
5192         if value.__class__ == binary_type:
5193             try:
5194                 value_decoded = value.decode("ascii")
5195             except (UnicodeEncodeError, UnicodeDecodeError) as err:
5196                 raise DecodeError("invalid UTCTime encoding: %r" % err)
5197             err = None
5198             try:
5199                 return self._strptime(value_decoded), None
5200             except (TypeError, ValueError) as _err:
5201                 err = _err
5202                 if (ctx is not None) and ctx.get("bered", False):
5203                     try:
5204                         offset, _value = self._strptime_bered(value_decoded)
5205                         _value = _value - timedelta(seconds=offset)
5206                         return self._dt_sanitize(_value), value
5207                     except (TypeError, ValueError, OverflowError) as _err:
5208                         err = _err
5209             raise DecodeError(
5210                 "invalid %s format: %r" % (self.asn1_type_name, err),
5211                 klass=self.__class__,
5212             )
5213         if isinstance(value, self.__class__):
5214             return value._value, None
5215         if value.__class__ == datetime:
5216             return self._dt_sanitize(value), None
5217         raise InvalidValueType((self.__class__, datetime))
5218
5219     def _pp_value(self):
5220         if self.ready:
5221             value = self._value.isoformat()
5222             if self.ber_encoded:
5223                 value += " (%s)" % self.ber_raw
5224             return value
5225         return None
5226
5227     def __unicode__(self):
5228         if self.ready:
5229             value = self._value.isoformat()
5230             if self.ber_encoded:
5231                 value += " (%s)" % self.ber_raw
5232             return value
5233         return text_type(self._pp_value())
5234
5235     def __getstate__(self):
5236         return UTCTimeState(
5237             *super(UTCTime, self).__getstate__(),
5238             **{"ber_raw": self.ber_raw}
5239         )
5240
5241     def __setstate__(self, state):
5242         super(UTCTime, self).__setstate__(state)
5243         self.ber_raw = state.ber_raw
5244
5245     def __bytes__(self):
5246         self._assert_ready()
5247         return self._encode_time()
5248
5249     def __eq__(self, their):
5250         if their.__class__ == binary_type:
5251             return self._encode_time() == their
5252         if their.__class__ == datetime:
5253             return self.todatetime() == their
5254         if not isinstance(their, self.__class__):
5255             return False
5256         return (
5257             self._value == their._value and
5258             self.tag == their.tag and
5259             self._expl == their._expl
5260         )
5261
5262     def _encode_time(self):
5263         return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5264
5265     def _encode(self):
5266         self._assert_ready()
5267         return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5268
5269     def _encode1st(self, state):
5270         return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
5271
5272     def _encode2nd(self, writer, state_iter):
5273         self._assert_ready()
5274         write_full(writer, self._encode())
5275
5276     def _encode_cer(self, writer):
5277         write_full(writer, self._encode())
5278
5279     def todatetime(self):
5280         return self._value
5281
5282     def __repr__(self):
5283         return pp_console_row(next(self.pps()))
5284
5285     def pps(self, decode_path=()):
5286         yield _pp(
5287             obj=self,
5288             asn1_type_name=self.asn1_type_name,
5289             obj_name=self.__class__.__name__,
5290             decode_path=decode_path,
5291             value=self._pp_value(),
5292             optional=self.optional,
5293             default=self == self.default,
5294             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5295             expl=None if self._expl is None else tag_decode(self._expl),
5296             offset=self.offset,
5297             tlen=self.tlen,
5298             llen=self.llen,
5299             vlen=self.vlen,
5300             expl_offset=self.expl_offset if self.expled else None,
5301             expl_tlen=self.expl_tlen if self.expled else None,
5302             expl_llen=self.expl_llen if self.expled else None,
5303             expl_vlen=self.expl_vlen if self.expled else None,
5304             expl_lenindef=self.expl_lenindef,
5305             ber_encoded=self.ber_encoded,
5306             bered=self.bered,
5307         )
5308         for pp in self.pps_lenindef(decode_path):
5309             yield pp
5310
5311
5312 class GeneralizedTime(UTCTime):
5313     """``GeneralizedTime`` datetime type
5314
5315     This type is similar to :py:class:`pyderasn.UTCTime`.
5316
5317     >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5318     GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5319     >>> str(t)
5320     '20170930220750.000123Z'
5321     >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5322     GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5323
5324     .. warning::
5325
5326        Only microsecond fractions are supported in DER encoding.
5327        :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5328        higher precision values.
5329
5330     .. warning::
5331
5332        BER encoded data can loss information (accuracy) during decoding
5333        because of float transformations.
5334
5335     .. warning::
5336
5337        Local times (without explicit timezone specification) are treated
5338        as UTC one, no transformations are made.
5339
5340     .. warning::
5341
5342        Zero year is unsupported.
5343     """
5344     __slots__ = ()
5345     tag_default = tag_encode(24)
5346     asn1_type_name = "GeneralizedTime"
5347
5348     def _dt_sanitize(self, value):
5349         return value
5350
5351     def _strptime_bered(self, value):
5352         if len(value) < 4 + 3 * 2:
5353             raise ValueError("invalid GeneralizedTime")
5354         year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5355         decoded = datetime(year, month, day, hour)
5356         offset, value = 0, value[10:]
5357         if len(value) == 0:
5358             return offset, decoded
5359         if value[-1] == "Z":
5360             value = value[:-1]
5361         else:
5362             for char, sign in (("-", -1), ("+", 1)):
5363                 idx = value.rfind(char)
5364                 if idx == -1:
5365                     continue
5366                 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5367                 v = pureint(offset_raw)
5368                 if len(offset_raw) == 4:
5369                     offset, v = (60 * (v % 100)), v // 100
5370                     if offset >= 3600:
5371                         raise ValueError("invalid UTC offset minutes")
5372                 elif len(offset_raw) == 2:
5373                     pass
5374                 else:
5375                     raise ValueError("invalid UTC offset")
5376                 offset += 3600 * v
5377                 if offset > 14 * 3600:
5378                     raise ValueError("too big UTC offset")
5379                 offset *= sign
5380                 break
5381         if len(value) == 0:
5382             return offset, decoded
5383         if value[0] in DECIMAL_SIGNS:
5384             return offset, (
5385                 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5386             )
5387         if len(value) < 2:
5388             raise ValueError("stripped minutes")
5389         decoded += timedelta(seconds=60 * pureint(value[:2]))
5390         value = value[2:]
5391         if len(value) == 0:
5392             return offset, decoded
5393         if value[0] in DECIMAL_SIGNS:
5394             return offset, (
5395                 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5396             )
5397         if len(value) < 2:
5398             raise ValueError("stripped seconds")
5399         decoded += timedelta(seconds=pureint(value[:2]))
5400         value = value[2:]
5401         if len(value) == 0:
5402             return offset, decoded
5403         if value[0] not in DECIMAL_SIGNS:
5404             raise ValueError("invalid format after seconds")
5405         return offset, (
5406             decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5407         )
5408
5409     def _strptime(self, value):
5410         l = len(value)
5411         if l == LEN_YYYYMMDDHHMMSSZ:
5412             # datetime.strptime's format: %Y%m%d%H%M%SZ
5413             if value[-1] != "Z":
5414                 raise ValueError("non UTC timezone")
5415             return datetime(*str_to_time_fractions(value[:-1]))
5416         if l >= LEN_YYYYMMDDHHMMSSDMZ:
5417             # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5418             if value[-1] != "Z":
5419                 raise ValueError("non UTC timezone")
5420             if value[14] != ".":
5421                 raise ValueError("no fractions separator")
5422             us = value[15:-1]
5423             if us[-1] == "0":
5424                 raise ValueError("trailing zero")
5425             us_len = len(us)
5426             if us_len > 6:
5427                 raise ValueError("only microsecond fractions are supported")
5428             us = pureint(us + ("0" * (6 - us_len)))
5429             year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5430             return datetime(year, month, day, hour, minute, second, us)
5431         raise ValueError("invalid GeneralizedTime length")
5432
5433     def _encode_time(self):
5434         value = self._value
5435         encoded = value.strftime("%Y%m%d%H%M%S")
5436         if value.microsecond > 0:
5437             encoded += (".%06d" % value.microsecond).rstrip("0")
5438         return (encoded + "Z").encode("ascii")
5439
5440     def _encode(self):
5441         self._assert_ready()
5442         value = self._value
5443         if value.microsecond > 0:
5444             encoded = self._encode_time()
5445             return b"".join((self.tag, len_encode(len(encoded)), encoded))
5446         return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5447
5448     def _encode1st(self, state):
5449         self._assert_ready()
5450         vlen = len(self._encode_time())
5451         return len(self.tag) + len_size(vlen) + vlen, state
5452
5453     def _encode2nd(self, writer, state_iter):
5454         write_full(writer, self._encode())
5455
5456
5457 class GraphicString(CommonString):
5458     __slots__ = ()
5459     tag_default = tag_encode(25)
5460     encoding = "iso-8859-1"
5461     asn1_type_name = "GraphicString"
5462
5463
5464 class GeneralString(CommonString):
5465     __slots__ = ()
5466     tag_default = tag_encode(27)
5467     encoding = "iso-8859-1"
5468     asn1_type_name = "GeneralString"
5469
5470
5471 class UniversalString(CommonString):
5472     __slots__ = ()
5473     tag_default = tag_encode(28)
5474     encoding = "utf-32-be"
5475     asn1_type_name = "UniversalString"
5476
5477
5478 class BMPString(CommonString):
5479     __slots__ = ()
5480     tag_default = tag_encode(30)
5481     encoding = "utf-16-be"
5482     asn1_type_name = "BMPString"
5483
5484
5485 ChoiceState = namedtuple(
5486     "ChoiceState",
5487     BasicState._fields + ("specs", "value",),
5488     **NAMEDTUPLE_KWARGS
5489 )
5490
5491
5492 class Choice(Obj):
5493     """``CHOICE`` special type
5494
5495     ::
5496
5497         class GeneralName(Choice):
5498             schema = (
5499                 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5500                 ("dNSName", IA5String(impl=tag_ctxp(2))),
5501             )
5502
5503     >>> gn = GeneralName()
5504     GeneralName CHOICE
5505     >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5506     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5507     >>> gn["dNSName"] = IA5String("bar.baz")
5508     GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5509     >>> gn["rfc822Name"]
5510     None
5511     >>> gn["dNSName"]
5512     [2] IA5String IA5 bar.baz
5513     >>> gn.choice
5514     'dNSName'
5515     >>> gn.value == gn["dNSName"]
5516     True
5517     >>> gn.specs
5518     OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5519
5520     >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5521     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5522     """
5523     __slots__ = ("specs",)
5524     tag_default = None
5525     asn1_type_name = "CHOICE"
5526
5527     def __init__(
5528             self,
5529             value=None,
5530             schema=None,
5531             impl=None,
5532             expl=None,
5533             default=None,
5534             optional=False,
5535             _decoded=(0, 0, 0),
5536     ):
5537         """
5538         :param value: set the value. Either ``(choice, value)`` tuple, or
5539                       :py:class:`pyderasn.Choice` object
5540         :param bytes impl: can not be set, do **not** use it
5541         :param bytes expl: override default tag with ``EXPLICIT`` one
5542         :param default: set default value. Type same as in ``value``
5543         :param bool optional: is object ``OPTIONAL`` in sequence
5544         """
5545         if impl is not None:
5546             raise ValueError("no implicit tag allowed for CHOICE")
5547         super(Choice, self).__init__(None, expl, default, optional, _decoded)
5548         if schema is None:
5549             schema = getattr(self, "schema", ())
5550         if len(schema) == 0:
5551             raise ValueError("schema must be specified")
5552         self.specs = (
5553             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5554         )
5555         self._value = None
5556         if value is not None:
5557             self._value = self._value_sanitize(value)
5558         if default is not None:
5559             default_value = self._value_sanitize(default)
5560             default_obj = self.__class__(impl=self.tag, expl=self._expl)
5561             default_obj.specs = self.specs
5562             default_obj._value = default_value
5563             self.default = default_obj
5564             if value is None:
5565                 self._value = copy(default_obj._value)
5566         if self._expl is not None:
5567             tag_class, _, tag_num = tag_decode(self._expl)
5568             self._tag_order = (tag_class, tag_num)
5569
5570     def _value_sanitize(self, value):
5571         if (value.__class__ == tuple) and len(value) == 2:
5572             choice, obj = value
5573             spec = self.specs.get(choice)
5574             if spec is None:
5575                 raise ObjUnknown(choice)
5576             if not isinstance(obj, spec.__class__):
5577                 raise InvalidValueType((spec,))
5578             return (choice, spec(obj))
5579         if isinstance(value, self.__class__):
5580             return value._value
5581         raise InvalidValueType((self.__class__, tuple))
5582
5583     @property
5584     def ready(self):
5585         return self._value is not None and self._value[1].ready
5586
5587     @property
5588     def bered(self):
5589         return self.expl_lenindef or (
5590             (self._value is not None) and
5591             self._value[1].bered
5592         )
5593
5594     def __getstate__(self):
5595         return ChoiceState(
5596             __version__,
5597             self.tag,
5598             self._tag_order,
5599             self._expl,
5600             self.default,
5601             self.optional,
5602             self.offset,
5603             self.llen,
5604             self.vlen,
5605             self.expl_lenindef,
5606             self.lenindef,
5607             self.ber_encoded,
5608             self.specs,
5609             copy(self._value),
5610         )
5611
5612     def __setstate__(self, state):
5613         super(Choice, self).__setstate__(state)
5614         self.specs = state.specs
5615         self._value = state.value
5616
5617     def __eq__(self, their):
5618         if (their.__class__ == tuple) and len(their) == 2:
5619             return self._value == their
5620         if not isinstance(their, self.__class__):
5621             return False
5622         return (
5623             self.specs == their.specs and
5624             self._value == their._value
5625         )
5626
5627     def __call__(
5628             self,
5629             value=None,
5630             expl=None,
5631             default=None,
5632             optional=None,
5633     ):
5634         return self.__class__(
5635             value=value,
5636             schema=self.specs,
5637             expl=self._expl if expl is None else expl,
5638             default=self.default if default is None else default,
5639             optional=self.optional if optional is None else optional,
5640         )
5641
5642     @property
5643     def choice(self):
5644         """Name of the choice
5645         """
5646         self._assert_ready()
5647         return self._value[0]
5648
5649     @property
5650     def value(self):
5651         """Value of underlying choice
5652         """
5653         self._assert_ready()
5654         return self._value[1]
5655
5656     @property
5657     def tag_order(self):
5658         self._assert_ready()
5659         return self._value[1].tag_order if self._tag_order is None else self._tag_order
5660
5661     @property
5662     def tag_order_cer(self):
5663         return min(v.tag_order_cer for v in itervalues(self.specs))
5664
5665     def __getitem__(self, key):
5666         if key not in self.specs:
5667             raise ObjUnknown(key)
5668         if self._value is None:
5669             return None
5670         choice, value = self._value
5671         if choice != key:
5672             return None
5673         return value
5674
5675     def __setitem__(self, key, value):
5676         spec = self.specs.get(key)
5677         if spec is None:
5678             raise ObjUnknown(key)
5679         if not isinstance(value, spec.__class__):
5680             raise InvalidValueType((spec.__class__,))
5681         self._value = (key, spec(value))
5682
5683     @property
5684     def tlen(self):
5685         return 0
5686
5687     @property
5688     def decoded(self):
5689         return self._value[1].decoded if self.ready else False
5690
5691     def _encode(self):
5692         self._assert_ready()
5693         return self._value[1].encode()
5694
5695     def _encode1st(self, state):
5696         self._assert_ready()
5697         return self._value[1].encode1st(state)
5698
5699     def _encode2nd(self, writer, state_iter):
5700         self._value[1].encode2nd(writer, state_iter)
5701
5702     def _encode_cer(self, writer):
5703         self._assert_ready()
5704         self._value[1].encode_cer(writer)
5705
5706     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5707         for choice, spec in iteritems(self.specs):
5708             sub_decode_path = decode_path + (choice,)
5709             try:
5710                 spec.decode(
5711                     tlv,
5712                     offset=offset,
5713                     leavemm=True,
5714                     decode_path=sub_decode_path,
5715                     ctx=ctx,
5716                     tag_only=True,
5717                     _ctx_immutable=False,
5718                 )
5719             except TagMismatch:
5720                 continue
5721             break
5722         else:
5723             raise TagMismatch(
5724                 klass=self.__class__,
5725                 decode_path=decode_path,
5726                 offset=offset,
5727             )
5728         if tag_only:  # pragma: no cover
5729             yield None
5730             return
5731         if evgen_mode:
5732             for _decode_path, value, tail in spec.decode_evgen(
5733                     tlv,
5734                     offset=offset,
5735                     leavemm=True,
5736                     decode_path=sub_decode_path,
5737                     ctx=ctx,
5738                     _ctx_immutable=False,
5739             ):
5740                 yield _decode_path, value, tail
5741         else:
5742             _, value, tail = next(spec.decode_evgen(
5743                 tlv,
5744                 offset=offset,
5745                 leavemm=True,
5746                 decode_path=sub_decode_path,
5747                 ctx=ctx,
5748                 _ctx_immutable=False,
5749                 _evgen_mode=False,
5750             ))
5751         obj = self.__class__(
5752             schema=self.specs,
5753             expl=self._expl,
5754             default=self.default,
5755             optional=self.optional,
5756             _decoded=(offset, 0, value.fulllen),
5757         )
5758         obj._value = (choice, value)
5759         yield decode_path, obj, tail
5760
5761     def __repr__(self):
5762         value = pp_console_row(next(self.pps()))
5763         if self.ready:
5764             value = "%s[%r]" % (value, self.value)
5765         return value
5766
5767     def pps(self, decode_path=()):
5768         yield _pp(
5769             obj=self,
5770             asn1_type_name=self.asn1_type_name,
5771             obj_name=self.__class__.__name__,
5772             decode_path=decode_path,
5773             value=self.choice if self.ready else None,
5774             optional=self.optional,
5775             default=self == self.default,
5776             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5777             expl=None if self._expl is None else tag_decode(self._expl),
5778             offset=self.offset,
5779             tlen=self.tlen,
5780             llen=self.llen,
5781             vlen=self.vlen,
5782             expl_lenindef=self.expl_lenindef,
5783             bered=self.bered,
5784         )
5785         if self.ready:
5786             yield self.value.pps(decode_path=decode_path + (self.choice,))
5787         for pp in self.pps_lenindef(decode_path):
5788             yield pp
5789
5790
5791 class PrimitiveTypes(Choice):
5792     """Predefined ``CHOICE`` for all generic primitive types
5793
5794     It could be useful for general decoding of some unspecified values:
5795
5796     >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5797     OCTET STRING 3 bytes 666f6f
5798     >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5799     INTEGER 1193046
5800     """
5801     __slots__ = ()
5802     schema = tuple((klass.__name__, klass()) for klass in (
5803         Boolean,
5804         Integer,
5805         BitString,
5806         OctetString,
5807         Null,
5808         ObjectIdentifier,
5809         UTF8String,
5810         NumericString,
5811         PrintableString,
5812         TeletexString,
5813         VideotexString,
5814         IA5String,
5815         UTCTime,
5816         GeneralizedTime,
5817         GraphicString,
5818         VisibleString,
5819         ISO646String,
5820         GeneralString,
5821         UniversalString,
5822         BMPString,
5823     ))
5824
5825
5826 AnyState = namedtuple(
5827     "AnyState",
5828     BasicState._fields + ("value", "defined"),
5829     **NAMEDTUPLE_KWARGS
5830 )
5831
5832
5833 class Any(Obj):
5834     """``ANY`` special type
5835
5836     >>> Any(Integer(-123))
5837     ANY INTEGER -123 (0X:7B)
5838     >>> a = Any(OctetString(b"hello world").encode())
5839     ANY 040b68656c6c6f20776f726c64
5840     >>> hexenc(bytes(a))
5841     b'0x040x0bhello world'
5842     """
5843     __slots__ = ("defined",)
5844     tag_default = tag_encode(0)
5845     asn1_type_name = "ANY"
5846
5847     def __init__(
5848             self,
5849             value=None,
5850             expl=None,
5851             optional=False,
5852             _decoded=(0, 0, 0),
5853     ):
5854         """
5855         :param value: set the value. Either any kind of pyderasn's
5856                       **ready** object, or bytes. Pay attention that
5857                       **no** validation is performed if raw binary value
5858                       is valid TLV, except just tag decoding
5859         :param bytes expl: override default tag with ``EXPLICIT`` one
5860         :param bool optional: is object ``OPTIONAL`` in sequence
5861         """
5862         super(Any, self).__init__(None, expl, None, optional, _decoded)
5863         if value is None:
5864             self._value = None
5865         else:
5866             value = self._value_sanitize(value)
5867             self._value = value
5868             if self._expl is None:
5869                 if value.__class__ == binary_type:
5870                     tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5871                 else:
5872                     tag_class, tag_num = value.tag_order
5873             else:
5874                 tag_class, _, tag_num = tag_decode(self._expl)
5875             self._tag_order = (tag_class, tag_num)
5876         self.defined = None
5877
5878     def _value_sanitize(self, value):
5879         if value.__class__ == binary_type:
5880             if len(value) == 0:
5881                 raise ValueError("%s value can not be empty" % self.__class__.__name__)
5882             return value
5883         if isinstance(value, self.__class__):
5884             return value._value
5885         if not isinstance(value, Obj):
5886             raise InvalidValueType((self.__class__, Obj, binary_type))
5887         return value
5888
5889     @property
5890     def ready(self):
5891         return self._value is not None
5892
5893     @property
5894     def tag_order(self):
5895         self._assert_ready()
5896         return self._tag_order
5897
5898     @property
5899     def bered(self):
5900         if self.expl_lenindef or self.lenindef:
5901             return True
5902         if self.defined is None:
5903             return False
5904         return self.defined[1].bered
5905
5906     def __getstate__(self):
5907         return AnyState(
5908             __version__,
5909             self.tag,
5910             self._tag_order,
5911             self._expl,
5912             None,
5913             self.optional,
5914             self.offset,
5915             self.llen,
5916             self.vlen,
5917             self.expl_lenindef,
5918             self.lenindef,
5919             self.ber_encoded,
5920             self._value,
5921             self.defined,
5922         )
5923
5924     def __setstate__(self, state):
5925         super(Any, self).__setstate__(state)
5926         self._value = state.value
5927         self.defined = state.defined
5928
5929     def __eq__(self, their):
5930         if their.__class__ == binary_type:
5931             if self._value.__class__ == binary_type:
5932                 return self._value == their
5933             return self._value.encode() == their
5934         if issubclass(their.__class__, Any):
5935             if self.ready and their.ready:
5936                 return bytes(self) == bytes(their)
5937             return self.ready == their.ready
5938         return False
5939
5940     def __call__(
5941             self,
5942             value=None,
5943             expl=None,
5944             optional=None,
5945     ):
5946         return self.__class__(
5947             value=value,
5948             expl=self._expl if expl is None else expl,
5949             optional=self.optional if optional is None else optional,
5950         )
5951
5952     def __bytes__(self):
5953         self._assert_ready()
5954         value = self._value
5955         if value.__class__ == binary_type:
5956             return value
5957         return self._value.encode()
5958
5959     @property
5960     def tlen(self):
5961         return 0
5962
5963     def _encode(self):
5964         self._assert_ready()
5965         value = self._value
5966         if value.__class__ == binary_type:
5967             return value
5968         return value.encode()
5969
5970     def _encode1st(self, state):
5971         self._assert_ready()
5972         value = self._value
5973         if value.__class__ == binary_type:
5974             return len(value), state
5975         return value.encode1st(state)
5976
5977     def _encode2nd(self, writer, state_iter):
5978         value = self._value
5979         if value.__class__ == binary_type:
5980             write_full(writer, value)
5981         else:
5982             value.encode2nd(writer, state_iter)
5983
5984     def _encode_cer(self, writer):
5985         self._assert_ready()
5986         value = self._value
5987         if value.__class__ == binary_type:
5988             write_full(writer, value)
5989         else:
5990             value.encode_cer(writer)
5991
5992     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5993         try:
5994             t, tlen, lv = tag_strip(tlv)
5995         except DecodeError as err:
5996             raise err.__class__(
5997                 msg=err.msg,
5998                 klass=self.__class__,
5999                 decode_path=decode_path,
6000                 offset=offset,
6001             )
6002         try:
6003             l, llen, v = len_decode(lv)
6004         except LenIndefForm as err:
6005             if not ctx.get("bered", False):
6006                 raise err.__class__(
6007                     msg=err.msg,
6008                     klass=self.__class__,
6009                     decode_path=decode_path,
6010                     offset=offset,
6011                 )
6012             llen, vlen, v = 1, 0, lv[1:]
6013             sub_offset = offset + tlen + llen
6014             chunk_i = 0
6015             while v[:EOC_LEN].tobytes() != EOC:
6016                 chunk, v = Any().decode(
6017                     v,
6018                     offset=sub_offset,
6019                     decode_path=decode_path + (str(chunk_i),),
6020                     leavemm=True,
6021                     ctx=ctx,
6022                     _ctx_immutable=False,
6023                 )
6024                 vlen += chunk.tlvlen
6025                 sub_offset += chunk.tlvlen
6026                 chunk_i += 1
6027             tlvlen = tlen + llen + vlen + EOC_LEN
6028             obj = self.__class__(
6029                 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
6030                 expl=self._expl,
6031                 optional=self.optional,
6032                 _decoded=(offset, 0, tlvlen),
6033             )
6034             obj.lenindef = True
6035             obj.tag = t.tobytes()
6036             yield decode_path, obj, v[EOC_LEN:]
6037             return
6038         except DecodeError as err:
6039             raise err.__class__(
6040                 msg=err.msg,
6041                 klass=self.__class__,
6042                 decode_path=decode_path,
6043                 offset=offset,
6044             )
6045         if l > len(v):
6046             raise NotEnoughData(
6047                 "encoded length is longer than data",
6048                 klass=self.__class__,
6049                 decode_path=decode_path,
6050                 offset=offset,
6051             )
6052         tlvlen = tlen + llen + l
6053         v, tail = tlv[:tlvlen], v[l:]
6054         obj = self.__class__(
6055             value=None if evgen_mode else v.tobytes(),
6056             expl=self._expl,
6057             optional=self.optional,
6058             _decoded=(offset, 0, tlvlen),
6059         )
6060         obj.tag = t.tobytes()
6061         yield decode_path, obj, tail
6062
6063     def __repr__(self):
6064         return pp_console_row(next(self.pps()))
6065
6066     def pps(self, decode_path=()):
6067         value = self._value
6068         if value is None:
6069             pass
6070         elif value.__class__ == binary_type:
6071             value = None
6072         else:
6073             value = repr(value)
6074         yield _pp(
6075             obj=self,
6076             asn1_type_name=self.asn1_type_name,
6077             obj_name=self.__class__.__name__,
6078             decode_path=decode_path,
6079             value=value,
6080             blob=self._value if self._value.__class__ == binary_type else None,
6081             optional=self.optional,
6082             default=self == self.default,
6083             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6084             expl=None if self._expl is None else tag_decode(self._expl),
6085             offset=self.offset,
6086             tlen=self.tlen,
6087             llen=self.llen,
6088             vlen=self.vlen,
6089             expl_offset=self.expl_offset if self.expled else None,
6090             expl_tlen=self.expl_tlen if self.expled else None,
6091             expl_llen=self.expl_llen if self.expled else None,
6092             expl_vlen=self.expl_vlen if self.expled else None,
6093             expl_lenindef=self.expl_lenindef,
6094             lenindef=self.lenindef,
6095             bered=self.bered,
6096         )
6097         defined_by, defined = self.defined or (None, None)
6098         if defined_by is not None:
6099             yield defined.pps(
6100                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6101             )
6102         for pp in self.pps_lenindef(decode_path):
6103             yield pp
6104
6105
6106 ########################################################################
6107 # ASN.1 constructed types
6108 ########################################################################
6109
6110 def abs_decode_path(decode_path, rel_path):
6111     """Create an absolute decode path from current and relative ones
6112
6113     :param decode_path: current decode path, starting point. Tuple of strings
6114     :param rel_path: relative path to ``decode_path``. Tuple of strings.
6115                      If first tuple's element is "/", then treat it as
6116                      an absolute path, ignoring ``decode_path`` as
6117                      starting point. Also this tuple can contain ".."
6118                      elements, stripping the leading element from
6119                      ``decode_path``
6120
6121     >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
6122     ("foo", "bar", "baz", "whatever")
6123     >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
6124     ("foo", "whatever")
6125     >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
6126     ("baz", "whatever")
6127     """
6128     if rel_path[0] == "/":
6129         return rel_path[1:]
6130     if rel_path[0] == "..":
6131         return abs_decode_path(decode_path[:-1], rel_path[1:])
6132     return decode_path + rel_path
6133
6134
6135 SequenceState = namedtuple(
6136     "SequenceState",
6137     BasicState._fields + ("specs", "value",),
6138     **NAMEDTUPLE_KWARGS
6139 )
6140
6141
6142 class SequenceEncode1stMixing(object):
6143     def _encode1st(self, state):
6144         state.append(0)
6145         idx = len(state) - 1
6146         vlen = 0
6147         for v in self._values_for_encoding():
6148             l, _ = v.encode1st(state)
6149             vlen += l
6150         state[idx] = vlen
6151         return len(self.tag) + len_size(vlen) + vlen, state
6152
6153
6154 class Sequence(SequenceEncode1stMixing, Obj):
6155     """``SEQUENCE`` structure type
6156
6157     You have to make specification of sequence::
6158
6159         class Extension(Sequence):
6160             schema = (
6161                 ("extnID", ObjectIdentifier()),
6162                 ("critical", Boolean(default=False)),
6163                 ("extnValue", OctetString()),
6164             )
6165
6166     Then, you can work with it as with dictionary.
6167
6168     >>> ext = Extension()
6169     >>> Extension().specs
6170     OrderedDict([
6171         ('extnID', OBJECT IDENTIFIER),
6172         ('critical', BOOLEAN False OPTIONAL DEFAULT),
6173         ('extnValue', OCTET STRING),
6174     ])
6175     >>> ext["extnID"] = "1.2.3"
6176     Traceback (most recent call last):
6177     pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6178     >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6179
6180     You can determine if sequence is ready to be encoded:
6181
6182     >>> ext.ready
6183     False
6184     >>> ext.encode()
6185     Traceback (most recent call last):
6186     pyderasn.ObjNotReady: object is not ready: extnValue
6187     >>> ext["extnValue"] = OctetString(b"foobar")
6188     >>> ext.ready
6189     True
6190
6191     Value you want to assign, must have the same **type** as in
6192     corresponding specification, but it can have different tags,
6193     optional/default attributes -- they will be taken from specification
6194     automatically::
6195
6196         class TBSCertificate(Sequence):
6197             schema = (
6198                 ("version", Version(expl=tag_ctxc(0), default="v1")),
6199             [...]
6200
6201     >>> tbs = TBSCertificate()
6202     >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6203
6204     Assign ``None`` to remove value from sequence.
6205
6206     You can set values in Sequence during its initialization:
6207
6208     >>> AlgorithmIdentifier((
6209         ("algorithm", ObjectIdentifier("1.2.3")),
6210         ("parameters", Any(Null()))
6211     ))
6212     AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6213
6214     You can determine if value exists/set in the sequence and take its value:
6215
6216     >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6217     (True, True, False)
6218     >>> ext["extnID"]
6219     OBJECT IDENTIFIER 1.2.3
6220
6221     But pay attention that if value has default, then it won't be (not
6222     in) in the sequence (because ``DEFAULT`` must not be encoded in
6223     DER), but you can read its value:
6224
6225     >>> "critical" in ext, ext["critical"]
6226     (False, BOOLEAN False)
6227     >>> ext["critical"] = Boolean(True)
6228     >>> "critical" in ext, ext["critical"]
6229     (True, BOOLEAN True)
6230
6231     All defaulted values are always optional.
6232
6233     .. _allow_default_values_ctx:
6234
6235     DER prohibits default value encoding and will raise an error if
6236     default value is unexpectedly met during decode.
6237     If :ref:`bered <bered_ctx>` context option is set, then no error
6238     will be raised, but ``bered`` attribute set. You can disable strict
6239     defaulted values existence validation by setting
6240     ``"allow_default_values": True`` :ref:`context <ctx>` option.
6241
6242     All values with DEFAULT specified are decoded atomically in
6243     :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6244     SEQUENCE, then it will be yielded as a single element, not
6245     disassembled. That is required for DEFAULT existence check.
6246
6247     Two sequences are equal if they have equal specification (schema),
6248     implicit/explicit tagging and the same values.
6249     """
6250     __slots__ = ("specs",)
6251     tag_default = tag_encode(form=TagFormConstructed, num=16)
6252     asn1_type_name = "SEQUENCE"
6253
6254     def __init__(
6255             self,
6256             value=None,
6257             schema=None,
6258             impl=None,
6259             expl=None,
6260             default=None,
6261             optional=False,
6262             _decoded=(0, 0, 0),
6263     ):
6264         super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6265         if schema is None:
6266             schema = getattr(self, "schema", ())
6267         self.specs = (
6268             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6269         )
6270         self._value = {}
6271         if value is not None:
6272             if issubclass(value.__class__, Sequence):
6273                 self._value = value._value
6274             elif hasattr(value, "__iter__"):
6275                 for seq_key, seq_value in value:
6276                     self[seq_key] = seq_value
6277             else:
6278                 raise InvalidValueType((Sequence,))
6279         if default is not None:
6280             if not issubclass(default.__class__, Sequence):
6281                 raise InvalidValueType((Sequence,))
6282             default_value = default._value
6283             default_obj = self.__class__(impl=self.tag, expl=self._expl)
6284             default_obj.specs = self.specs
6285             default_obj._value = default_value
6286             self.default = default_obj
6287             if value is None:
6288                 self._value = copy(default_obj._value)
6289
6290     @property
6291     def ready(self):
6292         for name, spec in iteritems(self.specs):
6293             value = self._value.get(name)
6294             if value is None:
6295                 if spec.optional:
6296                     continue
6297                 return False
6298             if not value.ready:
6299                 return False
6300         return True
6301
6302     @property
6303     def bered(self):
6304         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6305             return True
6306         return any(value.bered for value in itervalues(self._value))
6307
6308     def __getstate__(self):
6309         return SequenceState(
6310             __version__,
6311             self.tag,
6312             self._tag_order,
6313             self._expl,
6314             self.default,
6315             self.optional,
6316             self.offset,
6317             self.llen,
6318             self.vlen,
6319             self.expl_lenindef,
6320             self.lenindef,
6321             self.ber_encoded,
6322             self.specs,
6323             {k: copy(v) for k, v in iteritems(self._value)},
6324         )
6325
6326     def __setstate__(self, state):
6327         super(Sequence, self).__setstate__(state)
6328         self.specs = state.specs
6329         self._value = state.value
6330
6331     def __eq__(self, their):
6332         if not isinstance(their, self.__class__):
6333             return False
6334         return (
6335             self.specs == their.specs and
6336             self.tag == their.tag and
6337             self._expl == their._expl and
6338             self._value == their._value
6339         )
6340
6341     def __call__(
6342             self,
6343             value=None,
6344             impl=None,
6345             expl=None,
6346             default=None,
6347             optional=None,
6348     ):
6349         return self.__class__(
6350             value=value,
6351             schema=self.specs,
6352             impl=self.tag if impl is None else impl,
6353             expl=self._expl if expl is None else expl,
6354             default=self.default if default is None else default,
6355             optional=self.optional if optional is None else optional,
6356         )
6357
6358     def __contains__(self, key):
6359         return key in self._value
6360
6361     def __setitem__(self, key, value):
6362         spec = self.specs.get(key)
6363         if spec is None:
6364             raise ObjUnknown(key)
6365         if value is None:
6366             self._value.pop(key, None)
6367             return
6368         if not isinstance(value, spec.__class__):
6369             raise InvalidValueType((spec.__class__,))
6370         value = spec(value=value)
6371         if spec.default is not None and value == spec.default:
6372             self._value.pop(key, None)
6373             return
6374         self._value[key] = value
6375
6376     def __getitem__(self, key):
6377         value = self._value.get(key)
6378         if value is not None:
6379             return value
6380         spec = self.specs.get(key)
6381         if spec is None:
6382             raise ObjUnknown(key)
6383         if spec.default is not None:
6384             return spec.default
6385         return None
6386
6387     def _values_for_encoding(self):
6388         for name, spec in iteritems(self.specs):
6389             value = self._value.get(name)
6390             if value is None:
6391                 if spec.optional:
6392                     continue
6393                 raise ObjNotReady(name)
6394             yield value
6395
6396     def _encode(self):
6397         v = b"".join(v.encode() for v in self._values_for_encoding())
6398         return b"".join((self.tag, len_encode(len(v)), v))
6399
6400     def _encode2nd(self, writer, state_iter):
6401         write_full(writer, self.tag + len_encode(next(state_iter)))
6402         for v in self._values_for_encoding():
6403             v.encode2nd(writer, state_iter)
6404
6405     def _encode_cer(self, writer):
6406         write_full(writer, self.tag + LENINDEF)
6407         for v in self._values_for_encoding():
6408             v.encode_cer(writer)
6409         write_full(writer, EOC)
6410
6411     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6412         try:
6413             t, tlen, lv = tag_strip(tlv)
6414         except DecodeError as err:
6415             raise err.__class__(
6416                 msg=err.msg,
6417                 klass=self.__class__,
6418                 decode_path=decode_path,
6419                 offset=offset,
6420             )
6421         if t != self.tag:
6422             raise TagMismatch(
6423                 klass=self.__class__,
6424                 decode_path=decode_path,
6425                 offset=offset,
6426             )
6427         if tag_only:  # pragma: no cover
6428             yield None
6429             return
6430         lenindef = False
6431         ctx_bered = ctx.get("bered", False)
6432         try:
6433             l, llen, v = len_decode(lv)
6434         except LenIndefForm as err:
6435             if not ctx_bered:
6436                 raise err.__class__(
6437                     msg=err.msg,
6438                     klass=self.__class__,
6439                     decode_path=decode_path,
6440                     offset=offset,
6441                 )
6442             l, llen, v = 0, 1, lv[1:]
6443             lenindef = True
6444         except DecodeError as err:
6445             raise err.__class__(
6446                 msg=err.msg,
6447                 klass=self.__class__,
6448                 decode_path=decode_path,
6449                 offset=offset,
6450             )
6451         if l > len(v):
6452             raise NotEnoughData(
6453                 "encoded length is longer than data",
6454                 klass=self.__class__,
6455                 decode_path=decode_path,
6456                 offset=offset,
6457             )
6458         if not lenindef:
6459             v, tail = v[:l], v[l:]
6460         vlen = 0
6461         sub_offset = offset + tlen + llen
6462         values = {}
6463         ber_encoded = False
6464         ctx_allow_default_values = ctx.get("allow_default_values", False)
6465         for name, spec in iteritems(self.specs):
6466             if spec.optional and (
6467                     (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6468                     len(v) == 0
6469             ):
6470                 continue
6471             spec_defaulted = spec.default is not None
6472             sub_decode_path = decode_path + (name,)
6473             try:
6474                 if evgen_mode and not spec_defaulted:
6475                     for _decode_path, value, v_tail in spec.decode_evgen(
6476                             v,
6477                             sub_offset,
6478                             leavemm=True,
6479                             decode_path=sub_decode_path,
6480                             ctx=ctx,
6481                             _ctx_immutable=False,
6482                     ):
6483                         yield _decode_path, value, v_tail
6484                 else:
6485                     _, value, v_tail = next(spec.decode_evgen(
6486                         v,
6487                         sub_offset,
6488                         leavemm=True,
6489                         decode_path=sub_decode_path,
6490                         ctx=ctx,
6491                         _ctx_immutable=False,
6492                         _evgen_mode=False,
6493                     ))
6494             except TagMismatch as err:
6495                 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6496                     continue
6497                 raise
6498
6499             defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6500             if not evgen_mode and defined is not None:
6501                 defined_by, defined_spec = defined
6502                 if issubclass(value.__class__, SequenceOf):
6503                     for i, _value in enumerate(value):
6504                         sub_sub_decode_path = sub_decode_path + (
6505                             str(i),
6506                             DecodePathDefBy(defined_by),
6507                         )
6508                         defined_value, defined_tail = defined_spec.decode(
6509                             memoryview(bytes(_value)),
6510                             sub_offset + (
6511                                 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6512                                 if value.expled else (value.tlen + value.llen)
6513                             ),
6514                             leavemm=True,
6515                             decode_path=sub_sub_decode_path,
6516                             ctx=ctx,
6517                             _ctx_immutable=False,
6518                         )
6519                         if len(defined_tail) > 0:
6520                             raise DecodeError(
6521                                 "remaining data",
6522                                 klass=self.__class__,
6523                                 decode_path=sub_sub_decode_path,
6524                                 offset=offset,
6525                             )
6526                         _value.defined = (defined_by, defined_value)
6527                 else:
6528                     defined_value, defined_tail = defined_spec.decode(
6529                         memoryview(bytes(value)),
6530                         sub_offset + (
6531                             (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6532                             if value.expled else (value.tlen + value.llen)
6533                         ),
6534                         leavemm=True,
6535                         decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6536                         ctx=ctx,
6537                         _ctx_immutable=False,
6538                     )
6539                     if len(defined_tail) > 0:
6540                         raise DecodeError(
6541                             "remaining data",
6542                             klass=self.__class__,
6543                             decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6544                             offset=offset,
6545                         )
6546                     value.defined = (defined_by, defined_value)
6547
6548             value_len = value.fulllen
6549             vlen += value_len
6550             sub_offset += value_len
6551             v = v_tail
6552             if spec_defaulted:
6553                 if evgen_mode:
6554                     yield sub_decode_path, value, v_tail
6555                 if value == spec.default:
6556                     if ctx_bered or ctx_allow_default_values:
6557                         ber_encoded = True
6558                     else:
6559                         raise DecodeError(
6560                             "DEFAULT value met",
6561                             klass=self.__class__,
6562                             decode_path=sub_decode_path,
6563                             offset=sub_offset,
6564                         )
6565             if not evgen_mode:
6566                 values[name] = value
6567                 spec_defines = getattr(spec, "defines", ())
6568                 if len(spec_defines) == 0:
6569                     defines_by_path = ctx.get("defines_by_path", ())
6570                     if len(defines_by_path) > 0:
6571                         spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6572                 if spec_defines is not None and len(spec_defines) > 0:
6573                     for rel_path, schema in spec_defines:
6574                         defined = schema.get(value, None)
6575                         if defined is not None:
6576                             ctx.setdefault("_defines", []).append((
6577                                 abs_decode_path(sub_decode_path[:-1], rel_path),
6578                                 (value, defined),
6579                             ))
6580         if lenindef:
6581             if v[:EOC_LEN].tobytes() != EOC:
6582                 raise DecodeError(
6583                     "no EOC",
6584                     klass=self.__class__,
6585                     decode_path=decode_path,
6586                     offset=offset,
6587                 )
6588             tail = v[EOC_LEN:]
6589             vlen += EOC_LEN
6590         elif len(v) > 0:
6591             raise DecodeError(
6592                 "remaining data",
6593                 klass=self.__class__,
6594                 decode_path=decode_path,
6595                 offset=offset,
6596             )
6597         obj = self.__class__(
6598             schema=self.specs,
6599             impl=self.tag,
6600             expl=self._expl,
6601             default=self.default,
6602             optional=self.optional,
6603             _decoded=(offset, llen, vlen),
6604         )
6605         obj._value = values
6606         obj.lenindef = lenindef
6607         obj.ber_encoded = ber_encoded
6608         yield decode_path, obj, tail
6609
6610     def __repr__(self):
6611         value = pp_console_row(next(self.pps()))
6612         cols = []
6613         for name in self.specs:
6614             _value = self._value.get(name)
6615             if _value is None:
6616                 continue
6617             cols.append("%s: %s" % (name, repr(_value)))
6618         return "%s[%s]" % (value, "; ".join(cols))
6619
6620     def pps(self, decode_path=()):
6621         yield _pp(
6622             obj=self,
6623             asn1_type_name=self.asn1_type_name,
6624             obj_name=self.__class__.__name__,
6625             decode_path=decode_path,
6626             optional=self.optional,
6627             default=self == self.default,
6628             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6629             expl=None if self._expl is None else tag_decode(self._expl),
6630             offset=self.offset,
6631             tlen=self.tlen,
6632             llen=self.llen,
6633             vlen=self.vlen,
6634             expl_offset=self.expl_offset if self.expled else None,
6635             expl_tlen=self.expl_tlen if self.expled else None,
6636             expl_llen=self.expl_llen if self.expled else None,
6637             expl_vlen=self.expl_vlen if self.expled else None,
6638             expl_lenindef=self.expl_lenindef,
6639             lenindef=self.lenindef,
6640             ber_encoded=self.ber_encoded,
6641             bered=self.bered,
6642         )
6643         for name in self.specs:
6644             value = self._value.get(name)
6645             if value is None:
6646                 continue
6647             yield value.pps(decode_path=decode_path + (name,))
6648         for pp in self.pps_lenindef(decode_path):
6649             yield pp
6650
6651
6652 class Set(Sequence, SequenceEncode1stMixing):
6653     """``SET`` structure type
6654
6655     Its usage is identical to :py:class:`pyderasn.Sequence`.
6656
6657     .. _allow_unordered_set_ctx:
6658
6659     DER prohibits unordered values encoding and will raise an error
6660     during decode. If :ref:`bered <bered_ctx>` context option is set,
6661     then no error will occur. Also you can disable strict values
6662     ordering check by setting ``"allow_unordered_set": True``
6663     :ref:`context <ctx>` option.
6664     """
6665     __slots__ = ()
6666     tag_default = tag_encode(form=TagFormConstructed, num=17)
6667     asn1_type_name = "SET"
6668
6669     def _values_for_encoding(self):
6670         return sorted(
6671             super(Set, self)._values_for_encoding(),
6672             key=attrgetter("tag_order"),
6673         )
6674
6675     def _encode_cer(self, writer):
6676         write_full(writer, self.tag + LENINDEF)
6677         for v in sorted(
6678                 super(Set, self)._values_for_encoding(),
6679                 key=attrgetter("tag_order_cer"),
6680         ):
6681             v.encode_cer(writer)
6682         write_full(writer, EOC)
6683
6684     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6685         try:
6686             t, tlen, lv = tag_strip(tlv)
6687         except DecodeError as err:
6688             raise err.__class__(
6689                 msg=err.msg,
6690                 klass=self.__class__,
6691                 decode_path=decode_path,
6692                 offset=offset,
6693             )
6694         if t != self.tag:
6695             raise TagMismatch(
6696                 klass=self.__class__,
6697                 decode_path=decode_path,
6698                 offset=offset,
6699             )
6700         if tag_only:
6701             yield None
6702             return
6703         lenindef = False
6704         ctx_bered = ctx.get("bered", False)
6705         try:
6706             l, llen, v = len_decode(lv)
6707         except LenIndefForm as err:
6708             if not ctx_bered:
6709                 raise err.__class__(
6710                     msg=err.msg,
6711                     klass=self.__class__,
6712                     decode_path=decode_path,
6713                     offset=offset,
6714                 )
6715             l, llen, v = 0, 1, lv[1:]
6716             lenindef = True
6717         except DecodeError as err:
6718             raise err.__class__(
6719                 msg=err.msg,
6720                 klass=self.__class__,
6721                 decode_path=decode_path,
6722                 offset=offset,
6723             )
6724         if l > len(v):
6725             raise NotEnoughData(
6726                 "encoded length is longer than data",
6727                 klass=self.__class__,
6728                 offset=offset,
6729             )
6730         if not lenindef:
6731             v, tail = v[:l], v[l:]
6732         vlen = 0
6733         sub_offset = offset + tlen + llen
6734         values = {}
6735         ber_encoded = False
6736         ctx_allow_default_values = ctx.get("allow_default_values", False)
6737         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6738         tag_order_prev = (0, 0)
6739         _specs_items = copy(self.specs)
6740
6741         while len(v) > 0:
6742             if lenindef and v[:EOC_LEN].tobytes() == EOC:
6743                 break
6744             for name, spec in iteritems(_specs_items):
6745                 sub_decode_path = decode_path + (name,)
6746                 try:
6747                     spec.decode(
6748                         v,
6749                         sub_offset,
6750                         leavemm=True,
6751                         decode_path=sub_decode_path,
6752                         ctx=ctx,
6753                         tag_only=True,
6754                         _ctx_immutable=False,
6755                     )
6756                 except TagMismatch:
6757                     continue
6758                 break
6759             else:
6760                 raise TagMismatch(
6761                     klass=self.__class__,
6762                     decode_path=decode_path,
6763                     offset=offset,
6764                 )
6765             spec_defaulted = spec.default is not None
6766             if evgen_mode and not spec_defaulted:
6767                 for _decode_path, value, v_tail in spec.decode_evgen(
6768                         v,
6769                         sub_offset,
6770                         leavemm=True,
6771                         decode_path=sub_decode_path,
6772                         ctx=ctx,
6773                         _ctx_immutable=False,
6774                 ):
6775                     yield _decode_path, value, v_tail
6776             else:
6777                 _, value, v_tail = next(spec.decode_evgen(
6778                     v,
6779                     sub_offset,
6780                     leavemm=True,
6781                     decode_path=sub_decode_path,
6782                     ctx=ctx,
6783                     _ctx_immutable=False,
6784                     _evgen_mode=False,
6785                 ))
6786             value_tag_order = value.tag_order
6787             value_len = value.fulllen
6788             if tag_order_prev >= value_tag_order:
6789                 if ctx_bered or ctx_allow_unordered_set:
6790                     ber_encoded = True
6791                 else:
6792                     raise DecodeError(
6793                         "unordered " + self.asn1_type_name,
6794                         klass=self.__class__,
6795                         decode_path=sub_decode_path,
6796                         offset=sub_offset,
6797                     )
6798             if spec_defaulted:
6799                 if evgen_mode:
6800                     yield sub_decode_path, value, v_tail
6801                 if value != spec.default:
6802                     pass
6803                 elif ctx_bered or ctx_allow_default_values:
6804                     ber_encoded = True
6805                 else:
6806                     raise DecodeError(
6807                         "DEFAULT value met",
6808                         klass=self.__class__,
6809                         decode_path=sub_decode_path,
6810                         offset=sub_offset,
6811                     )
6812             values[name] = value
6813             del _specs_items[name]
6814             tag_order_prev = value_tag_order
6815             sub_offset += value_len
6816             vlen += value_len
6817             v = v_tail
6818
6819         obj = self.__class__(
6820             schema=self.specs,
6821             impl=self.tag,
6822             expl=self._expl,
6823             default=self.default,
6824             optional=self.optional,
6825             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6826         )
6827         if lenindef:
6828             if v[:EOC_LEN].tobytes() != EOC:
6829                 raise DecodeError(
6830                     "no EOC",
6831                     klass=self.__class__,
6832                     decode_path=decode_path,
6833                     offset=offset,
6834                 )
6835             tail = v[EOC_LEN:]
6836             obj.lenindef = True
6837         for name, spec in iteritems(self.specs):
6838             if name not in values and not spec.optional:
6839                 raise DecodeError(
6840                     "%s value is not ready" % name,
6841                     klass=self.__class__,
6842                     decode_path=decode_path,
6843                     offset=offset,
6844                 )
6845         if not evgen_mode:
6846             obj._value = values
6847         obj.ber_encoded = ber_encoded
6848         yield decode_path, obj, tail
6849
6850
6851 SequenceOfState = namedtuple(
6852     "SequenceOfState",
6853     BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6854     **NAMEDTUPLE_KWARGS
6855 )
6856
6857
6858 class SequenceOf(SequenceEncode1stMixing, Obj):
6859     """``SEQUENCE OF`` sequence type
6860
6861     For that kind of type you must specify the object it will carry on
6862     (bounds are for example here, not required)::
6863
6864         class Ints(SequenceOf):
6865             schema = Integer()
6866             bounds = (0, 2)
6867
6868     >>> ints = Ints()
6869     >>> ints.append(Integer(123))
6870     >>> ints.append(Integer(234))
6871     >>> ints
6872     Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6873     >>> [int(i) for i in ints]
6874     [123, 234]
6875     >>> ints.append(Integer(345))
6876     Traceback (most recent call last):
6877     pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6878     >>> ints[1]
6879     INTEGER 234
6880     >>> ints[1] = Integer(345)
6881     >>> ints
6882     Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6883
6884     You can initialize sequence with preinitialized values:
6885
6886     >>> ints = Ints([Integer(123), Integer(234)])
6887
6888     Also you can use iterator as a value:
6889
6890     >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6891
6892     And it won't be iterated until encoding process. Pay attention that
6893     bounds and required schema checks are done only during the encoding
6894     process in that case! After encode was called, then value is zeroed
6895     back to empty list and you have to set it again. That mode is useful
6896     mainly with CER encoding mode, where all objects from the iterable
6897     will be streamed to the buffer, without copying all of them to
6898     memory first.
6899     """
6900     __slots__ = ("spec", "_bound_min", "_bound_max")
6901     tag_default = tag_encode(form=TagFormConstructed, num=16)
6902     asn1_type_name = "SEQUENCE OF"
6903
6904     def __init__(
6905             self,
6906             value=None,
6907             schema=None,
6908             bounds=None,
6909             impl=None,
6910             expl=None,
6911             default=None,
6912             optional=False,
6913             _decoded=(0, 0, 0),
6914     ):
6915         super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6916         if schema is None:
6917             schema = getattr(self, "schema", None)
6918         if schema is None:
6919             raise ValueError("schema must be specified")
6920         self.spec = schema
6921         self._bound_min, self._bound_max = getattr(
6922             self,
6923             "bounds",
6924             (0, float("+inf")),
6925         ) if bounds is None else bounds
6926         self._value = []
6927         if value is not None:
6928             self._value = self._value_sanitize(value)
6929         if default is not None:
6930             default_value = self._value_sanitize(default)
6931             default_obj = self.__class__(
6932                 schema=schema,
6933                 impl=self.tag,
6934                 expl=self._expl,
6935             )
6936             default_obj._value = default_value
6937             self.default = default_obj
6938             if value is None:
6939                 self._value = copy(default_obj._value)
6940
6941     def _value_sanitize(self, value):
6942         iterator = False
6943         if issubclass(value.__class__, SequenceOf):
6944             value = value._value
6945         elif hasattr(value, NEXT_ATTR_NAME):
6946             iterator = True
6947         elif hasattr(value, "__iter__"):
6948             value = list(value)
6949         else:
6950             raise InvalidValueType((self.__class__, iter, "iterator"))
6951         if not iterator:
6952             if not self._bound_min <= len(value) <= self._bound_max:
6953                 raise BoundsError(self._bound_min, len(value), self._bound_max)
6954             class_expected = self.spec.__class__
6955             for v in value:
6956                 if not isinstance(v, class_expected):
6957                     raise InvalidValueType((class_expected,))
6958         return value
6959
6960     @property
6961     def ready(self):
6962         if hasattr(self._value, NEXT_ATTR_NAME):
6963             return True
6964         if self._bound_min > 0 and len(self._value) == 0:
6965             return False
6966         return all(v.ready for v in self._value)
6967
6968     @property
6969     def bered(self):
6970         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6971             return True
6972         return any(v.bered for v in self._value)
6973
6974     def __getstate__(self):
6975         if hasattr(self._value, NEXT_ATTR_NAME):
6976             raise ValueError("can not pickle SequenceOf with iterator")
6977         return SequenceOfState(
6978             __version__,
6979             self.tag,
6980             self._tag_order,
6981             self._expl,
6982             self.default,
6983             self.optional,
6984             self.offset,
6985             self.llen,
6986             self.vlen,
6987             self.expl_lenindef,
6988             self.lenindef,
6989             self.ber_encoded,
6990             self.spec,
6991             [copy(v) for v in self._value],
6992             self._bound_min,
6993             self._bound_max,
6994         )
6995
6996     def __setstate__(self, state):
6997         super(SequenceOf, self).__setstate__(state)
6998         self.spec = state.spec
6999         self._value = state.value
7000         self._bound_min = state.bound_min
7001         self._bound_max = state.bound_max
7002
7003     def __eq__(self, their):
7004         if isinstance(their, self.__class__):
7005             return (
7006                 self.spec == their.spec and
7007                 self.tag == their.tag and
7008                 self._expl == their._expl and
7009                 self._value == their._value
7010             )
7011         if hasattr(their, "__iter__"):
7012             return self._value == list(their)
7013         return False
7014
7015     def __call__(
7016             self,
7017             value=None,
7018             bounds=None,
7019             impl=None,
7020             expl=None,
7021             default=None,
7022             optional=None,
7023     ):
7024         return self.__class__(
7025             value=value,
7026             schema=self.spec,
7027             bounds=(
7028                 (self._bound_min, self._bound_max)
7029                 if bounds is None else bounds
7030             ),
7031             impl=self.tag if impl is None else impl,
7032             expl=self._expl if expl is None else expl,
7033             default=self.default if default is None else default,
7034             optional=self.optional if optional is None else optional,
7035         )
7036
7037     def __contains__(self, key):
7038         return key in self._value
7039
7040     def append(self, value):
7041         if not isinstance(value, self.spec.__class__):
7042             raise InvalidValueType((self.spec.__class__,))
7043         if len(self._value) + 1 > self._bound_max:
7044             raise BoundsError(
7045                 self._bound_min,
7046                 len(self._value) + 1,
7047                 self._bound_max,
7048             )
7049         self._value.append(value)
7050
7051     def __iter__(self):
7052         return iter(self._value)
7053
7054     def __len__(self):
7055         return len(self._value)
7056
7057     def __setitem__(self, key, value):
7058         if not isinstance(value, self.spec.__class__):
7059             raise InvalidValueType((self.spec.__class__,))
7060         self._value[key] = self.spec(value=value)
7061
7062     def __getitem__(self, key):
7063         return self._value[key]
7064
7065     def _values_for_encoding(self):
7066         return iter(self._value)
7067
7068     def _encode(self):
7069         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7070         if iterator:
7071             values = []
7072             values_append = values.append
7073             class_expected = self.spec.__class__
7074             values_for_encoding = self._values_for_encoding()
7075             self._value = []
7076             for v in values_for_encoding:
7077                 if not isinstance(v, class_expected):
7078                     raise InvalidValueType((class_expected,))
7079                 values_append(v.encode())
7080             if not self._bound_min <= len(values) <= self._bound_max:
7081                 raise BoundsError(self._bound_min, len(values), self._bound_max)
7082             value = b"".join(values)
7083         else:
7084             value = b"".join(v.encode() for v in self._values_for_encoding())
7085         return b"".join((self.tag, len_encode(len(value)), value))
7086
7087     def _encode1st(self, state):
7088         state = super(SequenceOf, self)._encode1st(state)
7089         if hasattr(self._value, NEXT_ATTR_NAME):
7090             self._value = []
7091         return state
7092
7093     def _encode2nd(self, writer, state_iter):
7094         write_full(writer, self.tag + len_encode(next(state_iter)))
7095         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7096         if iterator:
7097             values_count = 0
7098             class_expected = self.spec.__class__
7099             values_for_encoding = self._values_for_encoding()
7100             self._value = []
7101             for v in values_for_encoding:
7102                 if not isinstance(v, class_expected):
7103                     raise InvalidValueType((class_expected,))
7104                 v.encode2nd(writer, state_iter)
7105                 values_count += 1
7106             if not self._bound_min <= values_count <= self._bound_max:
7107                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7108         else:
7109             for v in self._values_for_encoding():
7110                 v.encode2nd(writer, state_iter)
7111
7112     def _encode_cer(self, writer):
7113         write_full(writer, self.tag + LENINDEF)
7114         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7115         if iterator:
7116             class_expected = self.spec.__class__
7117             values_count = 0
7118             values_for_encoding = self._values_for_encoding()
7119             self._value = []
7120             for v in values_for_encoding:
7121                 if not isinstance(v, class_expected):
7122                     raise InvalidValueType((class_expected,))
7123                 v.encode_cer(writer)
7124                 values_count += 1
7125             if not self._bound_min <= values_count <= self._bound_max:
7126                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7127         else:
7128             for v in self._values_for_encoding():
7129                 v.encode_cer(writer)
7130         write_full(writer, EOC)
7131
7132     def _decode(
7133             self,
7134             tlv,
7135             offset,
7136             decode_path,
7137             ctx,
7138             tag_only,
7139             evgen_mode,
7140             ordering_check=False,
7141     ):
7142         try:
7143             t, tlen, lv = tag_strip(tlv)
7144         except DecodeError as err:
7145             raise err.__class__(
7146                 msg=err.msg,
7147                 klass=self.__class__,
7148                 decode_path=decode_path,
7149                 offset=offset,
7150             )
7151         if t != self.tag:
7152             raise TagMismatch(
7153                 klass=self.__class__,
7154                 decode_path=decode_path,
7155                 offset=offset,
7156             )
7157         if tag_only:
7158             yield None
7159             return
7160         lenindef = False
7161         ctx_bered = ctx.get("bered", False)
7162         try:
7163             l, llen, v = len_decode(lv)
7164         except LenIndefForm as err:
7165             if not ctx_bered:
7166                 raise err.__class__(
7167                     msg=err.msg,
7168                     klass=self.__class__,
7169                     decode_path=decode_path,
7170                     offset=offset,
7171                 )
7172             l, llen, v = 0, 1, lv[1:]
7173             lenindef = True
7174         except DecodeError as err:
7175             raise err.__class__(
7176                 msg=err.msg,
7177                 klass=self.__class__,
7178                 decode_path=decode_path,
7179                 offset=offset,
7180             )
7181         if l > len(v):
7182             raise NotEnoughData(
7183                 "encoded length is longer than data",
7184                 klass=self.__class__,
7185                 decode_path=decode_path,
7186                 offset=offset,
7187             )
7188         if not lenindef:
7189             v, tail = v[:l], v[l:]
7190         vlen = 0
7191         sub_offset = offset + tlen + llen
7192         _value = []
7193         _value_count = 0
7194         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7195         value_prev = memoryview(v[:0])
7196         ber_encoded = False
7197         spec = self.spec
7198         while len(v) > 0:
7199             if lenindef and v[:EOC_LEN].tobytes() == EOC:
7200                 break
7201             sub_decode_path = decode_path + (str(_value_count),)
7202             if evgen_mode:
7203                 for _decode_path, value, v_tail in spec.decode_evgen(
7204                         v,
7205                         sub_offset,
7206                         leavemm=True,
7207                         decode_path=sub_decode_path,
7208                         ctx=ctx,
7209                         _ctx_immutable=False,
7210                 ):
7211                     yield _decode_path, value, v_tail
7212             else:
7213                 _, value, v_tail = next(spec.decode_evgen(
7214                     v,
7215                     sub_offset,
7216                     leavemm=True,
7217                     decode_path=sub_decode_path,
7218                     ctx=ctx,
7219                     _ctx_immutable=False,
7220                     _evgen_mode=False,
7221                 ))
7222             value_len = value.fulllen
7223             if ordering_check:
7224                 if value_prev.tobytes() > v[:value_len].tobytes():
7225                     if ctx_bered or ctx_allow_unordered_set:
7226                         ber_encoded = True
7227                     else:
7228                         raise DecodeError(
7229                             "unordered " + self.asn1_type_name,
7230                             klass=self.__class__,
7231                             decode_path=sub_decode_path,
7232                             offset=sub_offset,
7233                         )
7234                 value_prev = v[:value_len]
7235             _value_count += 1
7236             if not evgen_mode:
7237                 _value.append(value)
7238             sub_offset += value_len
7239             vlen += value_len
7240             v = v_tail
7241         if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7242             raise DecodeError(
7243                 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7244                 klass=self.__class__,
7245                 decode_path=decode_path,
7246                 offset=offset,
7247             )
7248         try:
7249             obj = self.__class__(
7250                 value=None if evgen_mode else _value,
7251                 schema=spec,
7252                 bounds=(self._bound_min, self._bound_max),
7253                 impl=self.tag,
7254                 expl=self._expl,
7255                 default=self.default,
7256                 optional=self.optional,
7257                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7258             )
7259         except BoundsError as err:
7260             raise DecodeError(
7261                 msg=str(err),
7262                 klass=self.__class__,
7263                 decode_path=decode_path,
7264                 offset=offset,
7265             )
7266         if lenindef:
7267             if v[:EOC_LEN].tobytes() != EOC:
7268                 raise DecodeError(
7269                     "no EOC",
7270                     klass=self.__class__,
7271                     decode_path=decode_path,
7272                     offset=offset,
7273                 )
7274             obj.lenindef = True
7275             tail = v[EOC_LEN:]
7276         obj.ber_encoded = ber_encoded
7277         yield decode_path, obj, tail
7278
7279     def __repr__(self):
7280         return "%s[%s]" % (
7281             pp_console_row(next(self.pps())),
7282             ", ".join(repr(v) for v in self._value),
7283         )
7284
7285     def pps(self, decode_path=()):
7286         yield _pp(
7287             obj=self,
7288             asn1_type_name=self.asn1_type_name,
7289             obj_name=self.__class__.__name__,
7290             decode_path=decode_path,
7291             optional=self.optional,
7292             default=self == self.default,
7293             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7294             expl=None if self._expl is None else tag_decode(self._expl),
7295             offset=self.offset,
7296             tlen=self.tlen,
7297             llen=self.llen,
7298             vlen=self.vlen,
7299             expl_offset=self.expl_offset if self.expled else None,
7300             expl_tlen=self.expl_tlen if self.expled else None,
7301             expl_llen=self.expl_llen if self.expled else None,
7302             expl_vlen=self.expl_vlen if self.expled else None,
7303             expl_lenindef=self.expl_lenindef,
7304             lenindef=self.lenindef,
7305             ber_encoded=self.ber_encoded,
7306             bered=self.bered,
7307         )
7308         for i, value in enumerate(self._value):
7309             yield value.pps(decode_path=decode_path + (str(i),))
7310         for pp in self.pps_lenindef(decode_path):
7311             yield pp
7312
7313
7314 class SetOf(SequenceOf):
7315     """``SET OF`` sequence type
7316
7317     Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7318     """
7319     __slots__ = ()
7320     tag_default = tag_encode(form=TagFormConstructed, num=17)
7321     asn1_type_name = "SET OF"
7322
7323     def _value_sanitize(self, value):
7324         value = super(SetOf, self)._value_sanitize(value)
7325         if hasattr(value, NEXT_ATTR_NAME):
7326             raise ValueError(
7327                 "SetOf does not support iterator values, as no sense in them"
7328             )
7329         return value
7330
7331     def _encode(self):
7332         v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7333         return b"".join((self.tag, len_encode(len(v)), v))
7334
7335     def _encode2nd(self, writer, state_iter):
7336         write_full(writer, self.tag + len_encode(next(state_iter)))
7337         values = []
7338         for v in self._values_for_encoding():
7339             buf = BytesIO()
7340             v.encode2nd(buf.write, state_iter)
7341             values.append(buf.getvalue())
7342         values.sort()
7343         for v in values:
7344             write_full(writer, v)
7345
7346     def _encode_cer(self, writer):
7347         write_full(writer, self.tag + LENINDEF)
7348         for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7349             write_full(writer, v)
7350         write_full(writer, EOC)
7351
7352     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7353         return super(SetOf, self)._decode(
7354             tlv,
7355             offset,
7356             decode_path,
7357             ctx,
7358             tag_only,
7359             evgen_mode,
7360             ordering_check=True,
7361         )
7362
7363
7364 def obj_by_path(pypath):  # pragma: no cover
7365     """Import object specified as string Python path
7366
7367     Modules must be separated from classes/functions with ``:``.
7368
7369     >>> obj_by_path("foo.bar:Baz")
7370     <class 'foo.bar.Baz'>
7371     >>> obj_by_path("foo.bar:Baz.boo")
7372     <classmethod 'foo.bar.Baz.boo'>
7373     """
7374     mod, objs = pypath.rsplit(":", 1)
7375     from importlib import import_module
7376     obj = import_module(mod)
7377     for obj_name in objs.split("."):
7378         obj = getattr(obj, obj_name)
7379     return obj
7380
7381
7382 def generic_decoder():  # pragma: no cover
7383     # All of this below is a big hack with self references
7384     choice = PrimitiveTypes()
7385     choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7386     choice.specs["SetOf"] = SetOf(schema=choice)
7387     for i in six_xrange(31):
7388         choice.specs["SequenceOf%d" % i] = SequenceOf(
7389             schema=choice,
7390             expl=tag_ctxc(i),
7391         )
7392     choice.specs["Any"] = Any()
7393
7394     # Class name equals to type name, to omit it from output
7395     class SEQUENCEOF(SequenceOf):
7396         __slots__ = ()
7397         schema = choice
7398
7399     def pprint_any(
7400             obj,
7401             oid_maps=(),
7402             with_colours=False,
7403             with_decode_path=False,
7404             decode_path_only=(),
7405             decode_path=(),
7406     ):
7407         def _pprint_pps(pps):
7408             for pp in pps:
7409                 if hasattr(pp, "_fields"):
7410                     if (
7411                             decode_path_only != () and
7412                             pp.decode_path[:len(decode_path_only)] != decode_path_only
7413                     ):
7414                         continue
7415                     if pp.asn1_type_name == Choice.asn1_type_name:
7416                         continue
7417                     pp_kwargs = pp._asdict()
7418                     pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7419                     pp = _pp(**pp_kwargs)
7420                     yield pp_console_row(
7421                         pp,
7422                         oid_maps=oid_maps,
7423                         with_offsets=True,
7424                         with_blob=False,
7425                         with_colours=with_colours,
7426                         with_decode_path=with_decode_path,
7427                         decode_path_len_decrease=len(decode_path_only),
7428                     )
7429                     for row in pp_console_blob(
7430                             pp,
7431                             decode_path_len_decrease=len(decode_path_only),
7432                     ):
7433                         yield row
7434                 else:
7435                     for row in _pprint_pps(pp):
7436                         yield row
7437         return "\n".join(_pprint_pps(obj.pps(decode_path)))
7438     return SEQUENCEOF(), pprint_any
7439
7440
7441 def ascii_visualize(ba):
7442     """Output only ASCII printable characters, like in hexdump -C
7443
7444     Example output for given binary string (right part)::
7445
7446         92 2b 39 20 65 91 e6 8e  95 93 1a 58 df 02 78 ea  |.+9 e......X..x.|
7447                                                            ^^^^^^^^^^^^^^^^
7448     """
7449     return "".join((six_unichr(b) if 0x20 <= b <= 0x7E else ".") for b in ba)
7450
7451
7452 def hexdump(raw):
7453     """Generate ``hexdump -C`` like output
7454
7455     Rendered example::
7456
7457         00000000  30 80 30 80 a0 80 02 01  02 00 00 02 14 54 a5 18  |0.0..........T..|
7458         00000010  69 ef 8b 3f 15 fd ea ad  bd 47 e0 94 81 6b 06 6a  |i..?.....G...k.j|
7459
7460     Result of that function is a generator of lines, where each line is
7461     a list of columns::
7462
7463         [
7464             [...],
7465             ["00000010 ", " 69", " ef", " 8b", " 3f", " 15", " fd", " ea", " ad ",
7466                           " bd", " 47", " e0", " 94", " 81", " 6b", " 06", " 6a ",
7467                           " |i..?.....G...k.j|"]
7468             [...],
7469         ]
7470     """
7471     hexed = hexenc(raw).upper()
7472     addr, cols = 0, ["%08x " % 0]
7473     for i in six_xrange(0, len(hexed), 2):
7474         if i != 0 and i // 2 % 8 == 0:
7475             cols[-1] += " "
7476         if i != 0 and i // 2 % 16 == 0:
7477             cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:addr + 16])))
7478             yield cols
7479             addr += 16
7480             cols = ["%08x " % addr]
7481         cols.append(" " + hexed[i:i + 2])
7482     if len(cols) > 0:
7483         cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:])))
7484         yield cols
7485
7486
7487 def browse(raw, obj, oid_maps=()):
7488     """Interactive browser
7489
7490     :param bytes raw: binary data you decoded
7491     :param obj: decoded :py:class:`pyderasn.Obj`
7492     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
7493                      Its human readable form is printed when OID is met
7494
7495     .. note:: `urwid <http://urwid.org/>`__ dependency required
7496
7497     This browser is an interactive terminal application for browsing
7498     structures of your decoded ASN.1 objects. You can quit it with **q**
7499     key. It consists of three windows:
7500
7501     :tree:
7502      View of ASN.1 elements hierarchy. You can navigate it using **Up**,
7503      **Down**, **PageUp**, **PageDown**, **Home**, **End** keys.
7504      **Left** key goes to constructed element above. **Plus**/**Minus**
7505      keys collapse/uncollapse constructed elements. **Space** toggles it
7506     :info:
7507      window with various information about element. You can scroll it
7508      with **h**/**l** (down, up) (**H**/**L** for triple speed) keys
7509     :hexdump:
7510      window with raw data hexdump and highlighted current element's
7511      contents. It automatically focuses on element's data. You can
7512      scroll it with **j**/**k** (down, up) (**J**/**K** for triple
7513      speed) keys. If element has explicit tag, then it also will be
7514      highlighted with different colour
7515
7516     Window's header contains current decode path and progress bars with
7517     position in *info* and *hexdump* windows.
7518
7519     If you press **d**, then current element will be saved in the
7520     current directory under its decode path name (adding ".0", ".1", etc
7521     suffix if such file already exists). **D** will save it with explicit tag.
7522
7523     You can also invoke it with ``--browse`` command line argument.
7524     """
7525     from copy import deepcopy
7526     from os.path import exists as path_exists
7527     import urwid
7528
7529     class TW(urwid.TreeWidget):
7530         def __init__(self, state, *args, **kwargs):
7531             self.state = state
7532             self.scrolled = {"info": False, "hexdump": False}
7533             super(TW, self).__init__(*args, **kwargs)
7534
7535         def _get_pp(self):
7536             pp = self.get_node().get_value()
7537             constructed = len(pp) > 1
7538             return (pp if hasattr(pp, "_fields") else pp[0]), constructed
7539
7540         def _state_update(self):
7541             pp, _ = self._get_pp()
7542             self.state["decode_path"].set_text(
7543                 ":".join(str(p) for p in pp.decode_path)
7544             )
7545             lines = deepcopy(self.state["hexed"])
7546
7547             def attr_set(i, attr):
7548                 line = lines[i // 16]
7549                 idx = 1 + (i - 16 * (i // 16))
7550                 line[idx] = (attr, line[idx])
7551
7552             if pp.expl_offset is not None:
7553                 for i in six_xrange(
7554                         pp.expl_offset,
7555                         pp.expl_offset + pp.expl_tlen + pp.expl_llen,
7556                 ):
7557                     attr_set(i, "select-expl")
7558             for i in six_xrange(pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen):
7559                 attr_set(i, "select-value")
7560             self.state["hexdump"]._set_body([urwid.Text(line) for line in lines])
7561             self.state["hexdump"].set_focus(pp.offset // 16)
7562             self.state["hexdump"].set_focus_valign("middle")
7563             self.state["hexdump_bar"].set_completion(
7564                 (100 * pp.offset // 16) //
7565                 len(self.state["hexdump"]._body.positions())
7566             )
7567
7568             lines = [
7569                 [("header", "Name: "), pp.obj_name],
7570                 [("header", "Type: "), pp.asn1_type_name],
7571                 [("header", "Offset: "), "%d (0x%x)" % (pp.offset, pp.offset)],
7572                 [("header", "[TLV]len: "), "%d/%d/%d" % (
7573                     pp.tlen, pp.llen, pp.vlen,
7574                 )],
7575                 [("header", "TLVlen: "), "%d" % sum((
7576                     pp.tlen, pp.llen, pp.vlen,
7577                 ))],
7578                 [("header", "Slice: "), "[%d:%d]" % (
7579                     pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen,
7580                 )],
7581             ]
7582             if pp.lenindef:
7583                 lines.append([("warning", "LENINDEF")])
7584             if pp.ber_encoded:
7585                 lines.append([("warning", "BER encoded")])
7586             if pp.bered:
7587                 lines.append([("warning", "BERed")])
7588             if pp.expl is not None:
7589                 lines.append([("header", "EXPLICIT")])
7590                 klass, _, num = pp.expl
7591                 lines.append(["  Tag: %s%d" % (TagClassReprs[klass], num)])
7592                 if pp.expl_offset is not None:
7593                     lines.append(["  Offset: %d" % pp.expl_offset])
7594                     lines.append(["  [TLV]len: %d/%d/%d" % (
7595                         pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7596                     )])
7597                     lines.append(["  TLVlen: %d" % sum((
7598                         pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7599                     ))])
7600                     lines.append(["  Slice: [%d:%d]" % (
7601                         pp.expl_offset,
7602                         pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen,
7603                     )])
7604             if pp.impl is not None:
7605                 klass, _, num = pp.impl
7606                 lines.append([
7607                     ("header", "IMPLICIT: "), "%s%d" % (TagClassReprs[klass], num),
7608                 ])
7609             if pp.optional:
7610                 lines.append(["OPTIONAL"])
7611             if pp.default:
7612                 lines.append(["DEFAULT"])
7613             if len(pp.decode_path) > 0:
7614                 ent = pp.decode_path[-1]
7615                 if isinstance(ent, DecodePathDefBy):
7616                     lines.append([""])
7617                     value = str(ent.defined_by)
7618                     oid_name = find_oid_name(
7619                         ent.defined_by.asn1_type_name, oid_maps, value,
7620                     )
7621                     lines.append([("header", "DEFINED BY: "), "%s" % (
7622                         value if oid_name is None
7623                         else "%s (%s)" % (oid_name, value)
7624                     )])
7625             lines.append([""])
7626             if pp.value is not None:
7627                 lines.append([("header", "Value: "), pp.value])
7628                 if (
7629                         len(oid_maps) > 0 and
7630                         pp.asn1_type_name == ObjectIdentifier.asn1_type_name
7631                 ):
7632                     for oid_map in oid_maps:
7633                         oid_name = oid_map.get(pp.value)
7634                         if oid_name is not None:
7635                             lines.append([("header", "Human: "), oid_name])
7636                             break
7637                 if pp.asn1_type_name == Integer.asn1_type_name:
7638                     lines.append([
7639                         ("header", "Decimal: "), "%d" % int(pp.obj),
7640                     ])
7641                     lines.append([
7642                         ("header", "Hexadecimal: "), colonize_hex(pp.obj.tohex()),
7643                     ])
7644             if pp.blob.__class__ == binary_type:
7645                 blob = hexenc(pp.blob).upper()
7646                 for i in six_xrange(0, len(blob), 32):
7647                     lines.append([colonize_hex(blob[i:i + 32])])
7648             elif pp.blob.__class__ == tuple:
7649                 lines.append([", ".join(pp.blob)])
7650             self.state["info"]._set_body([urwid.Text(line) for line in lines])
7651             self.state["info_bar"].set_completion(0)
7652
7653         def selectable(self):
7654             if self.state["widget_current"] != self:
7655                 self.state["widget_current"] = self
7656                 self.scrolled["info"] = False
7657                 self.scrolled["hexdump"] = False
7658                 self._state_update()
7659             return super(TW, self).selectable()
7660
7661         def get_display_text(self):
7662             pp, constructed = self._get_pp()
7663             style = "constructed" if constructed else ""
7664             if len(pp.decode_path) == 0:
7665                 return (style, pp.obj_name)
7666             if pp.asn1_type_name == "EOC":
7667                 return ("eoc", "EOC")
7668             ent = pp.decode_path[-1]
7669             if isinstance(ent, DecodePathDefBy):
7670                 value = str(ent.defined_by)
7671                 oid_name = find_oid_name(
7672                     ent.defined_by.asn1_type_name, oid_maps, value,
7673                 )
7674                 return ("defby", "DEFBY:" + (
7675                     value if oid_name is None else oid_name
7676                 ))
7677             return (style, ent)
7678
7679         def _scroll(self, what, step):
7680             self.state[what]._invalidate()
7681             pos = self.state[what].focus_position
7682             if not self.scrolled[what]:
7683                 self.scrolled[what] = True
7684                 pos -= 2
7685             pos = max(0, pos + step)
7686             pos = min(pos, len(self.state[what]._body.positions()) - 1)
7687             self.state[what].set_focus(pos)
7688             self.state[what].set_focus_valign("top")
7689             self.state[what + "_bar"].set_completion(
7690                 (100 * pos) // len(self.state[what]._body.positions())
7691             )
7692
7693         def keypress(self, size, key):
7694             if key == "q":
7695                 raise urwid.ExitMainLoop()
7696
7697             if key == " ":
7698                 self.expanded = not self.expanded
7699                 self.update_expanded_icon()
7700                 return None
7701
7702             hexdump_steps = {"j": 1, "k": -1, "J": 5, "K": -5}
7703             if key in hexdump_steps:
7704                 self._scroll("hexdump", hexdump_steps[key])
7705                 return None
7706
7707             info_steps = {"h": 1, "l": -1, "H": 5, "L": -5}
7708             if key in info_steps:
7709                 self._scroll("info", info_steps[key])
7710                 return None
7711
7712             if key in ("d", "D"):
7713                 pp, _ = self._get_pp()
7714                 dp = ":".join(str(p) for p in pp.decode_path)
7715                 dp = dp.replace(" ", "_")
7716                 if dp == "":
7717                     dp = "root"
7718                 if key == "d" or pp.expl_offset is None:
7719                     data = self.state["raw"][pp.offset:(
7720                         pp.offset + pp.tlen + pp.llen + pp.vlen
7721                     )]
7722                 else:
7723                     data = self.state["raw"][pp.expl_offset:(
7724                         pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen
7725                     )]
7726                 ctr = 0
7727
7728                 def duplicate_path(dp, ctr):
7729                     if ctr == 0:
7730                         return dp
7731                     return "%s.%d" % (dp, ctr)
7732
7733                 while True:
7734                     if not path_exists(duplicate_path(dp, ctr)):
7735                         break
7736                     ctr += 1
7737                 dp = duplicate_path(dp, ctr)
7738                 with open(dp, "wb") as fd:
7739                     fd.write(data)
7740                 self.state["decode_path"].set_text(
7741                     ("warning", "Saved to: " + dp)
7742                 )
7743                 return None
7744             return super(TW, self).keypress(size, key)
7745
7746     class PN(urwid.ParentNode):
7747         def __init__(self, state, value, *args, **kwargs):
7748             self.state = state
7749             if not hasattr(value, "_fields"):
7750                 value = list(value)
7751             super(PN, self).__init__(value, *args, **kwargs)
7752
7753         def load_widget(self):
7754             return TW(self.state, self)
7755
7756         def load_child_keys(self):
7757             value = self.get_value()
7758             if hasattr(value, "_fields"):
7759                 return []
7760             return range(len(value[1:]))
7761
7762         def load_child_node(self, key):
7763             return PN(
7764                 self.state,
7765                 self.get_value()[key + 1],
7766                 parent=self,
7767                 key=key,
7768                 depth=self.get_depth() + 1,
7769             )
7770
7771     class LabeledPG(urwid.ProgressBar):
7772         def __init__(self, label, *args, **kwargs):
7773             self.label = label
7774             super(LabeledPG, self).__init__(*args, **kwargs)
7775
7776         def get_text(self):
7777             return "%s: %s" % (self.label, super(LabeledPG, self).get_text())
7778
7779     WinHexdump = urwid.ListBox([urwid.Text("")])
7780     WinInfo = urwid.ListBox([urwid.Text("")])
7781     WinDecodePath = urwid.Text("", "center")
7782     WinInfoBar = LabeledPG("info", "pg-normal", "pg-complete")
7783     WinHexdumpBar = LabeledPG("hexdump", "pg-normal", "pg-complete")
7784     WinTree = urwid.TreeListBox(urwid.TreeWalker(PN(
7785         {
7786             "raw": raw,
7787             "hexed": list(hexdump(raw)),
7788             "widget_current": None,
7789             "info": WinInfo,
7790             "info_bar": WinInfoBar,
7791             "hexdump": WinHexdump,
7792             "hexdump_bar": WinHexdumpBar,
7793             "decode_path": WinDecodePath,
7794         },
7795         list(obj.pps()),
7796     )))
7797     help_text = " ".join((
7798         "q:quit",
7799         "space:(un)collapse",
7800         "(pg)up/down/home/end:nav",
7801         "jkJK:hexdump hlHL:info",
7802         "dD:dump",
7803     ))
7804     urwid.MainLoop(
7805         urwid.Frame(
7806             urwid.Columns([
7807                 ("weight", 1, WinTree),
7808                 ("weight", 2, urwid.Pile([
7809                     urwid.LineBox(WinInfo),
7810                     urwid.LineBox(WinHexdump),
7811                 ])),
7812             ]),
7813             header=urwid.Columns([
7814                 ("weight", 2, urwid.AttrWrap(WinDecodePath, "header")),
7815                 ("weight", 1, WinInfoBar),
7816                 ("weight", 1, WinHexdumpBar),
7817             ]),
7818             footer=urwid.AttrWrap(urwid.Text(help_text), "help")
7819         ),
7820         [
7821             ("header", "bold", ""),
7822             ("constructed", "bold", ""),
7823             ("help", "light magenta", ""),
7824             ("warning", "light red", ""),
7825             ("defby", "light red", ""),
7826             ("eoc", "dark red", ""),
7827             ("select-value", "light green", ""),
7828             ("select-expl", "light red", ""),
7829             ("pg-normal", "", "light blue"),
7830             ("pg-complete", "black", "yellow"),
7831         ],
7832     ).run()
7833
7834
7835 def main():  # pragma: no cover
7836     import argparse
7837     parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7838     parser.add_argument(
7839         "--skip",
7840         type=int,
7841         default=0,
7842         help="Skip that number of bytes from the beginning",
7843     )
7844     parser.add_argument(
7845         "--oids",
7846         help="Python paths to dictionary with OIDs, comma separated",
7847     )
7848     parser.add_argument(
7849         "--schema",
7850         help="Python path to schema definition to use",
7851     )
7852     parser.add_argument(
7853         "--defines-by-path",
7854         help="Python path to decoder's defines_by_path",
7855     )
7856     parser.add_argument(
7857         "--nobered",
7858         action="store_true",
7859         help="Disallow BER encoding",
7860     )
7861     parser.add_argument(
7862         "--print-decode-path",
7863         action="store_true",
7864         help="Print decode paths",
7865     )
7866     parser.add_argument(
7867         "--decode-path-only",
7868         help="Print only specified decode path",
7869     )
7870     parser.add_argument(
7871         "--allow-expl-oob",
7872         action="store_true",
7873         help="Allow explicit tag out-of-bound",
7874     )
7875     parser.add_argument(
7876         "--evgen",
7877         action="store_true",
7878         help="Turn on event generation mode",
7879     )
7880     parser.add_argument(
7881         "--browse",
7882         action="store_true",
7883         help="Start ASN.1 browser",
7884     )
7885     parser.add_argument(
7886         "RAWFile",
7887         type=argparse.FileType("rb"),
7888         help="Path to BER/CER/DER file you want to decode",
7889     )
7890     args = parser.parse_args()
7891     if PY2:
7892         args.RAWFile.seek(args.skip)
7893         raw = memoryview(args.RAWFile.read())
7894         args.RAWFile.close()
7895     else:
7896         raw = file_mmaped(args.RAWFile)[args.skip:]
7897     oid_maps = (
7898         [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7899         if args.oids else ()
7900     )
7901     from functools import partial
7902     if args.schema:
7903         schema = obj_by_path(args.schema)
7904         pprinter = partial(pprint, big_blobs=True)
7905     else:
7906         schema, pprinter = generic_decoder()
7907     ctx = {
7908         "bered": not args.nobered,
7909         "allow_expl_oob": args.allow_expl_oob,
7910     }
7911     if args.defines_by_path is not None:
7912         ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7913     if args.browse:
7914         obj, _ = schema().decode(raw, ctx=ctx)
7915         browse(raw, obj, oid_maps)
7916         from sys import exit as sys_exit
7917         sys_exit(0)
7918     from os import environ
7919     pprinter = partial(
7920         pprinter,
7921         oid_maps=oid_maps,
7922         with_colours=environ.get("NO_COLOR") is None,
7923         with_decode_path=args.print_decode_path,
7924         decode_path_only=(
7925             () if args.decode_path_only is None else
7926             tuple(args.decode_path_only.split(":"))
7927         ),
7928     )
7929     if args.evgen:
7930         for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7931             print(pprinter(obj, decode_path=decode_path))
7932     else:
7933         obj, tail = schema().decode(raw, ctx=ctx)
7934         print(pprinter(obj))
7935     if tail != b"":
7936         print("\nTrailing data: %s" % hexenc(tail))
7937
7938
7939 if __name__ == "__main__":
7940     from pyderasn import *
7941     main()