]> Cypherpunks.ru repositories - pyderasn.git/blob - pyderasn.py
Prepare for release
[pyderasn.git] / pyderasn.py
1 #!/usr/bin/env python
2 # coding: utf-8
3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU Lesser General Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/CER/BER codec with abstract structures
21
22 This library allows you to marshal various structures in ASN.1 DER/CER
23 format, unmarshal BER/CER/DER ones.
24
25     >>> i = Integer(123)
26     >>> raw = i.encode()
27     >>> Integer().decod(raw) == i
28     True
29
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
62
63 Common for most types
64 ---------------------
65
66 Tags
67 ____
68
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
75 tags simultaneously.
76
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
80 number.
81
82 .. note::
83
84    EXPLICIT tags always have **constructed** tag. PyDERASN does not
85    explicitly check correctness of schema input here.
86
87 .. note::
88
89    Implicit tags have **primitive** (``tag_ctxp``) encoding for
90    primitive values.
91
92 ::
93
94     >>> Integer(impl=tag_ctxp(1))
95     [1] INTEGER
96     >>> Integer(expl=tag_ctxc(2))
97     [2] EXPLICIT INTEGER
98
99 Implicit tag is not explicitly shown.
100
101 Two objects of the same type, but with different implicit/explicit tags
102 are **not** equal.
103
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
106 function::
107
108     >>> tag_decode(tag_ctxc(123))
109     (128, 32, 123)
110     >>> klass, form, num = tag_decode(tag_ctxc(123))
111     >>> klass == TagClassContext
112     True
113     >>> form == TagFormConstructed
114     True
115
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
118
119 Default/optional
120 ________________
121
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
125
126     >>> Integer(optional=True, default=123)
127     INTEGER 123 OPTIONAL DEFAULT
128
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
132 ``version`` field::
133
134     class Version(Integer):
135         schema = (
136             ("v1", 0),
137             ("v2", 1),
138             ("v3", 2),
139         )
140     class TBSCertificate(Sequence):
141         schema = (
142             ("version", Version(expl=tag_ctxc(0), default="v1")),
143         [...]
144
145 When default argument is used and value is not specified, then it equals
146 to default one.
147
148 .. _bounds:
149
150 Size constraints
151 ________________
152
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
156
157     class X(...):
158         bounds = (MIN, MAX)
159
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
161
162 For simplicity you can also set bounds the following way::
163
164     bounded_x = X(bounds=(MIN, MAX))
165
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
167 raised.
168
169 Common methods
170 ______________
171
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
175
176 All objects are friendly to ``copy.copy()`` and copied objects can be
177 safely mutated.
178
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
181
182 .. _decoding:
183
184 Decoding
185 --------
186
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
193
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
196
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
199
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
205
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
208
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
211 * ``expl_tlen``,
212 * ``expl_llen``
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
215   ``offset`` otherwise
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
217   ``tlvlen`` otherwise
218
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
220
221 .. _ctx:
222
223 Context
224 _______
225
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
229
230 Currently available context options:
231
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
238
239 .. _pprinting:
240
241 Pretty printing
242 ---------------
243
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
248
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
252
253     >>> from pyderasn import pprint
254     >>> encoded = Integer(-12345).encode()
255     >>> obj, tail = Integer().decode(encoded)
256     >>> print(pprint(obj))
257         0   [1,1,   2] INTEGER -12345
258
259 .. _pprint_example:
260
261 Example certificate::
262
263     >>> print(pprint(crt))
264         0   [1,3,1604] Certificate SEQUENCE
265         4   [1,3,1453]  . tbsCertificate: TBSCertificate SEQUENCE
266        10-2 [1,1,   1]  . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267        13   [1,1,   3]  . . serialNumber: CertificateSerialNumber INTEGER 61595
268        18   [1,1,  13]  . . signature: AlgorithmIdentifier SEQUENCE
269        20   [1,1,   9]  . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270        31   [0,0,   2]  . . . parameters: [UNIV 5] ANY OPTIONAL
271                         . . . . 05:00
272        33   [0,0, 278]  . . issuer: Name CHOICE rdnSequence
273        33   [1,3, 274]  . . . rdnSequence: RDNSequence SEQUENCE OF
274        37   [1,1,  11]  . . . . 0: RelativeDistinguishedName SET OF
275        39   [1,1,   9]  . . . . . 0: AttributeTypeAndValue SEQUENCE
276        41   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277        46   [0,0,   4]  . . . . . . value: [UNIV 19] AttributeValue ANY
278                         . . . . . . . 13:02:45:53
279     [...]
280      1461   [1,1,  13]  . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281      1463   [1,1,   9]  . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282      1474   [0,0,   2]  . . parameters: [UNIV 5] ANY OPTIONAL
283                         . . . 05:00
284      1476   [1,2, 129]  . signatureValue: BIT STRING 1024 bits
285                         . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286                         . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
287      [...]
288
289     Trailing data: 0a
290
291 Let's parse that output, human::
292
293        10-2 [1,1,   1]    . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294        ^  ^  ^ ^    ^     ^   ^        ^            ^       ^       ^  ^
295        0  1  2 3    4     5   6        7            8       9       10 11
296
297 ::
298
299        20   [1,1,   9]    . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
300        ^     ^ ^    ^     ^     ^          ^                 ^
301        0     2 3    4     5     6          9                 10
302
303 ::
304
305        33   [0,0, 278]    . . issuer: Name CHOICE rdnSequence
306        ^     ^ ^    ^     ^   ^       ^    ^      ^
307        0     2 3    4     5   6       8    9      10
308
309 ::
310
311        52-2∞ B [1,1,1054]∞  . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
312              ^           ^                                 ^   ^            ^
313             12          13                                14   9            10
314
315 :0:
316  Offset of the object, where its DER/BER encoding begins.
317  Pay attention that it does **not** include explicit tag.
318 :1:
319  If explicit tag exists, then this is its length (tag + encoded length).
320 :2:
321  Length of object's tag. For example CHOICE does not have its own tag,
322  so it is zero.
323 :3:
324  Length of encoded length.
325 :4:
326  Length of encoded value.
327 :5:
328  Visual indentation to show the depth of object in the hierarchy.
329 :6:
330  Object's name inside SEQUENCE/CHOICE.
331 :7:
332  If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333  here. "IMPLICIT" is omitted.
334 :8:
335  Object's class name, if set. Omitted if it is just an ordinary simple
336  value (like with ``algorithm`` in example above).
337 :9:
338  Object's ASN.1 type.
339 :10:
340  Object's value, if set. Can consist of multiple words (like OCTET/BIT
341  STRINGs above). We see ``v3`` value in Version, because it is named.
342  ``rdnSequence`` is the choice of CHOICE type.
343 :11:
344  Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345  default one, specified in the schema.
346 :12:
347  Shows does object contains any kind of BER encoded data (possibly
348  Sequence holding BER-encoded underlying value).
349 :13:
350  Only applicable to BER encoded data. Indefinite length encoding mark.
351 :14:
352  Only applicable to BER encoded data. If object has BER-specific
353  encoding, then ``BER`` will be shown. It does not depend on indefinite
354  length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355  (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
356  could be BERed.
357
358 Also it could be helpful to add quick ASN.1 pprinting command in your
359 pdb's configuration file::
360
361     alias pp1 import pyderasn ;; print(pyderasn.pprint(%1, oid_maps=(locals().get("OID_STR_TO_NAME", {}),)))
362
363 .. _definedby:
364
365 DEFINED BY
366 ----------
367
368 ASN.1 structures often have ANY and OCTET STRING fields, that are
369 DEFINED BY some previously met ObjectIdentifier. This library provides
370 ability to specify mapping between some OID and field that must be
371 decoded with specific specification.
372
373 .. _defines:
374
375 defines kwarg
376 _____________
377
378 :py:class:`pyderasn.ObjectIdentifier` field inside
379 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
380 necessary for decoding structures. For example, CMS (:rfc:`5652`)
381 container::
382
383     class ContentInfo(Sequence):
384         schema = (
385             ("contentType", ContentType(defines=((("content",), {
386                 id_digestedData: DigestedData(),
387                 id_signedData: SignedData(),
388             }),))),
389             ("content", Any(expl=tag_ctxc(0))),
390         )
391
392 ``contentType`` field tells that it defines that ``content`` must be
393 decoded with ``SignedData`` specification, if ``contentType`` equals to
394 ``id-signedData``. The same applies to ``DigestedData``. If
395 ``contentType`` contains unknown OID, then no automatic decoding is
396 done.
397
398 You can specify multiple fields, that will be autodecoded -- that is why
399 ``defines`` kwarg is a sequence. You can specify defined field
400 relatively or absolutely to current decode path. For example ``defines``
401 for AlgorithmIdentifier of X.509's
402 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
403
404         (
405             (("parameters",), {
406                 id_ecPublicKey: ECParameters(),
407                 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
408             }),
409             (("..", "subjectPublicKey"), {
410                 id_rsaEncryption: RSAPublicKey(),
411                 id_GostR3410_2001: OctetString(),
412             }),
413         ),
414
415 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
416 autodecode its parameters inside SPKI's algorithm and its public key
417 itself.
418
419 Following types can be automatically decoded (DEFINED BY):
420
421 * :py:class:`pyderasn.Any`
422 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
423 * :py:class:`pyderasn.OctetString`
424 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
425   ``Any``/``BitString``/``OctetString``-s
426
427 When any of those fields is automatically decoded, then ``.defined``
428 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
429 was defined, ``value`` contains corresponding decoded value. For example
430 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
431
432 .. _defines_by_path_ctx:
433
434 defines_by_path context option
435 ______________________________
436
437 Sometimes you either can not or do not want to explicitly set *defines*
438 in the schema. You can dynamically apply those definitions when calling
439 :py:meth:`pyderasn.Obj.decode` method.
440
441 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
442 value must be sequence of following tuples::
443
444     (decode_path, defines)
445
446 where ``decode_path`` is a tuple holding so-called decode path to the
447 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
448 ``defines``, holding exactly the same value as accepted in its
449 :ref:`keyword argument <defines>`.
450
451 For example, again for CMS, you want to automatically decode
452 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
453 structures it may hold. Also, automatically decode ``controlSequence``
454 of ``PKIResponse``::
455
456     content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
457         (
458             ("contentType",),
459             ((("content",), {id_signedData: SignedData()}),),
460         ),
461         (
462             (
463                 "content",
464                 DecodePathDefBy(id_signedData),
465                 "encapContentInfo",
466                 "eContentType",
467             ),
468             ((("eContent",), {
469                 id_cct_PKIData: PKIData(),
470                 id_cct_PKIResponse: PKIResponse(),
471             })),
472         ),
473         (
474             (
475                 "content",
476                 DecodePathDefBy(id_signedData),
477                 "encapContentInfo",
478                 "eContent",
479                 DecodePathDefBy(id_cct_PKIResponse),
480                 "controlSequence",
481                 any,
482                 "attrType",
483             ),
484             ((("attrValues",), {
485                 id_cmc_recipientNonce: RecipientNonce(),
486                 id_cmc_senderNonce: SenderNonce(),
487                 id_cmc_statusInfoV2: CMCStatusInfoV2(),
488                 id_cmc_transactionId: TransactionId(),
489             })),
490         ),
491     )})
492
493 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
494 First function is useful for path construction when some automatic
495 decoding is already done. ``any`` means literally any value it meet --
496 useful for SEQUENCE/SET OF-s.
497
498 .. _bered_ctx:
499
500 BER encoding
501 ------------
502
503 By default PyDERASN accepts only DER encoded data. By default it encodes
504 to DER. But you can optionally enable BER decoding with setting
505 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
506 constructed primitive types should be parsed successfully.
507
508 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
509   attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
510   STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
511   ``UTCTime``, ``GeneralizedTime`` can contain it.
512 * If object has an indefinite length encoding, then its ``lenindef``
513   attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
514   ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
515   contain it.
516 * If object has an indefinite length encoded explicit tag, then
517   ``expl_lenindef`` is set to True.
518 * If object has either any of BER-related encoding (explicit tag
519   indefinite length, object's indefinite length, BER-encoding) or any
520   underlying component has that kind of encoding, then ``bered``
521   attribute is set to True. For example SignedData CMS can have
522   ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
523   ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
524
525 EOC (end-of-contents) token's length is taken in advance in object's
526 value length.
527
528 .. _allow_expl_oob_ctx:
529
530 Allow explicit tag out-of-bound
531 -------------------------------
532
533 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
534 one value, more than one object. If you set ``allow_expl_oob`` context
535 option to True, then no error will be raised and that invalid encoding
536 will be silently further processed. But pay attention that offsets and
537 lengths will be invalid in that case.
538
539 .. warning::
540
541    This option should be used only for skipping some decode errors, just
542    to see the decoded structure somehow.
543
544 .. _streaming:
545
546 Streaming and dealing with huge structures
547 ------------------------------------------
548
549 .. _evgen_mode:
550
551 evgen mode
552 __________
553
554 ASN.1 structures can be huge, they can hold millions of objects inside
555 (for example Certificate Revocation Lists (CRL), holding revocation
556 state for every previously issued X.509 certificate). CACert.org's 8 MiB
557 CRL file takes more than half a gigabyte of memory to hold the decoded
558 structure.
559
560 If you just simply want to check the signature over the ``tbsCertList``,
561 you can create specialized schema with that field represented as
562 OctetString for example::
563
564     class TBSCertListFast(Sequence):
565         schema = (
566             [...]
567             ("revokedCertificates", OctetString(
568                 impl=SequenceOf.tag_default,
569                 optional=True,
570             )),
571             [...]
572         )
573
574 This allows you to quickly decode a few fields and check the signature
575 over the ``tbsCertList`` bytes.
576
577 But how can you get all certificate's serial number from it, after you
578 trust that CRL after signature validation? You can use so called
579 ``evgen`` (event generation) mode, to catch the events/facts of some
580 successful object decoding. Let's use command line capabilities::
581
582     $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
583          10     [1,1,   1]   . . version: Version INTEGER v2 (01) OPTIONAL
584          15     [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
585          26     [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL
586          13     [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE
587          34     [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
588          39     [0,0,   9]   . . . . . . value: [UNIV 19] AttributeValue ANY
589          32     [1,1,  14]   . . . . . 0: AttributeTypeAndValue SEQUENCE
590          30     [1,1,  16]   . . . . 0: RelativeDistinguishedName SET OF
591     [...]
592         188     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
593         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
594         191     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
595         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
596         186     [1,1,  18]   . . . 0: RevokedCertificate SEQUENCE
597         208     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
598         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
599         211     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
600         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
601         206     [1,1,  18]   . . . 1: RevokedCertificate SEQUENCE
602     [...]
603     9144992     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
604     9144992     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
605     9144985     [1,1,  20]   . . . 415755: RevokedCertificate SEQUENCE
606       181     [1,4,9144821]   . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
607         5     [1,4,9144997]   . tbsCertList: TBSCertList SEQUENCE
608     9145009     [1,1,   9]   . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
609     9145020     [0,0,   2]   . . parameters: [UNIV 5] ANY OPTIONAL
610     9145007     [1,1,  13]   . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
611     9145022     [1,3, 513]   . signatureValue: BIT STRING 4096 bits
612         0     [1,4,9145534]  CertificateList SEQUENCE
613
614 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
615 decodes underlying values. It can not tell if SEQUENCE is decoded, so
616 the event of the upper level SEQUENCE is the last one we see.
617 ``version`` field is just a single INTEGER -- it is decoded and event is
618 fired immediately. Then we see that ``algorithm`` and ``parameters``
619 fields are decoded and only after them the ``signature`` SEQUENCE is
620 fired as a successfully decoded. There are 4 events for each revoked
621 certificate entry in that CRL: ``userCertificate`` serial number,
622 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
623 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
624
625 We can do that in our ordinary Python code and understand where we are
626 by looking at deterministically generated decode paths (do not forget
627 about useful ``--print-decode-path`` CLI option). We must use
628 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
629 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
630 obj, tail)`` tuples::
631
632     for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
633         if (
634             len(decode_path) == 4 and
635             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
636             decode_path[3] == "userCertificate"
637         ):
638             print("serial number:", int(obj))
639
640 Virtually it does not take any memory except at least needed for single
641 object storage. You can easily use that mode to determine required
642 object ``.offset`` and ``.*len`` to be able to decode it separately, or
643 maybe verify signature upon it just by taking bytes by ``.offset`` and
644 ``.tlvlen``.
645
646 .. _evgen_mode_upto_ctx:
647
648 evgen_mode_upto
649 _______________
650
651 There is full ability to get any kind of data from the CRL in the
652 example above. However it is not too convenient to get the whole
653 ``RevokedCertificate`` structure, that is pretty lightweight and one may
654 do not want to disassemble it. You can use ``evgen_mode_upto``
655 :ref:`ctx <ctx>` option that semantically equals to
656 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
657 mapped to any non-None value. If specified decode path is met, then any
658 subsequent objects won't be decoded in evgen mode. That allows us to
659 parse the CRL above with fully assembled ``RevokedCertificate``::
660
661     for decode_path, obj, _ in CertificateList().decode_evgen(
662         crl_raw,
663         ctx={"evgen_mode_upto": (
664             (("tbsCertList", "revokedCertificates", any), True),
665         )},
666     ):
667         if (
668             len(decode_path) == 3 and
669             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
670         ):
671             print("serial number:", int(obj["userCertificate"]))
672
673 .. note::
674
675    SEQUENCE/SET values with DEFAULT specified are automatically decoded
676    without evgen mode.
677
678 .. _mmap:
679
680 mmap-ed file
681 ____________
682
683 POSIX compliant systems have ``mmap`` syscall, giving ability to work
684 the memory mapped file. You can deal with the file like it was an
685 ordinary binary string, allowing you not to load it to the memory first.
686 Also you can use them as an input for OCTET STRING, taking no Python
687 memory for their storage.
688
689 There is convenient :py:func:`pyderasn.file_mmaped` function that
690 creates read-only memoryview on the file contents::
691
692     with open("huge", "rb") as fd:
693         raw = file_mmaped(fd)
694         obj = Something.decode(raw)
695
696 .. warning::
697
698    mmap-ed files in Python2.7 does not implement buffer protocol, so
699    memoryview won't work on them.
700
701 .. warning::
702
703    mmap maps the **whole** file. So it plays no role if you seek-ed it
704    before. Take the slice of the resulting memoryview with required
705    offset instead.
706
707 .. note::
708
709    If you use ZFS as underlying storage, then pay attention that
710    currently most platforms does not deal good with ZFS ARC and ordinary
711    page cache used for mmaps. It can take twice the necessary size in
712    the memory: both in page cache and ZFS ARC.
713
714 CER encoding
715 ____________
716
717 We can parse any kind of data now, but how can we produce files
718 streamingly, without storing their encoded representation in memory?
719 SEQUENCE by default encodes in memory all its values, joins them in huge
720 binary string, just to know the exact size of SEQUENCE's value for
721 encoding it in TLV. DER requires you to know all exact sizes of the
722 objects.
723
724 You can use CER encoding mode, that slightly differs from the DER, but
725 does not require exact sizes knowledge, allowing streaming encoding
726 directly to some writer/buffer. Just use
727 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
728 encoded data will flow::
729
730     opener = io.open if PY2 else open
731     with opener("result", "wb") as fd:
732         obj.encode_cer(fd.write)
733
734 ::
735
736     buf = io.BytesIO()
737     obj.encode_cer(buf.write)
738
739 If you do not want to create in-memory buffer every time, then you can
740 use :py:func:`pyderasn.encode_cer` function::
741
742     data = encode_cer(obj)
743
744 Remember that CER is **not valid** DER in most cases, so you **have to**
745 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
746 decoding. Also currently there is **no** validation that provided CER is
747 valid one -- you are sure that it has only valid BER encoding.
748
749 .. warning::
750
751    SET OF values can not be streamingly encoded, because they are
752    required to be sorted byte-by-byte. Big SET OF values still will take
753    much memory. Use neither SET nor SET OF values, as modern ASN.1
754    also recommends too.
755
756 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
757 OCTET STRINGs! They will be streamingly copied from underlying file to
758 the buffer using 1 KB chunks.
759
760 Some structures require that some of the elements have to be forcefully
761 DER encoded. For example ``SignedData`` CMS requires you to encode
762 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
763 encode everything else in BER. You can tell any of the structures to be
764 forcefully encoded in DER during CER encoding, by specifying
765 ``der_forced=True`` attribute::
766
767     class Certificate(Sequence):
768         schema = (...)
769         der_forced = True
770
771     class SignedAttributes(SetOf):
772         schema = Attribute()
773         bounds = (1, float("+inf"))
774         der_forced = True
775
776 .. _agg_octet_string:
777
778 agg_octet_string
779 ________________
780
781 In most cases, huge quantity of binary data is stored as OCTET STRING.
782 CER encoding splits it on 1 KB chunks. BER allows splitting on various
783 levels of chunks inclusion::
784
785     SOME STRING[CONSTRUCTED]
786         OCTET STRING[CONSTRUCTED]
787             OCTET STRING[PRIMITIVE]
788                 DATA CHUNK
789             OCTET STRING[PRIMITIVE]
790                 DATA CHUNK
791             OCTET STRING[PRIMITIVE]
792                 DATA CHUNK
793         OCTET STRING[PRIMITIVE]
794             DATA CHUNK
795         OCTET STRING[CONSTRUCTED]
796             OCTET STRING[PRIMITIVE]
797                 DATA CHUNK
798             OCTET STRING[PRIMITIVE]
799                 DATA CHUNK
800         OCTET STRING[CONSTRUCTED]
801             OCTET STRING[CONSTRUCTED]
802                 OCTET STRING[PRIMITIVE]
803                     DATA CHUNK
804
805 You can not just take the offset and some ``.vlen`` of the STRING and
806 treat it as the payload. If you decode it without
807 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
808 and ``bytes()`` will give the whole payload contents.
809
810 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
811 small memory footprint. There is convenient
812 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
813 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
814 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
815 SHA512 digest of its ``eContent``::
816
817     fd = open("data.p7m", "rb")
818     raw = file_mmaped(fd)
819     ctx = {"bered": True}
820     for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
821         if decode_path == ("content",):
822             content = obj
823             break
824     else:
825         raise ValueError("no content found")
826     hasher_state = sha512()
827     def hasher(data):
828         hasher_state.update(data)
829         return len(data)
830     evgens = SignedData().decode_evgen(
831         raw[content.offset:],
832         offset=content.offset,
833         ctx=ctx,
834     )
835     agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
836     fd.close()
837     digest = hasher_state.digest()
838
839 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
840 copy the payload (without BER/CER encoding interleaved overhead) in it.
841 Virtually it won't take memory more than for keeping small structures
842 and 1 KB binary chunks.
843
844 .. _seqof-iterators:
845
846 SEQUENCE OF iterators
847 _____________________
848
849 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
850 classes. The only difference with providing the full list of objects, is
851 that type and bounds checking is done during encoding process. Also
852 sequence's value will be emptied after encoding, forcing you to set its
853 value again.
854
855 This is very useful when you have to create some huge objects, like
856 CRLs, with thousands and millions of entities inside. You can write the
857 generator taking necessary data from the database and giving the
858 ``RevokedCertificate`` objects. Only binary representation of that
859 objects will take memory during DER encoding.
860
861 2-pass DER encoding
862 -------------------
863
864 There is ability to do 2-pass encoding to DER, writing results directly
865 to specified writer (buffer, file, whatever). It could be 1.5+ times
866 slower than ordinary encoding, but it takes little memory for 1st pass
867 state storing. For example, 1st pass state for CACert.org's CRL with
868 ~416K of certificate entries takes nearly 3.5 MB of memory.
869 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
870 nearly 0.5 KB of memory.
871
872 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
873 iterators <seqof-iterators>` and write directly to opened file, then
874 there is very small memory footprint.
875
876 1st pass traverses through all the objects of the structure and returns
877 the size of DER encoded structure, together with 1st pass state object.
878 That state contains precalculated lengths for various objects inside the
879 structure.
880
881 ::
882
883     fulllen, state = obj.encode1st()
884
885 2nd pass takes the writer and 1st pass state. It traverses through all
886 the objects again, but writes their encoded representation to the writer.
887
888 ::
889
890     opener = io.open if PY2 else open
891     with opener("result", "wb") as fd:
892         obj.encode2nd(fd.write, iter(state))
893
894 .. warning::
895
896    You **MUST NOT** use 1st pass state if anything is changed in the
897    objects. It is intended to be used immediately after 1st pass is
898    done!
899
900 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
901 have to reinitialize the values after the 1st pass. And you **have to**
902 be sure that the iterator gives exactly the same values as previously.
903 Yes, you have to run your iterator twice -- because this is two pass
904 encoding mode.
905
906 If you want to encode to the memory, then you can use convenient
907 :py:func:`pyderasn.encode2pass` helper.
908
909 .. _browser:
910
911 ASN.1 browser
912 -------------
913 .. autofunction:: pyderasn.browse
914
915 Base Obj
916 --------
917 .. autoclass:: pyderasn.Obj
918    :members:
919
920 Primitive types
921 ---------------
922
923 Boolean
924 _______
925 .. autoclass:: pyderasn.Boolean
926    :members: __init__
927
928 Integer
929 _______
930 .. autoclass:: pyderasn.Integer
931    :members: __init__, named, tohex
932
933 BitString
934 _________
935 .. autoclass:: pyderasn.BitString
936    :members: __init__, bit_len, named
937
938 OctetString
939 ___________
940 .. autoclass:: pyderasn.OctetString
941    :members: __init__
942
943 Null
944 ____
945 .. autoclass:: pyderasn.Null
946    :members: __init__
947
948 ObjectIdentifier
949 ________________
950 .. autoclass:: pyderasn.ObjectIdentifier
951    :members: __init__
952
953 Enumerated
954 __________
955 .. autoclass:: pyderasn.Enumerated
956
957 CommonString
958 ____________
959 .. autoclass:: pyderasn.CommonString
960
961 NumericString
962 _____________
963 .. autoclass:: pyderasn.NumericString
964
965 PrintableString
966 _______________
967 .. autoclass:: pyderasn.PrintableString
968    :members: __init__, allow_asterisk, allow_ampersand
969
970 IA5String
971 _________
972 .. autoclass:: pyderasn.IA5String
973
974 VisibleString
975 _____________
976 .. autoclass:: pyderasn.VisibleString
977
978 UTCTime
979 _______
980 .. autoclass:: pyderasn.UTCTime
981    :members: __init__, todatetime
982
983 GeneralizedTime
984 _______________
985 .. autoclass:: pyderasn.GeneralizedTime
986    :members: __init__, todatetime
987
988 Special types
989 -------------
990
991 Choice
992 ______
993 .. autoclass:: pyderasn.Choice
994    :members: __init__, choice, value
995
996 PrimitiveTypes
997 ______________
998 .. autoclass:: PrimitiveTypes
999
1000 Any
1001 ___
1002 .. autoclass:: pyderasn.Any
1003    :members: __init__
1004
1005 Constructed types
1006 -----------------
1007
1008 Sequence
1009 ________
1010 .. autoclass:: pyderasn.Sequence
1011    :members: __init__
1012
1013 Set
1014 ___
1015 .. autoclass:: pyderasn.Set
1016    :members: __init__
1017
1018 SequenceOf
1019 __________
1020 .. autoclass:: pyderasn.SequenceOf
1021    :members: __init__
1022
1023 SetOf
1024 _____
1025 .. autoclass:: pyderasn.SetOf
1026    :members: __init__
1027
1028 Various
1029 -------
1030
1031 .. autofunction:: pyderasn.abs_decode_path
1032 .. autofunction:: pyderasn.agg_octet_string
1033 .. autofunction:: pyderasn.ascii_visualize
1034 .. autofunction:: pyderasn.colonize_hex
1035 .. autofunction:: pyderasn.encode2pass
1036 .. autofunction:: pyderasn.encode_cer
1037 .. autofunction:: pyderasn.file_mmaped
1038 .. autofunction:: pyderasn.hexenc
1039 .. autofunction:: pyderasn.hexdec
1040 .. autofunction:: pyderasn.hexdump
1041 .. autofunction:: pyderasn.tag_encode
1042 .. autofunction:: pyderasn.tag_decode
1043 .. autofunction:: pyderasn.tag_ctxp
1044 .. autofunction:: pyderasn.tag_ctxc
1045 .. autoclass:: pyderasn.DecodeError
1046    :members: __init__
1047 .. autoclass:: pyderasn.NotEnoughData
1048 .. autoclass:: pyderasn.ExceedingData
1049 .. autoclass:: pyderasn.LenIndefForm
1050 .. autoclass:: pyderasn.TagMismatch
1051 .. autoclass:: pyderasn.InvalidLength
1052 .. autoclass:: pyderasn.InvalidOID
1053 .. autoclass:: pyderasn.ObjUnknown
1054 .. autoclass:: pyderasn.ObjNotReady
1055 .. autoclass:: pyderasn.InvalidValueType
1056 .. autoclass:: pyderasn.BoundsError
1057
1058 .. _cmdline:
1059
1060 Command-line usage
1061 ------------------
1062
1063 You can decode DER/BER files using command line abilities::
1064
1065     $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1066
1067 If there is no schema for your file, then you can try parsing it without,
1068 but of course IMPLICIT tags will often make it impossible. But result is
1069 good enough for the certificate above::
1070
1071     $ python -m pyderasn path/to/file
1072         0   [1,3,1604]  . >: SEQUENCE OF
1073         4   [1,3,1453]  . . >: SEQUENCE OF
1074         8   [0,0,   5]  . . . . >: [0] ANY
1075                         . . . . . A0:03:02:01:02
1076        13   [1,1,   3]  . . . . >: INTEGER 61595
1077        18   [1,1,  13]  . . . . >: SEQUENCE OF
1078        20   [1,1,   9]  . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1079        31   [1,1,   0]  . . . . . . >: NULL
1080        33   [1,3, 274]  . . . . >: SEQUENCE OF
1081        37   [1,1,  11]  . . . . . . >: SET OF
1082        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1083        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1084        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1085     [...]
1086      1409   [1,1,  50]  . . . . . . >: SEQUENCE OF
1087      1411   [1,1,   8]  . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1088      1421   [1,1,  38]  . . . . . . . . >: OCTET STRING 38 bytes
1089                         . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1090                         . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1091                         . . . . . . . . . 61:2E:63:6F:6D:2F
1092      1461   [1,1,  13]  . . >: SEQUENCE OF
1093      1463   [1,1,   9]  . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1094      1474   [1,1,   0]  . . . . >: NULL
1095      1476   [1,2, 129]  . . >: BIT STRING 1024 bits
1096                         . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1097                         . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1098     [...]
1099
1100 Human readable OIDs
1101 ___________________
1102
1103 If you have got dictionaries with ObjectIdentifiers, like example one
1104 from ``tests/test_crts.py``::
1105
1106     stroid2name = {
1107         "1.2.840.113549.1.1.1": "id-rsaEncryption",
1108         "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1109         [...]
1110         "2.5.4.10": "id-at-organizationName",
1111         "2.5.4.11": "id-at-organizationalUnitName",
1112     }
1113
1114 then you can pass it to pretty printer to see human readable OIDs::
1115
1116     $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1117     [...]
1118        37   [1,1,  11]  . . . . . . >: SET OF
1119        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1120        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1121        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1122        50   [1,1,  18]  . . . . . . >: SET OF
1123        52   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1124        54   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1125        59   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1126        70   [1,1,  18]  . . . . . . >: SET OF
1127        72   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1128        74   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1129        79   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1130     [...]
1131
1132 Decode paths
1133 ____________
1134
1135 Each decoded element has so-called decode path: sequence of structure
1136 names it is passing during the decode process. Each element has its own
1137 unique path inside the whole ASN.1 tree. You can print it out with
1138 ``--print-decode-path`` option::
1139
1140     $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1141        0    [1,3,1604]  Certificate SEQUENCE []
1142        4    [1,3,1453]   . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1143       10-2  [1,1,   1]   . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1144       13    [1,1,   3]   . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1145       18    [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1146       20    [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1147       31    [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1148                          . . . . 05:00
1149       33    [0,0, 278]   . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1150       33    [1,3, 274]   . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1151       37    [1,1,  11]   . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1152       39    [1,1,   9]   . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1153       41    [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1154       46    [0,0,   4]   . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1155                          . . . . . . . 13:02:45:53
1156       46    [1,1,   2]   . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1157     [...]
1158
1159 Now you can print only the specified tree, for example signature algorithm::
1160
1161     $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1162       18    [1,1,  13]  AlgorithmIdentifier SEQUENCE
1163       20    [1,1,   9]   . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1164       31    [0,0,   2]   . parameters: [UNIV 5] ANY OPTIONAL
1165                          . . 05:00
1166 """
1167
1168 from array import array
1169 from codecs import getdecoder
1170 from codecs import getencoder
1171 from collections import namedtuple
1172 from collections import OrderedDict
1173 from copy import copy
1174 from datetime import datetime
1175 from datetime import timedelta
1176 from io import BytesIO
1177 from math import ceil
1178 from operator import attrgetter
1179 from string import ascii_letters
1180 from string import digits
1181 from sys import maxsize as sys_maxsize
1182 from sys import version_info
1183 from unicodedata import category as unicat
1184
1185 from six import add_metaclass
1186 from six import binary_type
1187 from six import byte2int
1188 from six import indexbytes
1189 from six import int2byte
1190 from six import integer_types
1191 from six import iterbytes
1192 from six import iteritems
1193 from six import itervalues
1194 from six import PY2
1195 from six import string_types
1196 from six import text_type
1197 from six import unichr as six_unichr
1198 from six.moves import xrange as six_xrange
1199
1200
1201 try:
1202     from termcolor import colored
1203 except ImportError:  # pragma: no cover
1204     def colored(what, *args, **kwargs):
1205         return what
1206
1207 __version__ = "8.2"
1208
1209 __all__ = (
1210     "agg_octet_string",
1211     "Any",
1212     "BitString",
1213     "BMPString",
1214     "Boolean",
1215     "BoundsError",
1216     "Choice",
1217     "colonize_hex",
1218     "DecodeError",
1219     "DecodePathDefBy",
1220     "encode2pass",
1221     "encode_cer",
1222     "Enumerated",
1223     "ExceedingData",
1224     "file_mmaped",
1225     "GeneralizedTime",
1226     "GeneralString",
1227     "GraphicString",
1228     "hexdec",
1229     "hexenc",
1230     "IA5String",
1231     "Integer",
1232     "InvalidLength",
1233     "InvalidOID",
1234     "InvalidValueType",
1235     "ISO646String",
1236     "LenIndefForm",
1237     "NotEnoughData",
1238     "Null",
1239     "NumericString",
1240     "obj_by_path",
1241     "ObjectIdentifier",
1242     "ObjNotReady",
1243     "ObjUnknown",
1244     "OctetString",
1245     "PrimitiveTypes",
1246     "PrintableString",
1247     "Sequence",
1248     "SequenceOf",
1249     "Set",
1250     "SetOf",
1251     "T61String",
1252     "tag_ctxc",
1253     "tag_ctxp",
1254     "tag_decode",
1255     "TagClassApplication",
1256     "TagClassContext",
1257     "TagClassPrivate",
1258     "TagClassUniversal",
1259     "TagFormConstructed",
1260     "TagFormPrimitive",
1261     "TagMismatch",
1262     "TeletexString",
1263     "UniversalString",
1264     "UTCTime",
1265     "UTF8String",
1266     "VideotexString",
1267     "VisibleString",
1268 )
1269
1270 TagClassUniversal = 0
1271 TagClassApplication = 1 << 6
1272 TagClassContext = 1 << 7
1273 TagClassPrivate = 1 << 6 | 1 << 7
1274 TagFormPrimitive = 0
1275 TagFormConstructed = 1 << 5
1276 TagClassReprs = {
1277     TagClassContext: "",
1278     TagClassApplication: "APPLICATION ",
1279     TagClassPrivate: "PRIVATE ",
1280     TagClassUniversal: "UNIV ",
1281 }
1282 EOC = b"\x00\x00"
1283 EOC_LEN = len(EOC)
1284 LENINDEF = b"\x80"  # length indefinite mark
1285 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1286 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1287 SET01 = frozenset("01")
1288 DECIMALS = frozenset(digits)
1289 DECIMAL_SIGNS = ".,"
1290 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1291
1292
1293 def file_mmaped(fd):
1294     """Make mmap-ed memoryview for reading from file
1295
1296     :param fd: file object
1297     :returns: memoryview over read-only mmap-ing of the whole file
1298
1299     .. warning::
1300
1301        It is known to work under neither Python 2.x nor Windows.
1302     """
1303     import mmap
1304     return memoryview(mmap.mmap(fd.fileno(), length=0, prot=mmap.PROT_READ))
1305
1306
1307 def pureint(value):
1308     if not set(value) <= DECIMALS:
1309         raise ValueError("non-pure integer")
1310     return int(value)
1311
1312
1313 def fractions2float(fractions_raw):
1314     pureint(fractions_raw)
1315     return float("0." + fractions_raw)
1316
1317
1318 def get_def_by_path(defines_by_path, sub_decode_path):
1319     """Get define by decode path
1320     """
1321     for path, define in defines_by_path:
1322         if len(path) != len(sub_decode_path):
1323             continue
1324         for p1, p2 in zip(path, sub_decode_path):
1325             if (p1 is not any) and (p1 != p2):
1326                 break
1327         else:
1328             return define
1329
1330
1331 ########################################################################
1332 # Errors
1333 ########################################################################
1334
1335 class ASN1Error(ValueError):
1336     pass
1337
1338
1339 class DecodeError(ASN1Error):
1340     def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1341         """
1342         :param str msg: reason of decode failing
1343         :param klass: optional exact DecodeError inherited class (like
1344                       :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1345                       :py:exc:`InvalidLength`)
1346         :param decode_path: tuple of strings. It contains human
1347                             readable names of the fields through which
1348                             decoding process has passed
1349         :param int offset: binary offset where failure happened
1350         """
1351         super(DecodeError, self).__init__()
1352         self.msg = msg
1353         self.klass = klass
1354         self.decode_path = decode_path
1355         self.offset = offset
1356
1357     def __str__(self):
1358         return " ".join(
1359             c for c in (
1360                 "" if self.klass is None else self.klass.__name__,
1361                 (
1362                     ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1363                     if len(self.decode_path) > 0 else ""
1364                 ),
1365                 ("(at %d)" % self.offset) if self.offset > 0 else "",
1366                 self.msg,
1367             ) if c != ""
1368         )
1369
1370     def __repr__(self):
1371         return "%s(%s)" % (self.__class__.__name__, self)
1372
1373
1374 class NotEnoughData(DecodeError):
1375     pass
1376
1377
1378 class ExceedingData(ASN1Error):
1379     def __init__(self, nbytes):
1380         super(ExceedingData, self).__init__()
1381         self.nbytes = nbytes
1382
1383     def __str__(self):
1384         return "%d trailing bytes" % self.nbytes
1385
1386     def __repr__(self):
1387         return "%s(%s)" % (self.__class__.__name__, self)
1388
1389
1390 class LenIndefForm(DecodeError):
1391     pass
1392
1393
1394 class TagMismatch(DecodeError):
1395     pass
1396
1397
1398 class InvalidLength(DecodeError):
1399     pass
1400
1401
1402 class InvalidOID(DecodeError):
1403     pass
1404
1405
1406 class ObjUnknown(ASN1Error):
1407     def __init__(self, name):
1408         super(ObjUnknown, self).__init__()
1409         self.name = name
1410
1411     def __str__(self):
1412         return "object is unknown: %s" % self.name
1413
1414     def __repr__(self):
1415         return "%s(%s)" % (self.__class__.__name__, self)
1416
1417
1418 class ObjNotReady(ASN1Error):
1419     def __init__(self, name):
1420         super(ObjNotReady, self).__init__()
1421         self.name = name
1422
1423     def __str__(self):
1424         return "object is not ready: %s" % self.name
1425
1426     def __repr__(self):
1427         return "%s(%s)" % (self.__class__.__name__, self)
1428
1429
1430 class InvalidValueType(ASN1Error):
1431     def __init__(self, expected_types):
1432         super(InvalidValueType, self).__init__()
1433         self.expected_types = expected_types
1434
1435     def __str__(self):
1436         return "invalid value type, expected: %s" % ", ".join(
1437             [repr(t) for t in self.expected_types]
1438         )
1439
1440     def __repr__(self):
1441         return "%s(%s)" % (self.__class__.__name__, self)
1442
1443
1444 class BoundsError(ASN1Error):
1445     def __init__(self, bound_min, value, bound_max):
1446         super(BoundsError, self).__init__()
1447         self.bound_min = bound_min
1448         self.value = value
1449         self.bound_max = bound_max
1450
1451     def __str__(self):
1452         return "unsatisfied bounds: %s <= %s <= %s" % (
1453             self.bound_min,
1454             self.value,
1455             self.bound_max,
1456         )
1457
1458     def __repr__(self):
1459         return "%s(%s)" % (self.__class__.__name__, self)
1460
1461
1462 ########################################################################
1463 # Basic coders
1464 ########################################################################
1465
1466 _hexdecoder = getdecoder("hex")
1467 _hexencoder = getencoder("hex")
1468
1469
1470 def hexdec(data):
1471     """Binary data to hexadecimal string convert
1472     """
1473     return _hexdecoder(data)[0]
1474
1475
1476 def hexenc(data):
1477     """Hexadecimal string to binary data convert
1478     """
1479     return _hexencoder(data)[0].decode("ascii")
1480
1481
1482 def int_bytes_len(num, byte_len=8):
1483     if num == 0:
1484         return 1
1485     return int(ceil(float(num.bit_length()) / byte_len))
1486
1487
1488 def zero_ended_encode(num):
1489     octets = bytearray(int_bytes_len(num, 7))
1490     i = len(octets) - 1
1491     octets[i] = num & 0x7F
1492     num >>= 7
1493     i -= 1
1494     while num > 0:
1495         octets[i] = 0x80 | (num & 0x7F)
1496         num >>= 7
1497         i -= 1
1498     return bytes(octets)
1499
1500
1501 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1502     """Encode tag to binary form
1503
1504     :param int num: tag's number
1505     :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1506                       :py:data:`pyderasn.TagClassContext`,
1507                       :py:data:`pyderasn.TagClassApplication`,
1508                       :py:data:`pyderasn.TagClassPrivate`)
1509     :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1510                      :py:data:`pyderasn.TagFormConstructed`)
1511     """
1512     if num < 31:
1513         # [XX|X|.....]
1514         return int2byte(klass | form | num)
1515     # [XX|X|11111][1.......][1.......] ... [0.......]
1516     return int2byte(klass | form | 31) + zero_ended_encode(num)
1517
1518
1519 def tag_decode(tag):
1520     """Decode tag from binary form
1521
1522     .. warning::
1523
1524        No validation is performed, assuming that it has already passed.
1525
1526     It returns tuple with three integers, as
1527     :py:func:`pyderasn.tag_encode` accepts.
1528     """
1529     first_octet = byte2int(tag)
1530     klass = first_octet & 0xC0
1531     form = first_octet & 0x20
1532     if first_octet & 0x1F < 0x1F:
1533         return (klass, form, first_octet & 0x1F)
1534     num = 0
1535     for octet in iterbytes(tag[1:]):
1536         num <<= 7
1537         num |= octet & 0x7F
1538     return (klass, form, num)
1539
1540
1541 def tag_ctxp(num):
1542     """Create CONTEXT PRIMITIVE tag
1543     """
1544     return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1545
1546
1547 def tag_ctxc(num):
1548     """Create CONTEXT CONSTRUCTED tag
1549     """
1550     return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1551
1552
1553 def tag_strip(data):
1554     """Take off tag from the data
1555
1556     :returns: (encoded tag, tag length, remaining data)
1557     """
1558     if len(data) == 0:
1559         raise NotEnoughData("no data at all")
1560     if byte2int(data) & 0x1F < 31:
1561         return data[:1], 1, data[1:]
1562     i = 0
1563     while True:
1564         i += 1
1565         if i == len(data):
1566             raise DecodeError("unfinished tag")
1567         if indexbytes(data, i) & 0x80 == 0:
1568             break
1569     if i == 1 and indexbytes(data, 1) < 0x1F:
1570         raise DecodeError("unexpected long form")
1571     if i > 1 and indexbytes(data, 1) & 0x7F == 0:
1572         raise DecodeError("leading zero byte in tag value")
1573     i += 1
1574     return data[:i], i, data[i:]
1575
1576
1577 def len_encode(l):
1578     if l < 0x80:
1579         return int2byte(l)
1580     octets = bytearray(int_bytes_len(l) + 1)
1581     octets[0] = 0x80 | (len(octets) - 1)
1582     for i in six_xrange(len(octets) - 1, 0, -1):
1583         octets[i] = l & 0xFF
1584         l >>= 8
1585     return bytes(octets)
1586
1587
1588 def len_decode(data):
1589     """Decode length
1590
1591     :returns: (decoded length, length's length, remaining data)
1592     :raises LenIndefForm: if indefinite form encoding is met
1593     """
1594     if len(data) == 0:
1595         raise NotEnoughData("no data at all")
1596     first_octet = byte2int(data)
1597     if first_octet & 0x80 == 0:
1598         return first_octet, 1, data[1:]
1599     octets_num = first_octet & 0x7F
1600     if octets_num + 1 > len(data):
1601         raise NotEnoughData("encoded length is longer than data")
1602     if octets_num == 0:
1603         raise LenIndefForm()
1604     if byte2int(data[1:]) == 0:
1605         raise DecodeError("leading zeros")
1606     l = 0
1607     for v in iterbytes(data[1:1 + octets_num]):
1608         l = (l << 8) | v
1609     if l <= 127:
1610         raise DecodeError("long form instead of short one")
1611     return l, 1 + octets_num, data[1 + octets_num:]
1612
1613
1614 LEN0 = len_encode(0)
1615 LEN1 = len_encode(1)
1616 LEN1K = len_encode(1000)
1617
1618
1619 def len_size(l):
1620     """How many bytes length field will take
1621     """
1622     if l < 128:
1623         return 1
1624     if l < 256:  # 1 << 8
1625         return 2
1626     if l < 65536:  # 1 << 16
1627         return 3
1628     if l < 16777216:  # 1 << 24
1629         return 4
1630     if l < 4294967296:  # 1 << 32
1631         return 5
1632     if l < 1099511627776:  # 1 << 40
1633         return 6
1634     if l < 281474976710656:  # 1 << 48
1635         return 7
1636     if l < 72057594037927936:  # 1 << 56
1637         return 8
1638     raise OverflowError("too big length")
1639
1640
1641 def write_full(writer, data):
1642     """Fully write provided data
1643
1644     :param writer: must comply with ``io.RawIOBase.write`` behaviour
1645
1646     BytesIO does not guarantee that the whole data will be written at
1647     once. That function write everything provided, raising an error if
1648     ``writer`` returns None.
1649     """
1650     data = memoryview(data)
1651     written = 0
1652     while written != len(data):
1653         n = writer(data[written:])
1654         if n is None:
1655             raise ValueError("can not write to buf")
1656         written += n
1657
1658
1659 # If it is 64-bit system, then use compact 64-bit array of unsigned
1660 # longs. Use an ordinary list with universal integers otherwise, that
1661 # is slower.
1662 if sys_maxsize > 2 ** 32:
1663     def state_2pass_new():
1664         return array("L")
1665 else:
1666     def state_2pass_new():
1667         return []
1668
1669
1670 ########################################################################
1671 # Base class
1672 ########################################################################
1673
1674 class AutoAddSlots(type):
1675     def __new__(cls, name, bases, _dict):
1676         _dict["__slots__"] = _dict.get("__slots__", ())
1677         return type.__new__(cls, name, bases, _dict)
1678
1679
1680 BasicState = namedtuple("BasicState", (
1681     "version",
1682     "tag",
1683     "tag_order",
1684     "expl",
1685     "default",
1686     "optional",
1687     "offset",
1688     "llen",
1689     "vlen",
1690     "expl_lenindef",
1691     "lenindef",
1692     "ber_encoded",
1693 ), **NAMEDTUPLE_KWARGS)
1694
1695
1696 @add_metaclass(AutoAddSlots)
1697 class Obj(object):
1698     """Common ASN.1 object class
1699
1700     All ASN.1 types are inherited from it. It has metaclass that
1701     automatically adds ``__slots__`` to all inherited classes.
1702     """
1703     __slots__ = (
1704         "tag",
1705         "_tag_order",
1706         "_value",
1707         "_expl",
1708         "default",
1709         "optional",
1710         "offset",
1711         "llen",
1712         "vlen",
1713         "expl_lenindef",
1714         "lenindef",
1715         "ber_encoded",
1716     )
1717
1718     def __init__(
1719             self,
1720             impl=None,
1721             expl=None,
1722             default=None,
1723             optional=False,
1724             _decoded=(0, 0, 0),
1725     ):
1726         self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1727         self._expl = getattr(self, "expl", None) if expl is None else expl
1728         if self.tag != self.tag_default and self._expl is not None:
1729             raise ValueError("implicit and explicit tags can not be set simultaneously")
1730         if self.tag is None:
1731             self._tag_order = None
1732         else:
1733             tag_class, _, tag_num = tag_decode(
1734                 self.tag if self._expl is None else self._expl
1735             )
1736             self._tag_order = (tag_class, tag_num)
1737         if default is not None:
1738             optional = True
1739         self.optional = optional
1740         self.offset, self.llen, self.vlen = _decoded
1741         self.default = None
1742         self.expl_lenindef = False
1743         self.lenindef = False
1744         self.ber_encoded = False
1745
1746     @property
1747     def ready(self):  # pragma: no cover
1748         """Is object ready to be encoded?
1749         """
1750         raise NotImplementedError()
1751
1752     def _assert_ready(self):
1753         if not self.ready:
1754             raise ObjNotReady(self.__class__.__name__)
1755
1756     @property
1757     def bered(self):
1758         """Is either object or any elements inside is BER encoded?
1759         """
1760         return self.expl_lenindef or self.lenindef or self.ber_encoded
1761
1762     @property
1763     def decoded(self):
1764         """Is object decoded?
1765         """
1766         return (self.llen + self.vlen) > 0
1767
1768     def __getstate__(self):  # pragma: no cover
1769         """Used for making safe to be mutable pickleable copies
1770         """
1771         raise NotImplementedError()
1772
1773     def __setstate__(self, state):
1774         if state.version != __version__:
1775             raise ValueError("data is pickled by different PyDERASN version")
1776         self.tag = state.tag
1777         self._tag_order = state.tag_order
1778         self._expl = state.expl
1779         self.default = state.default
1780         self.optional = state.optional
1781         self.offset = state.offset
1782         self.llen = state.llen
1783         self.vlen = state.vlen
1784         self.expl_lenindef = state.expl_lenindef
1785         self.lenindef = state.lenindef
1786         self.ber_encoded = state.ber_encoded
1787
1788     @property
1789     def tag_order(self):
1790         """Tag's (class, number) used for DER/CER sorting
1791         """
1792         return self._tag_order
1793
1794     @property
1795     def tag_order_cer(self):
1796         return self.tag_order
1797
1798     @property
1799     def tlen(self):
1800         """.. seealso:: :ref:`decoding`
1801         """
1802         return len(self.tag)
1803
1804     @property
1805     def tlvlen(self):
1806         """.. seealso:: :ref:`decoding`
1807         """
1808         return self.tlen + self.llen + self.vlen
1809
1810     def __str__(self):  # pragma: no cover
1811         return self.__bytes__() if PY2 else self.__unicode__()
1812
1813     def __ne__(self, their):
1814         return not(self == their)
1815
1816     def __gt__(self, their):  # pragma: no cover
1817         return not(self < their)
1818
1819     def __le__(self, their):  # pragma: no cover
1820         return (self == their) or (self < their)
1821
1822     def __ge__(self, their):  # pragma: no cover
1823         return (self == their) or (self > their)
1824
1825     def _encode(self):  # pragma: no cover
1826         raise NotImplementedError()
1827
1828     def _encode_cer(self, writer):
1829         write_full(writer, self._encode())
1830
1831     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):  # pragma: no cover
1832         yield NotImplemented
1833
1834     def _encode1st(self, state):
1835         raise NotImplementedError()
1836
1837     def _encode2nd(self, writer, state_iter):
1838         raise NotImplementedError()
1839
1840     def encode(self):
1841         """DER encode the structure
1842
1843         :returns: DER representation
1844         """
1845         raw = self._encode()
1846         if self._expl is None:
1847             return raw
1848         return b"".join((self._expl, len_encode(len(raw)), raw))
1849
1850     def encode1st(self, state=None):
1851         """Do the 1st pass of 2-pass encoding
1852
1853         :rtype: (int, array("L"))
1854         :returns: full length of encoded data and precalculated various
1855                   objects lengths
1856         """
1857         if state is None:
1858             state = state_2pass_new()
1859         if self._expl is None:
1860             return self._encode1st(state)
1861         state.append(0)
1862         idx = len(state) - 1
1863         vlen, _ = self._encode1st(state)
1864         state[idx] = vlen
1865         fulllen = len(self._expl) + len_size(vlen) + vlen
1866         return fulllen, state
1867
1868     def encode2nd(self, writer, state_iter):
1869         """Do the 2nd pass of 2-pass encoding
1870
1871         :param writer: must comply with ``io.RawIOBase.write`` behaviour
1872         :param state_iter: iterator over the 1st pass state (``iter(state)``)
1873         """
1874         if self._expl is None:
1875             self._encode2nd(writer, state_iter)
1876         else:
1877             write_full(writer, self._expl + len_encode(next(state_iter)))
1878             self._encode2nd(writer, state_iter)
1879
1880     def encode_cer(self, writer):
1881         """CER encode the structure to specified writer
1882
1883         :param writer: must comply with ``io.RawIOBase.write``
1884                        behaviour. It takes slice to be written and
1885                        returns number of bytes processed. If it returns
1886                        None, then exception will be raised
1887         """
1888         if self._expl is not None:
1889             write_full(writer, self._expl + LENINDEF)
1890         if getattr(self, "der_forced", False):
1891             write_full(writer, self._encode())
1892         else:
1893             self._encode_cer(writer)
1894         if self._expl is not None:
1895             write_full(writer, EOC)
1896
1897     def hexencode(self):
1898         """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1899         """
1900         return hexenc(self.encode())
1901
1902     def decode(
1903             self,
1904             data,
1905             offset=0,
1906             leavemm=False,
1907             decode_path=(),
1908             ctx=None,
1909             tag_only=False,
1910             _ctx_immutable=True,
1911     ):
1912         """Decode the data
1913
1914         :param data: either binary or memoryview
1915         :param int offset: initial data's offset
1916         :param bool leavemm: do we need to leave memoryview of remaining
1917                     data as is, or convert it to bytes otherwise
1918         :param decode_path: current decode path (tuples of strings,
1919                             possibly with DecodePathDefBy) with will be
1920                             the root for all underlying objects
1921         :param ctx: optional :ref:`context <ctx>` governing decoding process
1922         :param bool tag_only: decode only the tag, without length and
1923                               contents (used only in Choice and Set
1924                               structures, trying to determine if tag satisfies
1925                               the schema)
1926         :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1927                                     before using it?
1928         :returns: (Obj, remaining data)
1929
1930         .. seealso:: :ref:`decoding`
1931         """
1932         result = next(self.decode_evgen(
1933             data,
1934             offset,
1935             leavemm,
1936             decode_path,
1937             ctx,
1938             tag_only,
1939             _ctx_immutable,
1940             _evgen_mode=False,
1941         ))
1942         if result is None:
1943             return None
1944         _, obj, tail = result
1945         return obj, tail
1946
1947     def decode_evgen(
1948             self,
1949             data,
1950             offset=0,
1951             leavemm=False,
1952             decode_path=(),
1953             ctx=None,
1954             tag_only=False,
1955             _ctx_immutable=True,
1956             _evgen_mode=True,
1957     ):
1958         """Decode with evgen mode on
1959
1960         That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1961         it returns the generator producing ``(decode_path, obj, tail)``
1962         values.
1963         .. seealso:: :ref:`evgen mode <evgen_mode>`.
1964         """
1965         if ctx is None:
1966             ctx = {}
1967         elif _ctx_immutable:
1968             ctx = copy(ctx)
1969         tlv = memoryview(data)
1970         if (
1971                 _evgen_mode and
1972                 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1973         ):
1974             _evgen_mode = False
1975         if self._expl is None:
1976             for result in self._decode(
1977                     tlv,
1978                     offset=offset,
1979                     decode_path=decode_path,
1980                     ctx=ctx,
1981                     tag_only=tag_only,
1982                     evgen_mode=_evgen_mode,
1983             ):
1984                 if tag_only:
1985                     yield None
1986                     return
1987                 _decode_path, obj, tail = result
1988                 if _decode_path is not decode_path:
1989                     yield result
1990         else:
1991             try:
1992                 t, tlen, lv = tag_strip(tlv)
1993             except DecodeError as err:
1994                 raise err.__class__(
1995                     msg=err.msg,
1996                     klass=self.__class__,
1997                     decode_path=decode_path,
1998                     offset=offset,
1999                 )
2000             if t != self._expl:
2001                 raise TagMismatch(
2002                     klass=self.__class__,
2003                     decode_path=decode_path,
2004                     offset=offset,
2005                 )
2006             try:
2007                 l, llen, v = len_decode(lv)
2008             except LenIndefForm as err:
2009                 if not ctx.get("bered", False):
2010                     raise err.__class__(
2011                         msg=err.msg,
2012                         klass=self.__class__,
2013                         decode_path=decode_path,
2014                         offset=offset,
2015                     )
2016                 llen, v = 1, lv[1:]
2017                 offset += tlen + llen
2018                 for result in self._decode(
2019                         v,
2020                         offset=offset,
2021                         decode_path=decode_path,
2022                         ctx=ctx,
2023                         tag_only=tag_only,
2024                         evgen_mode=_evgen_mode,
2025                 ):
2026                     if tag_only:  # pragma: no cover
2027                         yield None
2028                         return
2029                     _decode_path, obj, tail = result
2030                     if _decode_path is not decode_path:
2031                         yield result
2032                 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2033                 if eoc_expected.tobytes() != EOC:
2034                     raise DecodeError(
2035                         "no EOC",
2036                         klass=self.__class__,
2037                         decode_path=decode_path,
2038                         offset=offset,
2039                     )
2040                 obj.vlen += EOC_LEN
2041                 obj.expl_lenindef = True
2042             except DecodeError as err:
2043                 raise err.__class__(
2044                     msg=err.msg,
2045                     klass=self.__class__,
2046                     decode_path=decode_path,
2047                     offset=offset,
2048                 )
2049             else:
2050                 if l > len(v):
2051                     raise NotEnoughData(
2052                         "encoded length is longer than data",
2053                         klass=self.__class__,
2054                         decode_path=decode_path,
2055                         offset=offset,
2056                     )
2057                 for result in self._decode(
2058                         v,
2059                         offset=offset + tlen + llen,
2060                         decode_path=decode_path,
2061                         ctx=ctx,
2062                         tag_only=tag_only,
2063                         evgen_mode=_evgen_mode,
2064                 ):
2065                     if tag_only:  # pragma: no cover
2066                         yield None
2067                         return
2068                     _decode_path, obj, tail = result
2069                     if _decode_path is not decode_path:
2070                         yield result
2071                 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2072                     raise DecodeError(
2073                         "explicit tag out-of-bound, longer than data",
2074                         klass=self.__class__,
2075                         decode_path=decode_path,
2076                         offset=offset,
2077                     )
2078         yield decode_path, obj, (tail if leavemm else tail.tobytes())
2079
2080     def decod(self, data, offset=0, decode_path=(), ctx=None):
2081         """Decode the data, check that tail is empty
2082
2083         :raises ExceedingData: if tail is not empty
2084
2085         This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2086         (decode without tail) that also checks that there is no
2087         trailing data left.
2088         """
2089         obj, tail = self.decode(
2090             data,
2091             offset=offset,
2092             decode_path=decode_path,
2093             ctx=ctx,
2094             leavemm=True,
2095         )
2096         if len(tail) > 0:
2097             raise ExceedingData(len(tail))
2098         return obj
2099
2100     def hexdecode(self, data, *args, **kwargs):
2101         """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2102         """
2103         return self.decode(hexdec(data), *args, **kwargs)
2104
2105     def hexdecod(self, data, *args, **kwargs):
2106         """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2107         """
2108         return self.decod(hexdec(data), *args, **kwargs)
2109
2110     @property
2111     def expled(self):
2112         """.. seealso:: :ref:`decoding`
2113         """
2114         return self._expl is not None
2115
2116     @property
2117     def expl_tag(self):
2118         """.. seealso:: :ref:`decoding`
2119         """
2120         return self._expl
2121
2122     @property
2123     def expl_tlen(self):
2124         """.. seealso:: :ref:`decoding`
2125         """
2126         return len(self._expl)
2127
2128     @property
2129     def expl_llen(self):
2130         """.. seealso:: :ref:`decoding`
2131         """
2132         if self.expl_lenindef:
2133             return 1
2134         return len(len_encode(self.tlvlen))
2135
2136     @property
2137     def expl_offset(self):
2138         """.. seealso:: :ref:`decoding`
2139         """
2140         return self.offset - self.expl_tlen - self.expl_llen
2141
2142     @property
2143     def expl_vlen(self):
2144         """.. seealso:: :ref:`decoding`
2145         """
2146         return self.tlvlen
2147
2148     @property
2149     def expl_tlvlen(self):
2150         """.. seealso:: :ref:`decoding`
2151         """
2152         return self.expl_tlen + self.expl_llen + self.expl_vlen
2153
2154     @property
2155     def fulloffset(self):
2156         """.. seealso:: :ref:`decoding`
2157         """
2158         return self.expl_offset if self.expled else self.offset
2159
2160     @property
2161     def fulllen(self):
2162         """.. seealso:: :ref:`decoding`
2163         """
2164         return self.expl_tlvlen if self.expled else self.tlvlen
2165
2166     def pps_lenindef(self, decode_path):
2167         if self.lenindef and not (
2168                 getattr(self, "defined", None) is not None and
2169                 self.defined[1].lenindef
2170         ):
2171             yield _pp(
2172                 asn1_type_name="EOC",
2173                 obj_name="",
2174                 decode_path=decode_path,
2175                 offset=(
2176                     self.offset + self.tlvlen -
2177                     (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2178                 ),
2179                 tlen=1,
2180                 llen=1,
2181                 vlen=0,
2182                 ber_encoded=True,
2183                 bered=True,
2184             )
2185         if self.expl_lenindef:
2186             yield _pp(
2187                 asn1_type_name="EOC",
2188                 obj_name="EXPLICIT",
2189                 decode_path=decode_path,
2190                 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2191                 tlen=1,
2192                 llen=1,
2193                 vlen=0,
2194                 ber_encoded=True,
2195                 bered=True,
2196             )
2197
2198
2199 def encode_cer(obj):
2200     """Encode to CER in memory buffer
2201
2202     :returns bytes: memory buffer contents
2203     """
2204     buf = BytesIO()
2205     obj.encode_cer(buf.write)
2206     return buf.getvalue()
2207
2208
2209 def encode2pass(obj):
2210     """Encode (2-pass mode) to DER in memory buffer
2211
2212     :returns bytes: memory buffer contents
2213     """
2214     buf = BytesIO()
2215     _, state = obj.encode1st()
2216     obj.encode2nd(buf.write, iter(state))
2217     return buf.getvalue()
2218
2219
2220 class DecodePathDefBy(object):
2221     """DEFINED BY representation inside decode path
2222     """
2223     __slots__ = ("defined_by",)
2224
2225     def __init__(self, defined_by):
2226         self.defined_by = defined_by
2227
2228     def __ne__(self, their):
2229         return not(self == their)
2230
2231     def __eq__(self, their):
2232         if not isinstance(their, self.__class__):
2233             return False
2234         return self.defined_by == their.defined_by
2235
2236     def __str__(self):
2237         return "DEFINED BY " + str(self.defined_by)
2238
2239     def __repr__(self):
2240         return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2241
2242
2243 ########################################################################
2244 # Pretty printing
2245 ########################################################################
2246
2247 PP = namedtuple("PP", (
2248     "obj",
2249     "asn1_type_name",
2250     "obj_name",
2251     "decode_path",
2252     "value",
2253     "blob",
2254     "optional",
2255     "default",
2256     "impl",
2257     "expl",
2258     "offset",
2259     "tlen",
2260     "llen",
2261     "vlen",
2262     "expl_offset",
2263     "expl_tlen",
2264     "expl_llen",
2265     "expl_vlen",
2266     "expl_lenindef",
2267     "lenindef",
2268     "ber_encoded",
2269     "bered",
2270 ), **NAMEDTUPLE_KWARGS)
2271
2272
2273 def _pp(
2274         obj=None,
2275         asn1_type_name="unknown",
2276         obj_name="unknown",
2277         decode_path=(),
2278         value=None,
2279         blob=None,
2280         optional=False,
2281         default=False,
2282         impl=None,
2283         expl=None,
2284         offset=0,
2285         tlen=0,
2286         llen=0,
2287         vlen=0,
2288         expl_offset=None,
2289         expl_tlen=None,
2290         expl_llen=None,
2291         expl_vlen=None,
2292         expl_lenindef=False,
2293         lenindef=False,
2294         ber_encoded=False,
2295         bered=False,
2296 ):
2297     return PP(
2298         obj,
2299         asn1_type_name,
2300         obj_name,
2301         decode_path,
2302         value,
2303         blob,
2304         optional,
2305         default,
2306         impl,
2307         expl,
2308         offset,
2309         tlen,
2310         llen,
2311         vlen,
2312         expl_offset,
2313         expl_tlen,
2314         expl_llen,
2315         expl_vlen,
2316         expl_lenindef,
2317         lenindef,
2318         ber_encoded,
2319         bered,
2320     )
2321
2322
2323 def _colourize(what, colour, with_colours, attrs=("bold",)):
2324     return colored(what, colour, attrs=attrs) if with_colours else what
2325
2326
2327 def colonize_hex(hexed):
2328     """Separate hexadecimal string with colons
2329     """
2330     return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2331
2332
2333 def find_oid_name(asn1_type_name, oid_maps, value):
2334     if len(oid_maps) > 0 and asn1_type_name == ObjectIdentifier.asn1_type_name:
2335         for oid_map in oid_maps:
2336             oid_name = oid_map.get(value)
2337             if oid_name is not None:
2338                 return oid_name
2339     return None
2340
2341
2342 def pp_console_row(
2343         pp,
2344         oid_maps=(),
2345         with_offsets=False,
2346         with_blob=True,
2347         with_colours=False,
2348         with_decode_path=False,
2349         decode_path_len_decrease=0,
2350 ):
2351     cols = []
2352     if with_offsets:
2353         col = "%5d%s%s" % (
2354             pp.offset,
2355             (
2356                 "  " if pp.expl_offset is None else
2357                 ("-%d" % (pp.offset - pp.expl_offset))
2358             ),
2359             LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2360         )
2361         col = _colourize(col, "red", with_colours, ())
2362         col += _colourize("B", "red", with_colours) if pp.bered else " "
2363         cols.append(col)
2364         col = "[%d,%d,%4d]%s" % (
2365             pp.tlen, pp.llen, pp.vlen,
2366             LENINDEF_PP_CHAR if pp.lenindef else " "
2367         )
2368         col = _colourize(col, "green", with_colours, ())
2369         cols.append(col)
2370     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2371     if decode_path_len > 0:
2372         cols.append(" ." * decode_path_len)
2373         ent = pp.decode_path[-1]
2374         if isinstance(ent, DecodePathDefBy):
2375             cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2376             value = str(ent.defined_by)
2377             oid_name = find_oid_name(ent.defined_by.asn1_type_name, oid_maps, value)
2378             if oid_name is None:
2379                 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2380             else:
2381                 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2382         else:
2383             cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2384     if pp.expl is not None:
2385         klass, _, num = pp.expl
2386         col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2387         cols.append(_colourize(col, "blue", with_colours))
2388     if pp.impl is not None:
2389         klass, _, num = pp.impl
2390         col = "[%s%d]" % (TagClassReprs[klass], num)
2391         cols.append(_colourize(col, "blue", with_colours))
2392     if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2393         cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2394     if pp.ber_encoded:
2395         cols.append(_colourize("BER", "red", with_colours))
2396     cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2397     if pp.value is not None:
2398         value = pp.value
2399         cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2400         oid_name = find_oid_name(pp.asn1_type_name, oid_maps, pp.value)
2401         if oid_name is not None:
2402             cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2403         if pp.asn1_type_name == Integer.asn1_type_name:
2404             cols.append(_colourize(
2405                 "(%s)" % colonize_hex(pp.obj.tohex()), "green", with_colours,
2406             ))
2407     if with_blob:
2408         if pp.blob.__class__ == binary_type:
2409             cols.append(hexenc(pp.blob))
2410         elif pp.blob.__class__ == tuple:
2411             cols.append(", ".join(pp.blob))
2412     if pp.optional:
2413         cols.append(_colourize("OPTIONAL", "red", with_colours))
2414     if pp.default:
2415         cols.append(_colourize("DEFAULT", "red", with_colours))
2416     if with_decode_path:
2417         cols.append(_colourize(
2418             "[%s]" % ":".join(str(p) for p in pp.decode_path),
2419             "grey",
2420             with_colours,
2421         ))
2422     return " ".join(cols)
2423
2424
2425 def pp_console_blob(pp, decode_path_len_decrease=0):
2426     cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2427     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2428     if decode_path_len > 0:
2429         cols.append(" ." * (decode_path_len + 1))
2430     if pp.blob.__class__ == binary_type:
2431         blob = hexenc(pp.blob).upper()
2432         for i in six_xrange(0, len(blob), 32):
2433             chunk = blob[i:i + 32]
2434             yield " ".join(cols + [colonize_hex(chunk)])
2435     elif pp.blob.__class__ == tuple:
2436         yield " ".join(cols + [", ".join(pp.blob)])
2437
2438
2439 def pprint(
2440         obj,
2441         oid_maps=(),
2442         big_blobs=False,
2443         with_colours=False,
2444         with_decode_path=False,
2445         decode_path_only=(),
2446         decode_path=(),
2447 ):
2448     """Pretty print object
2449
2450     :param Obj obj: object you want to pretty print
2451     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
2452                      Its human readable form is printed when OID is met
2453     :param big_blobs: if large binary objects are met (like OctetString
2454                       values), do we need to print them too, on separate
2455                       lines
2456     :param with_colours: colourize output, if ``termcolor`` library
2457                          is available
2458     :param with_decode_path: print decode path
2459     :param decode_path_only: print only that specified decode path
2460     """
2461     def _pprint_pps(pps):
2462         for pp in pps:
2463             if hasattr(pp, "_fields"):
2464                 if (
2465                         decode_path_only != () and
2466                         tuple(
2467                             str(p) for p in pp.decode_path[:len(decode_path_only)]
2468                         ) != decode_path_only
2469                 ):
2470                     continue
2471                 if big_blobs:
2472                     yield pp_console_row(
2473                         pp,
2474                         oid_maps=oid_maps,
2475                         with_offsets=True,
2476                         with_blob=False,
2477                         with_colours=with_colours,
2478                         with_decode_path=with_decode_path,
2479                         decode_path_len_decrease=len(decode_path_only),
2480                     )
2481                     for row in pp_console_blob(
2482                             pp,
2483                             decode_path_len_decrease=len(decode_path_only),
2484                     ):
2485                         yield row
2486                 else:
2487                     yield pp_console_row(
2488                         pp,
2489                         oid_maps=oid_maps,
2490                         with_offsets=True,
2491                         with_blob=True,
2492                         with_colours=with_colours,
2493                         with_decode_path=with_decode_path,
2494                         decode_path_len_decrease=len(decode_path_only),
2495                     )
2496             else:
2497                 for row in _pprint_pps(pp):
2498                     yield row
2499     return "\n".join(_pprint_pps(obj.pps(decode_path)))
2500
2501
2502 ########################################################################
2503 # ASN.1 primitive types
2504 ########################################################################
2505
2506 BooleanState = namedtuple(
2507     "BooleanState",
2508     BasicState._fields + ("value",),
2509     **NAMEDTUPLE_KWARGS
2510 )
2511
2512
2513 class Boolean(Obj):
2514     """``BOOLEAN`` boolean type
2515
2516     >>> b = Boolean(True)
2517     BOOLEAN True
2518     >>> b == Boolean(True)
2519     True
2520     >>> bool(b)
2521     True
2522     """
2523     __slots__ = ()
2524     tag_default = tag_encode(1)
2525     asn1_type_name = "BOOLEAN"
2526
2527     def __init__(
2528             self,
2529             value=None,
2530             impl=None,
2531             expl=None,
2532             default=None,
2533             optional=False,
2534             _decoded=(0, 0, 0),
2535     ):
2536         """
2537         :param value: set the value. Either boolean type, or
2538                       :py:class:`pyderasn.Boolean` object
2539         :param bytes impl: override default tag with ``IMPLICIT`` one
2540         :param bytes expl: override default tag with ``EXPLICIT`` one
2541         :param default: set default value. Type same as in ``value``
2542         :param bool optional: is object ``OPTIONAL`` in sequence
2543         """
2544         super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2545         self._value = None if value is None else self._value_sanitize(value)
2546         if default is not None:
2547             default = self._value_sanitize(default)
2548             self.default = self.__class__(
2549                 value=default,
2550                 impl=self.tag,
2551                 expl=self._expl,
2552             )
2553             if value is None:
2554                 self._value = default
2555
2556     def _value_sanitize(self, value):
2557         if value.__class__ == bool:
2558             return value
2559         if issubclass(value.__class__, Boolean):
2560             return value._value
2561         raise InvalidValueType((self.__class__, bool))
2562
2563     @property
2564     def ready(self):
2565         return self._value is not None
2566
2567     def __getstate__(self):
2568         return BooleanState(
2569             __version__,
2570             self.tag,
2571             self._tag_order,
2572             self._expl,
2573             self.default,
2574             self.optional,
2575             self.offset,
2576             self.llen,
2577             self.vlen,
2578             self.expl_lenindef,
2579             self.lenindef,
2580             self.ber_encoded,
2581             self._value,
2582         )
2583
2584     def __setstate__(self, state):
2585         super(Boolean, self).__setstate__(state)
2586         self._value = state.value
2587
2588     def __nonzero__(self):
2589         self._assert_ready()
2590         return self._value
2591
2592     def __bool__(self):
2593         self._assert_ready()
2594         return self._value
2595
2596     def __eq__(self, their):
2597         if their.__class__ == bool:
2598             return self._value == their
2599         if not issubclass(their.__class__, Boolean):
2600             return False
2601         return (
2602             self._value == their._value and
2603             self.tag == their.tag and
2604             self._expl == their._expl
2605         )
2606
2607     def __call__(
2608             self,
2609             value=None,
2610             impl=None,
2611             expl=None,
2612             default=None,
2613             optional=None,
2614     ):
2615         return self.__class__(
2616             value=value,
2617             impl=self.tag if impl is None else impl,
2618             expl=self._expl if expl is None else expl,
2619             default=self.default if default is None else default,
2620             optional=self.optional if optional is None else optional,
2621         )
2622
2623     def _encode(self):
2624         self._assert_ready()
2625         return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2626
2627     def _encode1st(self, state):
2628         return len(self.tag) + 2, state
2629
2630     def _encode2nd(self, writer, state_iter):
2631         self._assert_ready()
2632         write_full(writer, self._encode())
2633
2634     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2635         try:
2636             t, _, lv = tag_strip(tlv)
2637         except DecodeError as err:
2638             raise err.__class__(
2639                 msg=err.msg,
2640                 klass=self.__class__,
2641                 decode_path=decode_path,
2642                 offset=offset,
2643             )
2644         if t != self.tag:
2645             raise TagMismatch(
2646                 klass=self.__class__,
2647                 decode_path=decode_path,
2648                 offset=offset,
2649             )
2650         if tag_only:
2651             yield None
2652             return
2653         try:
2654             l, _, v = len_decode(lv)
2655         except DecodeError as err:
2656             raise err.__class__(
2657                 msg=err.msg,
2658                 klass=self.__class__,
2659                 decode_path=decode_path,
2660                 offset=offset,
2661             )
2662         if l != 1:
2663             raise InvalidLength(
2664                 "Boolean's length must be equal to 1",
2665                 klass=self.__class__,
2666                 decode_path=decode_path,
2667                 offset=offset,
2668             )
2669         if l > len(v):
2670             raise NotEnoughData(
2671                 "encoded length is longer than data",
2672                 klass=self.__class__,
2673                 decode_path=decode_path,
2674                 offset=offset,
2675             )
2676         first_octet = byte2int(v)
2677         ber_encoded = False
2678         if first_octet == 0:
2679             value = False
2680         elif first_octet == 0xFF:
2681             value = True
2682         elif ctx.get("bered", False):
2683             value = True
2684             ber_encoded = True
2685         else:
2686             raise DecodeError(
2687                 "unacceptable Boolean value",
2688                 klass=self.__class__,
2689                 decode_path=decode_path,
2690                 offset=offset,
2691             )
2692         obj = self.__class__(
2693             value=value,
2694             impl=self.tag,
2695             expl=self._expl,
2696             default=self.default,
2697             optional=self.optional,
2698             _decoded=(offset, 1, 1),
2699         )
2700         obj.ber_encoded = ber_encoded
2701         yield decode_path, obj, v[1:]
2702
2703     def __repr__(self):
2704         return pp_console_row(next(self.pps()))
2705
2706     def pps(self, decode_path=()):
2707         yield _pp(
2708             obj=self,
2709             asn1_type_name=self.asn1_type_name,
2710             obj_name=self.__class__.__name__,
2711             decode_path=decode_path,
2712             value=str(self._value) if self.ready else None,
2713             optional=self.optional,
2714             default=self == self.default,
2715             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2716             expl=None if self._expl is None else tag_decode(self._expl),
2717             offset=self.offset,
2718             tlen=self.tlen,
2719             llen=self.llen,
2720             vlen=self.vlen,
2721             expl_offset=self.expl_offset if self.expled else None,
2722             expl_tlen=self.expl_tlen if self.expled else None,
2723             expl_llen=self.expl_llen if self.expled else None,
2724             expl_vlen=self.expl_vlen if self.expled else None,
2725             expl_lenindef=self.expl_lenindef,
2726             ber_encoded=self.ber_encoded,
2727             bered=self.bered,
2728         )
2729         for pp in self.pps_lenindef(decode_path):
2730             yield pp
2731
2732
2733 IntegerState = namedtuple(
2734     "IntegerState",
2735     BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2736     **NAMEDTUPLE_KWARGS
2737 )
2738
2739
2740 class Integer(Obj):
2741     """``INTEGER`` integer type
2742
2743     >>> b = Integer(-123)
2744     INTEGER -123
2745     >>> b == Integer(-123)
2746     True
2747     >>> int(b)
2748     -123
2749
2750     >>> Integer(2, bounds=(1, 3))
2751     INTEGER 2
2752     >>> Integer(5, bounds=(1, 3))
2753     Traceback (most recent call last):
2754     pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2755
2756     ::
2757
2758         class Version(Integer):
2759             schema = (
2760                 ("v1", 0),
2761                 ("v2", 1),
2762                 ("v3", 2),
2763             )
2764
2765     >>> v = Version("v1")
2766     Version INTEGER v1
2767     >>> int(v)
2768     0
2769     >>> v.named
2770     'v1'
2771     >>> v.specs
2772     {'v3': 2, 'v1': 0, 'v2': 1}
2773     """
2774     __slots__ = ("specs", "_bound_min", "_bound_max")
2775     tag_default = tag_encode(2)
2776     asn1_type_name = "INTEGER"
2777
2778     def __init__(
2779             self,
2780             value=None,
2781             bounds=None,
2782             impl=None,
2783             expl=None,
2784             default=None,
2785             optional=False,
2786             _specs=None,
2787             _decoded=(0, 0, 0),
2788     ):
2789         """
2790         :param value: set the value. Either integer type, named value
2791                       (if ``schema`` is specified in the class), or
2792                       :py:class:`pyderasn.Integer` object
2793         :param bounds: set ``(MIN, MAX)`` value constraint.
2794                        (-inf, +inf) by default
2795         :param bytes impl: override default tag with ``IMPLICIT`` one
2796         :param bytes expl: override default tag with ``EXPLICIT`` one
2797         :param default: set default value. Type same as in ``value``
2798         :param bool optional: is object ``OPTIONAL`` in sequence
2799         """
2800         super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2801         self._value = value
2802         specs = getattr(self, "schema", {}) if _specs is None else _specs
2803         self.specs = specs if specs.__class__ == dict else dict(specs)
2804         self._bound_min, self._bound_max = getattr(
2805             self,
2806             "bounds",
2807             (float("-inf"), float("+inf")),
2808         ) if bounds is None else bounds
2809         if value is not None:
2810             self._value = self._value_sanitize(value)
2811         if default is not None:
2812             default = self._value_sanitize(default)
2813             self.default = self.__class__(
2814                 value=default,
2815                 impl=self.tag,
2816                 expl=self._expl,
2817                 _specs=self.specs,
2818             )
2819             if self._value is None:
2820                 self._value = default
2821
2822     def _value_sanitize(self, value):
2823         if isinstance(value, integer_types):
2824             pass
2825         elif issubclass(value.__class__, Integer):
2826             value = value._value
2827         elif value.__class__ == str:
2828             value = self.specs.get(value)
2829             if value is None:
2830                 raise ObjUnknown("integer value: %s" % value)
2831         else:
2832             raise InvalidValueType((self.__class__, int, str))
2833         if not self._bound_min <= value <= self._bound_max:
2834             raise BoundsError(self._bound_min, value, self._bound_max)
2835         return value
2836
2837     @property
2838     def ready(self):
2839         return self._value is not None
2840
2841     def __getstate__(self):
2842         return IntegerState(
2843             __version__,
2844             self.tag,
2845             self._tag_order,
2846             self._expl,
2847             self.default,
2848             self.optional,
2849             self.offset,
2850             self.llen,
2851             self.vlen,
2852             self.expl_lenindef,
2853             self.lenindef,
2854             self.ber_encoded,
2855             self.specs,
2856             self._value,
2857             self._bound_min,
2858             self._bound_max,
2859         )
2860
2861     def __setstate__(self, state):
2862         super(Integer, self).__setstate__(state)
2863         self.specs = state.specs
2864         self._value = state.value
2865         self._bound_min = state.bound_min
2866         self._bound_max = state.bound_max
2867
2868     def __int__(self):
2869         self._assert_ready()
2870         return int(self._value)
2871
2872     def tohex(self):
2873         """Hexadecimal representation
2874
2875         Use :py:func:`pyderasn.colonize_hex` for colonizing it.
2876         """
2877         hex_repr = hex(int(self))[2:].upper()
2878         if len(hex_repr) % 2 != 0:
2879             hex_repr = "0" + hex_repr
2880         return hex_repr
2881
2882     def __hash__(self):
2883         self._assert_ready()
2884         return hash(b"".join((
2885             self.tag,
2886             bytes(self._expl or b""),
2887             str(self._value).encode("ascii"),
2888         )))
2889
2890     def __eq__(self, their):
2891         if isinstance(their, integer_types):
2892             return self._value == their
2893         if not issubclass(their.__class__, Integer):
2894             return False
2895         return (
2896             self._value == their._value and
2897             self.tag == their.tag and
2898             self._expl == their._expl
2899         )
2900
2901     def __lt__(self, their):
2902         return self._value < their._value
2903
2904     @property
2905     def named(self):
2906         """Return named representation (if exists) of the value
2907         """
2908         for name, value in iteritems(self.specs):
2909             if value == self._value:
2910                 return name
2911         return None
2912
2913     def __call__(
2914             self,
2915             value=None,
2916             bounds=None,
2917             impl=None,
2918             expl=None,
2919             default=None,
2920             optional=None,
2921     ):
2922         return self.__class__(
2923             value=value,
2924             bounds=(
2925                 (self._bound_min, self._bound_max)
2926                 if bounds is None else bounds
2927             ),
2928             impl=self.tag if impl is None else impl,
2929             expl=self._expl if expl is None else expl,
2930             default=self.default if default is None else default,
2931             optional=self.optional if optional is None else optional,
2932             _specs=self.specs,
2933         )
2934
2935     def _encode_payload(self):
2936         self._assert_ready()
2937         value = self._value
2938         if PY2:
2939             if value == 0:
2940                 octets = bytearray([0])
2941             elif value < 0:
2942                 value = -value
2943                 value -= 1
2944                 octets = bytearray()
2945                 while value > 0:
2946                     octets.append((value & 0xFF) ^ 0xFF)
2947                     value >>= 8
2948                 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2949                     octets.append(0xFF)
2950             else:
2951                 octets = bytearray()
2952                 while value > 0:
2953                     octets.append(value & 0xFF)
2954                     value >>= 8
2955                 if octets[-1] & 0x80 > 0:
2956                     octets.append(0x00)
2957             octets.reverse()
2958             octets = bytes(octets)
2959         else:
2960             bytes_len = ceil(value.bit_length() / 8) or 1
2961             while True:
2962                 try:
2963                     octets = value.to_bytes(
2964                         bytes_len,
2965                         byteorder="big",
2966                         signed=True,
2967                     )
2968                 except OverflowError:
2969                     bytes_len += 1
2970                 else:
2971                     break
2972         return octets
2973
2974     def _encode(self):
2975         octets = self._encode_payload()
2976         return b"".join((self.tag, len_encode(len(octets)), octets))
2977
2978     def _encode1st(self, state):
2979         l = len(self._encode_payload())
2980         return len(self.tag) + len_size(l) + l, state
2981
2982     def _encode2nd(self, writer, state_iter):
2983         write_full(writer, self._encode())
2984
2985     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2986         try:
2987             t, _, lv = tag_strip(tlv)
2988         except DecodeError as err:
2989             raise err.__class__(
2990                 msg=err.msg,
2991                 klass=self.__class__,
2992                 decode_path=decode_path,
2993                 offset=offset,
2994             )
2995         if t != self.tag:
2996             raise TagMismatch(
2997                 klass=self.__class__,
2998                 decode_path=decode_path,
2999                 offset=offset,
3000             )
3001         if tag_only:
3002             yield None
3003             return
3004         try:
3005             l, llen, v = len_decode(lv)
3006         except DecodeError as err:
3007             raise err.__class__(
3008                 msg=err.msg,
3009                 klass=self.__class__,
3010                 decode_path=decode_path,
3011                 offset=offset,
3012             )
3013         if l > len(v):
3014             raise NotEnoughData(
3015                 "encoded length is longer than data",
3016                 klass=self.__class__,
3017                 decode_path=decode_path,
3018                 offset=offset,
3019             )
3020         if l == 0:
3021             raise NotEnoughData(
3022                 "zero length",
3023                 klass=self.__class__,
3024                 decode_path=decode_path,
3025                 offset=offset,
3026             )
3027         v, tail = v[:l], v[l:]
3028         first_octet = byte2int(v)
3029         if l > 1:
3030             second_octet = byte2int(v[1:])
3031             if (
3032                     ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3033                     ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3034             ):
3035                 raise DecodeError(
3036                     "non normalized integer",
3037                     klass=self.__class__,
3038                     decode_path=decode_path,
3039                     offset=offset,
3040                 )
3041         if PY2:
3042             value = 0
3043             if first_octet & 0x80 > 0:
3044                 octets = bytearray()
3045                 for octet in bytearray(v):
3046                     octets.append(octet ^ 0xFF)
3047                 for octet in octets:
3048                     value = (value << 8) | octet
3049                 value += 1
3050                 value = -value
3051             else:
3052                 for octet in bytearray(v):
3053                     value = (value << 8) | octet
3054         else:
3055             value = int.from_bytes(v, byteorder="big", signed=True)
3056         try:
3057             obj = self.__class__(
3058                 value=value,
3059                 bounds=(self._bound_min, self._bound_max),
3060                 impl=self.tag,
3061                 expl=self._expl,
3062                 default=self.default,
3063                 optional=self.optional,
3064                 _specs=self.specs,
3065                 _decoded=(offset, llen, l),
3066             )
3067         except BoundsError as err:
3068             raise DecodeError(
3069                 msg=str(err),
3070                 klass=self.__class__,
3071                 decode_path=decode_path,
3072                 offset=offset,
3073             )
3074         yield decode_path, obj, tail
3075
3076     def __repr__(self):
3077         return pp_console_row(next(self.pps()))
3078
3079     def pps(self, decode_path=()):
3080         yield _pp(
3081             obj=self,
3082             asn1_type_name=self.asn1_type_name,
3083             obj_name=self.__class__.__name__,
3084             decode_path=decode_path,
3085             value=(self.named or str(self._value)) if self.ready else None,
3086             optional=self.optional,
3087             default=self == self.default,
3088             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3089             expl=None if self._expl is None else tag_decode(self._expl),
3090             offset=self.offset,
3091             tlen=self.tlen,
3092             llen=self.llen,
3093             vlen=self.vlen,
3094             expl_offset=self.expl_offset if self.expled else None,
3095             expl_tlen=self.expl_tlen if self.expled else None,
3096             expl_llen=self.expl_llen if self.expled else None,
3097             expl_vlen=self.expl_vlen if self.expled else None,
3098             expl_lenindef=self.expl_lenindef,
3099             bered=self.bered,
3100         )
3101         for pp in self.pps_lenindef(decode_path):
3102             yield pp
3103
3104
3105 BitStringState = namedtuple(
3106     "BitStringState",
3107     BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3108     **NAMEDTUPLE_KWARGS
3109 )
3110
3111
3112 class BitString(Obj):
3113     """``BIT STRING`` bit string type
3114
3115     >>> BitString(b"hello world")
3116     BIT STRING 88 bits 68656c6c6f20776f726c64
3117     >>> bytes(b)
3118     b'hello world'
3119     >>> b == b"hello world"
3120     True
3121     >>> b.bit_len
3122     88
3123
3124     >>> BitString("'0A3B5F291CD'H")
3125     BIT STRING 44 bits 0a3b5f291cd0
3126     >>> b = BitString("'010110000000'B")
3127     BIT STRING 12 bits 5800
3128     >>> b.bit_len
3129     12
3130     >>> b[0], b[1], b[2], b[3]
3131     (False, True, False, True)
3132     >>> b[1000]
3133     False
3134     >>> [v for v in b]
3135     [False, True, False, True, True, False, False, False, False, False, False, False]
3136
3137     ::
3138
3139         class KeyUsage(BitString):
3140             schema = (
3141                 ("digitalSignature", 0),
3142                 ("nonRepudiation", 1),
3143                 ("keyEncipherment", 2),
3144             )
3145
3146     >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3147     KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3148     >>> b.named
3149     ['nonRepudiation', 'keyEncipherment']
3150     >>> b.specs
3151     {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3152
3153     .. note::
3154
3155        Pay attention that BIT STRING can be encoded both in primitive
3156        and constructed forms. Decoder always checks constructed form tag
3157        additionally to specified primitive one. If BER decoding is
3158        :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3159        of DER restrictions.
3160     """
3161     __slots__ = ("tag_constructed", "specs", "defined")
3162     tag_default = tag_encode(3)
3163     asn1_type_name = "BIT STRING"
3164
3165     def __init__(
3166             self,
3167             value=None,
3168             impl=None,
3169             expl=None,
3170             default=None,
3171             optional=False,
3172             _specs=None,
3173             _decoded=(0, 0, 0),
3174     ):
3175         """
3176         :param value: set the value. Either binary type, tuple of named
3177                       values (if ``schema`` is specified in the class),
3178                       string in ``'XXX...'B`` form, or
3179                       :py:class:`pyderasn.BitString` object
3180         :param bytes impl: override default tag with ``IMPLICIT`` one
3181         :param bytes expl: override default tag with ``EXPLICIT`` one
3182         :param default: set default value. Type same as in ``value``
3183         :param bool optional: is object ``OPTIONAL`` in sequence
3184         """
3185         super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3186         specs = getattr(self, "schema", {}) if _specs is None else _specs
3187         self.specs = specs if specs.__class__ == dict else dict(specs)
3188         self._value = None if value is None else self._value_sanitize(value)
3189         if default is not None:
3190             default = self._value_sanitize(default)
3191             self.default = self.__class__(
3192                 value=default,
3193                 impl=self.tag,
3194                 expl=self._expl,
3195             )
3196             if value is None:
3197                 self._value = default
3198         self.defined = None
3199         tag_klass, _, tag_num = tag_decode(self.tag)
3200         self.tag_constructed = tag_encode(
3201             klass=tag_klass,
3202             form=TagFormConstructed,
3203             num=tag_num,
3204         )
3205
3206     def _bits2octets(self, bits):
3207         if len(self.specs) > 0:
3208             bits = bits.rstrip("0")
3209         bit_len = len(bits)
3210         bits += "0" * ((8 - (bit_len % 8)) % 8)
3211         octets = bytearray(len(bits) // 8)
3212         for i in six_xrange(len(octets)):
3213             octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3214         return bit_len, bytes(octets)
3215
3216     def _value_sanitize(self, value):
3217         if isinstance(value, (string_types, binary_type)):
3218             if (
3219                     isinstance(value, string_types) and
3220                     value.startswith("'")
3221             ):
3222                 if value.endswith("'B"):
3223                     value = value[1:-2]
3224                     if not frozenset(value) <= SET01:
3225                         raise ValueError("B's coding contains unacceptable chars")
3226                     return self._bits2octets(value)
3227                 if value.endswith("'H"):
3228                     value = value[1:-2]
3229                     return (
3230                         len(value) * 4,
3231                         hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3232                     )
3233             if value.__class__ == binary_type:
3234                 return (len(value) * 8, value)
3235             raise InvalidValueType((self.__class__, string_types, binary_type))
3236         if value.__class__ == tuple:
3237             if (
3238                     len(value) == 2 and
3239                     isinstance(value[0], integer_types) and
3240                     value[1].__class__ == binary_type
3241             ):
3242                 return value
3243             bits = []
3244             for name in value:
3245                 bit = self.specs.get(name)
3246                 if bit is None:
3247                     raise ObjUnknown("BitString value: %s" % name)
3248                 bits.append(bit)
3249             if len(bits) == 0:
3250                 return self._bits2octets("")
3251             bits = frozenset(bits)
3252             return self._bits2octets("".join(
3253                 ("1" if bit in bits else "0")
3254                 for bit in six_xrange(max(bits) + 1)
3255             ))
3256         if issubclass(value.__class__, BitString):
3257             return value._value
3258         raise InvalidValueType((self.__class__, binary_type, string_types))
3259
3260     @property
3261     def ready(self):
3262         return self._value is not None
3263
3264     def __getstate__(self):
3265         return BitStringState(
3266             __version__,
3267             self.tag,
3268             self._tag_order,
3269             self._expl,
3270             self.default,
3271             self.optional,
3272             self.offset,
3273             self.llen,
3274             self.vlen,
3275             self.expl_lenindef,
3276             self.lenindef,
3277             self.ber_encoded,
3278             self.specs,
3279             self._value,
3280             self.tag_constructed,
3281             self.defined,
3282         )
3283
3284     def __setstate__(self, state):
3285         super(BitString, self).__setstate__(state)
3286         self.specs = state.specs
3287         self._value = state.value
3288         self.tag_constructed = state.tag_constructed
3289         self.defined = state.defined
3290
3291     def __iter__(self):
3292         self._assert_ready()
3293         for i in six_xrange(self._value[0]):
3294             yield self[i]
3295
3296     @property
3297     def bit_len(self):
3298         """Returns number of bits in the string
3299         """
3300         self._assert_ready()
3301         return self._value[0]
3302
3303     def __bytes__(self):
3304         self._assert_ready()
3305         return self._value[1]
3306
3307     def __eq__(self, their):
3308         if their.__class__ == bytes:
3309             return self._value[1] == their
3310         if not issubclass(their.__class__, BitString):
3311             return False
3312         return (
3313             self._value == their._value and
3314             self.tag == their.tag and
3315             self._expl == their._expl
3316         )
3317
3318     @property
3319     def named(self):
3320         """Named representation (if exists) of the bits
3321
3322         :returns: [str(name), ...]
3323         """
3324         return [name for name, bit in iteritems(self.specs) if self[bit]]
3325
3326     def __call__(
3327             self,
3328             value=None,
3329             impl=None,
3330             expl=None,
3331             default=None,
3332             optional=None,
3333     ):
3334         return self.__class__(
3335             value=value,
3336             impl=self.tag if impl is None else impl,
3337             expl=self._expl if expl is None else expl,
3338             default=self.default if default is None else default,
3339             optional=self.optional if optional is None else optional,
3340             _specs=self.specs,
3341         )
3342
3343     def __getitem__(self, key):
3344         if key.__class__ == int:
3345             bit_len, octets = self._value
3346             if key >= bit_len:
3347                 return False
3348             return (
3349                 byte2int(memoryview(octets)[key // 8:]) >>
3350                 (7 - (key % 8))
3351             ) & 1 == 1
3352         if isinstance(key, string_types):
3353             value = self.specs.get(key)
3354             if value is None:
3355                 raise ObjUnknown("BitString value: %s" % key)
3356             return self[value]
3357         raise InvalidValueType((int, str))
3358
3359     def _encode(self):
3360         self._assert_ready()
3361         bit_len, octets = self._value
3362         return b"".join((
3363             self.tag,
3364             len_encode(len(octets) + 1),
3365             int2byte((8 - bit_len % 8) % 8),
3366             octets,
3367         ))
3368
3369     def _encode1st(self, state):
3370         self._assert_ready()
3371         _, octets = self._value
3372         l = len(octets) + 1
3373         return len(self.tag) + len_size(l) + l, state
3374
3375     def _encode2nd(self, writer, state_iter):
3376         bit_len, octets = self._value
3377         write_full(writer, b"".join((
3378             self.tag,
3379             len_encode(len(octets) + 1),
3380             int2byte((8 - bit_len % 8) % 8),
3381         )))
3382         write_full(writer, octets)
3383
3384     def _encode_cer(self, writer):
3385         bit_len, octets = self._value
3386         if len(octets) + 1 <= 1000:
3387             write_full(writer, self._encode())
3388             return
3389         write_full(writer, self.tag_constructed)
3390         write_full(writer, LENINDEF)
3391         for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3392             write_full(writer, b"".join((
3393                 BitString.tag_default,
3394                 LEN1K,
3395                 int2byte(0),
3396                 octets[offset:offset + 999],
3397             )))
3398         tail = octets[offset + 999:]
3399         if len(tail) > 0:
3400             tail = int2byte((8 - bit_len % 8) % 8) + tail
3401             write_full(writer, b"".join((
3402                 BitString.tag_default,
3403                 len_encode(len(tail)),
3404                 tail,
3405             )))
3406         write_full(writer, EOC)
3407
3408     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3409         try:
3410             t, tlen, lv = tag_strip(tlv)
3411         except DecodeError as err:
3412             raise err.__class__(
3413                 msg=err.msg,
3414                 klass=self.__class__,
3415                 decode_path=decode_path,
3416                 offset=offset,
3417             )
3418         if t == self.tag:
3419             if tag_only:  # pragma: no cover
3420                 yield None
3421                 return
3422             try:
3423                 l, llen, v = len_decode(lv)
3424             except DecodeError as err:
3425                 raise err.__class__(
3426                     msg=err.msg,
3427                     klass=self.__class__,
3428                     decode_path=decode_path,
3429                     offset=offset,
3430                 )
3431             if l > len(v):
3432                 raise NotEnoughData(
3433                     "encoded length is longer than data",
3434                     klass=self.__class__,
3435                     decode_path=decode_path,
3436                     offset=offset,
3437                 )
3438             if l == 0:
3439                 raise NotEnoughData(
3440                     "zero length",
3441                     klass=self.__class__,
3442                     decode_path=decode_path,
3443                     offset=offset,
3444                 )
3445             pad_size = byte2int(v)
3446             if l == 1 and pad_size != 0:
3447                 raise DecodeError(
3448                     "invalid empty value",
3449                     klass=self.__class__,
3450                     decode_path=decode_path,
3451                     offset=offset,
3452                 )
3453             if pad_size > 7:
3454                 raise DecodeError(
3455                     "too big pad",
3456                     klass=self.__class__,
3457                     decode_path=decode_path,
3458                     offset=offset,
3459                 )
3460             if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3461                 raise DecodeError(
3462                     "invalid pad",
3463                     klass=self.__class__,
3464                     decode_path=decode_path,
3465                     offset=offset,
3466                 )
3467             v, tail = v[:l], v[l:]
3468             bit_len = (len(v) - 1) * 8 - pad_size
3469             obj = self.__class__(
3470                 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3471                 impl=self.tag,
3472                 expl=self._expl,
3473                 default=self.default,
3474                 optional=self.optional,
3475                 _specs=self.specs,
3476                 _decoded=(offset, llen, l),
3477             )
3478             if evgen_mode:
3479                 obj._value = (bit_len, None)
3480             yield decode_path, obj, tail
3481             return
3482         if t != self.tag_constructed:
3483             raise TagMismatch(
3484                 klass=self.__class__,
3485                 decode_path=decode_path,
3486                 offset=offset,
3487             )
3488         if not ctx.get("bered", False):
3489             raise DecodeError(
3490                 "unallowed BER constructed encoding",
3491                 klass=self.__class__,
3492                 decode_path=decode_path,
3493                 offset=offset,
3494             )
3495         if tag_only:  # pragma: no cover
3496             yield None
3497             return
3498         lenindef = False
3499         try:
3500             l, llen, v = len_decode(lv)
3501         except LenIndefForm:
3502             llen, l, v = 1, 0, lv[1:]
3503             lenindef = True
3504         except DecodeError as err:
3505             raise err.__class__(
3506                 msg=err.msg,
3507                 klass=self.__class__,
3508                 decode_path=decode_path,
3509                 offset=offset,
3510             )
3511         if l > len(v):
3512             raise NotEnoughData(
3513                 "encoded length is longer than data",
3514                 klass=self.__class__,
3515                 decode_path=decode_path,
3516                 offset=offset,
3517             )
3518         if not lenindef and l == 0:
3519             raise NotEnoughData(
3520                 "zero length",
3521                 klass=self.__class__,
3522                 decode_path=decode_path,
3523                 offset=offset,
3524             )
3525         chunks = []
3526         sub_offset = offset + tlen + llen
3527         vlen = 0
3528         while True:
3529             if lenindef:
3530                 if v[:EOC_LEN].tobytes() == EOC:
3531                     break
3532             else:
3533                 if vlen == l:
3534                     break
3535                 if vlen > l:
3536                     raise DecodeError(
3537                         "chunk out of bounds",
3538                         klass=self.__class__,
3539                         decode_path=decode_path + (str(len(chunks) - 1),),
3540                         offset=chunks[-1].offset,
3541                     )
3542             sub_decode_path = decode_path + (str(len(chunks)),)
3543             try:
3544                 if evgen_mode:
3545                     for _decode_path, chunk, v_tail in BitString().decode_evgen(
3546                             v,
3547                             offset=sub_offset,
3548                             decode_path=sub_decode_path,
3549                             leavemm=True,
3550                             ctx=ctx,
3551                             _ctx_immutable=False,
3552                     ):
3553                         yield _decode_path, chunk, v_tail
3554                 else:
3555                     _, chunk, v_tail = next(BitString().decode_evgen(
3556                         v,
3557                         offset=sub_offset,
3558                         decode_path=sub_decode_path,
3559                         leavemm=True,
3560                         ctx=ctx,
3561                         _ctx_immutable=False,
3562                         _evgen_mode=False,
3563                     ))
3564             except TagMismatch:
3565                 raise DecodeError(
3566                     "expected BitString encoded chunk",
3567                     klass=self.__class__,
3568                     decode_path=sub_decode_path,
3569                     offset=sub_offset,
3570                 )
3571             chunks.append(chunk)
3572             sub_offset += chunk.tlvlen
3573             vlen += chunk.tlvlen
3574             v = v_tail
3575         if len(chunks) == 0:
3576             raise DecodeError(
3577                 "no chunks",
3578                 klass=self.__class__,
3579                 decode_path=decode_path,
3580                 offset=offset,
3581             )
3582         values = []
3583         bit_len = 0
3584         for chunk_i, chunk in enumerate(chunks[:-1]):
3585             if chunk.bit_len % 8 != 0:
3586                 raise DecodeError(
3587                     "BitString chunk is not multiple of 8 bits",
3588                     klass=self.__class__,
3589                     decode_path=decode_path + (str(chunk_i),),
3590                     offset=chunk.offset,
3591                 )
3592             if not evgen_mode:
3593                 values.append(bytes(chunk))
3594             bit_len += chunk.bit_len
3595         chunk_last = chunks[-1]
3596         if not evgen_mode:
3597             values.append(bytes(chunk_last))
3598         bit_len += chunk_last.bit_len
3599         obj = self.__class__(
3600             value=None if evgen_mode else (bit_len, b"".join(values)),
3601             impl=self.tag,
3602             expl=self._expl,
3603             default=self.default,
3604             optional=self.optional,
3605             _specs=self.specs,
3606             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3607         )
3608         if evgen_mode:
3609             obj._value = (bit_len, None)
3610         obj.lenindef = lenindef
3611         obj.ber_encoded = True
3612         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3613
3614     def __repr__(self):
3615         return pp_console_row(next(self.pps()))
3616
3617     def pps(self, decode_path=()):
3618         value = None
3619         blob = None
3620         if self.ready:
3621             bit_len, blob = self._value
3622             value = "%d bits" % bit_len
3623             if len(self.specs) > 0 and blob is not None:
3624                 blob = tuple(self.named)
3625         yield _pp(
3626             obj=self,
3627             asn1_type_name=self.asn1_type_name,
3628             obj_name=self.__class__.__name__,
3629             decode_path=decode_path,
3630             value=value,
3631             blob=blob,
3632             optional=self.optional,
3633             default=self == self.default,
3634             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3635             expl=None if self._expl is None else tag_decode(self._expl),
3636             offset=self.offset,
3637             tlen=self.tlen,
3638             llen=self.llen,
3639             vlen=self.vlen,
3640             expl_offset=self.expl_offset if self.expled else None,
3641             expl_tlen=self.expl_tlen if self.expled else None,
3642             expl_llen=self.expl_llen if self.expled else None,
3643             expl_vlen=self.expl_vlen if self.expled else None,
3644             expl_lenindef=self.expl_lenindef,
3645             lenindef=self.lenindef,
3646             ber_encoded=self.ber_encoded,
3647             bered=self.bered,
3648         )
3649         defined_by, defined = self.defined or (None, None)
3650         if defined_by is not None:
3651             yield defined.pps(
3652                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3653             )
3654         for pp in self.pps_lenindef(decode_path):
3655             yield pp
3656
3657
3658 OctetStringState = namedtuple(
3659     "OctetStringState",
3660     BasicState._fields + (
3661         "value",
3662         "bound_min",
3663         "bound_max",
3664         "tag_constructed",
3665         "defined",
3666     ),
3667     **NAMEDTUPLE_KWARGS
3668 )
3669
3670
3671 class OctetString(Obj):
3672     """``OCTET STRING`` binary string type
3673
3674     >>> s = OctetString(b"hello world")
3675     OCTET STRING 11 bytes 68656c6c6f20776f726c64
3676     >>> s == OctetString(b"hello world")
3677     True
3678     >>> bytes(s)
3679     b'hello world'
3680
3681     >>> OctetString(b"hello", bounds=(4, 4))
3682     Traceback (most recent call last):
3683     pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3684     >>> OctetString(b"hell", bounds=(4, 4))
3685     OCTET STRING 4 bytes 68656c6c
3686
3687     Memoryviews can be used as a values. If memoryview is made on
3688     mmap-ed file, then it does not take storage inside OctetString
3689     itself. In CER encoding mode it will be streamed to the specified
3690     writer, copying 1 KB chunks.
3691     """
3692     __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3693     tag_default = tag_encode(4)
3694     asn1_type_name = "OCTET STRING"
3695     evgen_mode_skip_value = True
3696
3697     def __init__(
3698             self,
3699             value=None,
3700             bounds=None,
3701             impl=None,
3702             expl=None,
3703             default=None,
3704             optional=False,
3705             _decoded=(0, 0, 0),
3706             ctx=None,
3707     ):
3708         """
3709         :param value: set the value. Either binary type, or
3710                       :py:class:`pyderasn.OctetString` object
3711         :param bounds: set ``(MIN, MAX)`` value size constraint.
3712                        (-inf, +inf) by default
3713         :param bytes impl: override default tag with ``IMPLICIT`` one
3714         :param bytes expl: override default tag with ``EXPLICIT`` one
3715         :param default: set default value. Type same as in ``value``
3716         :param bool optional: is object ``OPTIONAL`` in sequence
3717         """
3718         super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3719         self._value = value
3720         self._bound_min, self._bound_max = getattr(
3721             self,
3722             "bounds",
3723             (0, float("+inf")),
3724         ) if bounds is None else bounds
3725         if value is not None:
3726             self._value = self._value_sanitize(value)
3727         if default is not None:
3728             default = self._value_sanitize(default)
3729             self.default = self.__class__(
3730                 value=default,
3731                 impl=self.tag,
3732                 expl=self._expl,
3733             )
3734             if self._value is None:
3735                 self._value = default
3736         self.defined = None
3737         tag_klass, _, tag_num = tag_decode(self.tag)
3738         self.tag_constructed = tag_encode(
3739             klass=tag_klass,
3740             form=TagFormConstructed,
3741             num=tag_num,
3742         )
3743
3744     def _value_sanitize(self, value):
3745         if value.__class__ == binary_type or value.__class__ == memoryview:
3746             pass
3747         elif issubclass(value.__class__, OctetString):
3748             value = value._value
3749         else:
3750             raise InvalidValueType((self.__class__, bytes, memoryview))
3751         if not self._bound_min <= len(value) <= self._bound_max:
3752             raise BoundsError(self._bound_min, len(value), self._bound_max)
3753         return value
3754
3755     @property
3756     def ready(self):
3757         return self._value is not None
3758
3759     def __getstate__(self):
3760         return OctetStringState(
3761             __version__,
3762             self.tag,
3763             self._tag_order,
3764             self._expl,
3765             self.default,
3766             self.optional,
3767             self.offset,
3768             self.llen,
3769             self.vlen,
3770             self.expl_lenindef,
3771             self.lenindef,
3772             self.ber_encoded,
3773             self._value,
3774             self._bound_min,
3775             self._bound_max,
3776             self.tag_constructed,
3777             self.defined,
3778         )
3779
3780     def __setstate__(self, state):
3781         super(OctetString, self).__setstate__(state)
3782         self._value = state.value
3783         self._bound_min = state.bound_min
3784         self._bound_max = state.bound_max
3785         self.tag_constructed = state.tag_constructed
3786         self.defined = state.defined
3787
3788     def __bytes__(self):
3789         self._assert_ready()
3790         return bytes(self._value)
3791
3792     def __eq__(self, their):
3793         if their.__class__ == binary_type:
3794             return self._value == their
3795         if not issubclass(their.__class__, OctetString):
3796             return False
3797         return (
3798             self._value == their._value and
3799             self.tag == their.tag and
3800             self._expl == their._expl
3801         )
3802
3803     def __lt__(self, their):
3804         return self._value < their._value
3805
3806     def __call__(
3807             self,
3808             value=None,
3809             bounds=None,
3810             impl=None,
3811             expl=None,
3812             default=None,
3813             optional=None,
3814     ):
3815         return self.__class__(
3816             value=value,
3817             bounds=(
3818                 (self._bound_min, self._bound_max)
3819                 if bounds is None else bounds
3820             ),
3821             impl=self.tag if impl is None else impl,
3822             expl=self._expl if expl is None else expl,
3823             default=self.default if default is None else default,
3824             optional=self.optional if optional is None else optional,
3825         )
3826
3827     def _encode(self):
3828         self._assert_ready()
3829         return b"".join((
3830             self.tag,
3831             len_encode(len(self._value)),
3832             self._value,
3833         ))
3834
3835     def _encode1st(self, state):
3836         self._assert_ready()
3837         l = len(self._value)
3838         return len(self.tag) + len_size(l) + l, state
3839
3840     def _encode2nd(self, writer, state_iter):
3841         value = self._value
3842         write_full(writer, self.tag + len_encode(len(value)))
3843         write_full(writer, value)
3844
3845     def _encode_cer(self, writer):
3846         octets = self._value
3847         if len(octets) <= 1000:
3848             write_full(writer, self._encode())
3849             return
3850         write_full(writer, self.tag_constructed)
3851         write_full(writer, LENINDEF)
3852         for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3853             write_full(writer, b"".join((
3854                 OctetString.tag_default,
3855                 LEN1K,
3856                 octets[offset:offset + 1000],
3857             )))
3858         tail = octets[offset + 1000:]
3859         if len(tail) > 0:
3860             write_full(writer, b"".join((
3861                 OctetString.tag_default,
3862                 len_encode(len(tail)),
3863                 tail,
3864             )))
3865         write_full(writer, EOC)
3866
3867     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3868         try:
3869             t, tlen, lv = tag_strip(tlv)
3870         except DecodeError as err:
3871             raise err.__class__(
3872                 msg=err.msg,
3873                 klass=self.__class__,
3874                 decode_path=decode_path,
3875                 offset=offset,
3876             )
3877         if t == self.tag:
3878             if tag_only:
3879                 yield None
3880                 return
3881             try:
3882                 l, llen, v = len_decode(lv)
3883             except DecodeError as err:
3884                 raise err.__class__(
3885                     msg=err.msg,
3886                     klass=self.__class__,
3887                     decode_path=decode_path,
3888                     offset=offset,
3889                 )
3890             if l > len(v):
3891                 raise NotEnoughData(
3892                     "encoded length is longer than data",
3893                     klass=self.__class__,
3894                     decode_path=decode_path,
3895                     offset=offset,
3896                 )
3897             v, tail = v[:l], v[l:]
3898             if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3899                 raise DecodeError(
3900                     msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3901                     klass=self.__class__,
3902                     decode_path=decode_path,
3903                     offset=offset,
3904                 )
3905             try:
3906                 obj = self.__class__(
3907                     value=(
3908                         None if (evgen_mode and self.evgen_mode_skip_value)
3909                         else v.tobytes()
3910                     ),
3911                     bounds=(self._bound_min, self._bound_max),
3912                     impl=self.tag,
3913                     expl=self._expl,
3914                     default=self.default,
3915                     optional=self.optional,
3916                     _decoded=(offset, llen, l),
3917                     ctx=ctx,
3918                 )
3919             except DecodeError as err:
3920                 raise DecodeError(
3921                     msg=err.msg,
3922                     klass=self.__class__,
3923                     decode_path=decode_path,
3924                     offset=offset,
3925                 )
3926             except BoundsError as err:
3927                 raise DecodeError(
3928                     msg=str(err),
3929                     klass=self.__class__,
3930                     decode_path=decode_path,
3931                     offset=offset,
3932                 )
3933             yield decode_path, obj, tail
3934             return
3935         if t != self.tag_constructed:
3936             raise TagMismatch(
3937                 klass=self.__class__,
3938                 decode_path=decode_path,
3939                 offset=offset,
3940             )
3941         if not ctx.get("bered", False):
3942             raise DecodeError(
3943                 "unallowed BER constructed encoding",
3944                 klass=self.__class__,
3945                 decode_path=decode_path,
3946                 offset=offset,
3947             )
3948         if tag_only:
3949             yield None
3950             return
3951         lenindef = False
3952         try:
3953             l, llen, v = len_decode(lv)
3954         except LenIndefForm:
3955             llen, l, v = 1, 0, lv[1:]
3956             lenindef = True
3957         except DecodeError as err:
3958             raise err.__class__(
3959                 msg=err.msg,
3960                 klass=self.__class__,
3961                 decode_path=decode_path,
3962                 offset=offset,
3963             )
3964         if l > len(v):
3965             raise NotEnoughData(
3966                 "encoded length is longer than data",
3967                 klass=self.__class__,
3968                 decode_path=decode_path,
3969                 offset=offset,
3970             )
3971         chunks = []
3972         chunks_count = 0
3973         sub_offset = offset + tlen + llen
3974         vlen = 0
3975         payload_len = 0
3976         while True:
3977             if lenindef:
3978                 if v[:EOC_LEN].tobytes() == EOC:
3979                     break
3980             else:
3981                 if vlen == l:
3982                     break
3983                 if vlen > l:
3984                     raise DecodeError(
3985                         "chunk out of bounds",
3986                         klass=self.__class__,
3987                         decode_path=decode_path + (str(len(chunks) - 1),),
3988                         offset=chunks[-1].offset,
3989                     )
3990             try:
3991                 if evgen_mode:
3992                     sub_decode_path = decode_path + (str(chunks_count),)
3993                     for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3994                             v,
3995                             offset=sub_offset,
3996                             decode_path=sub_decode_path,
3997                             leavemm=True,
3998                             ctx=ctx,
3999                             _ctx_immutable=False,
4000                     ):
4001                         yield _decode_path, chunk, v_tail
4002                         if not chunk.ber_encoded:
4003                             payload_len += chunk.vlen
4004                     chunks_count += 1
4005                 else:
4006                     sub_decode_path = decode_path + (str(len(chunks)),)
4007                     _, chunk, v_tail = next(OctetString().decode_evgen(
4008                         v,
4009                         offset=sub_offset,
4010                         decode_path=sub_decode_path,
4011                         leavemm=True,
4012                         ctx=ctx,
4013                         _ctx_immutable=False,
4014                         _evgen_mode=False,
4015                     ))
4016                     chunks.append(chunk)
4017             except TagMismatch:
4018                 raise DecodeError(
4019                     "expected OctetString encoded chunk",
4020                     klass=self.__class__,
4021                     decode_path=sub_decode_path,
4022                     offset=sub_offset,
4023                 )
4024             sub_offset += chunk.tlvlen
4025             vlen += chunk.tlvlen
4026             v = v_tail
4027         if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
4028             raise DecodeError(
4029                 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
4030                 klass=self.__class__,
4031                 decode_path=decode_path,
4032                 offset=offset,
4033             )
4034         try:
4035             obj = self.__class__(
4036                 value=(
4037                     None if evgen_mode else
4038                     b"".join(bytes(chunk) for chunk in chunks)
4039                 ),
4040                 bounds=(self._bound_min, self._bound_max),
4041                 impl=self.tag,
4042                 expl=self._expl,
4043                 default=self.default,
4044                 optional=self.optional,
4045                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4046                 ctx=ctx,
4047             )
4048         except DecodeError as err:
4049             raise DecodeError(
4050                 msg=err.msg,
4051                 klass=self.__class__,
4052                 decode_path=decode_path,
4053                 offset=offset,
4054             )
4055         except BoundsError as err:
4056             raise DecodeError(
4057                 msg=str(err),
4058                 klass=self.__class__,
4059                 decode_path=decode_path,
4060                 offset=offset,
4061             )
4062         obj.lenindef = lenindef
4063         obj.ber_encoded = True
4064         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4065
4066     def __repr__(self):
4067         return pp_console_row(next(self.pps()))
4068
4069     def pps(self, decode_path=()):
4070         yield _pp(
4071             obj=self,
4072             asn1_type_name=self.asn1_type_name,
4073             obj_name=self.__class__.__name__,
4074             decode_path=decode_path,
4075             value=("%d bytes" % len(self._value)) if self.ready else None,
4076             blob=self._value if self.ready else None,
4077             optional=self.optional,
4078             default=self == self.default,
4079             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4080             expl=None if self._expl is None else tag_decode(self._expl),
4081             offset=self.offset,
4082             tlen=self.tlen,
4083             llen=self.llen,
4084             vlen=self.vlen,
4085             expl_offset=self.expl_offset if self.expled else None,
4086             expl_tlen=self.expl_tlen if self.expled else None,
4087             expl_llen=self.expl_llen if self.expled else None,
4088             expl_vlen=self.expl_vlen if self.expled else None,
4089             expl_lenindef=self.expl_lenindef,
4090             lenindef=self.lenindef,
4091             ber_encoded=self.ber_encoded,
4092             bered=self.bered,
4093         )
4094         defined_by, defined = self.defined or (None, None)
4095         if defined_by is not None:
4096             yield defined.pps(
4097                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4098             )
4099         for pp in self.pps_lenindef(decode_path):
4100             yield pp
4101
4102
4103 def agg_octet_string(evgens, decode_path, raw, writer):
4104     """Aggregate constructed string (OctetString and its derivatives)
4105
4106     :param evgens: iterator of generated events
4107     :param decode_path: points to the string we want to decode
4108     :param raw: slicebable (memoryview, bytearray, etc) with
4109                 the data evgens are generated on
4110     :param writer: buffer.write where string is going to be saved
4111     :param writer: where string is going to be saved. Must comply
4112                    with ``io.RawIOBase.write`` behaviour
4113
4114     .. seealso:: :ref:`agg_octet_string`
4115     """
4116     decode_path_len = len(decode_path)
4117     for dp, obj, _ in evgens:
4118         if dp[:decode_path_len] != decode_path:
4119             continue
4120         if not obj.ber_encoded:
4121             write_full(writer, raw[
4122                 obj.offset + obj.tlen + obj.llen:
4123                 obj.offset + obj.tlen + obj.llen + obj.vlen -
4124                 (EOC_LEN if obj.expl_lenindef else 0)
4125             ])
4126         if len(dp) == decode_path_len:
4127             break
4128
4129
4130 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4131
4132
4133 class Null(Obj):
4134     """``NULL`` null object
4135
4136     >>> n = Null()
4137     NULL
4138     >>> n.ready
4139     True
4140     """
4141     __slots__ = ()
4142     tag_default = tag_encode(5)
4143     asn1_type_name = "NULL"
4144
4145     def __init__(
4146             self,
4147             value=None,  # unused, but Sequence passes it
4148             impl=None,
4149             expl=None,
4150             optional=False,
4151             _decoded=(0, 0, 0),
4152     ):
4153         """
4154         :param bytes impl: override default tag with ``IMPLICIT`` one
4155         :param bytes expl: override default tag with ``EXPLICIT`` one
4156         :param bool optional: is object ``OPTIONAL`` in sequence
4157         """
4158         super(Null, self).__init__(impl, expl, None, optional, _decoded)
4159         self.default = None
4160
4161     @property
4162     def ready(self):
4163         return True
4164
4165     def __getstate__(self):
4166         return NullState(
4167             __version__,
4168             self.tag,
4169             self._tag_order,
4170             self._expl,
4171             self.default,
4172             self.optional,
4173             self.offset,
4174             self.llen,
4175             self.vlen,
4176             self.expl_lenindef,
4177             self.lenindef,
4178             self.ber_encoded,
4179         )
4180
4181     def __eq__(self, their):
4182         if not issubclass(their.__class__, Null):
4183             return False
4184         return (
4185             self.tag == their.tag and
4186             self._expl == their._expl
4187         )
4188
4189     def __call__(
4190             self,
4191             value=None,
4192             impl=None,
4193             expl=None,
4194             optional=None,
4195     ):
4196         return self.__class__(
4197             impl=self.tag if impl is None else impl,
4198             expl=self._expl if expl is None else expl,
4199             optional=self.optional if optional is None else optional,
4200         )
4201
4202     def _encode(self):
4203         return self.tag + LEN0
4204
4205     def _encode1st(self, state):
4206         return len(self.tag) + 1, state
4207
4208     def _encode2nd(self, writer, state_iter):
4209         write_full(writer, self.tag + LEN0)
4210
4211     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4212         try:
4213             t, _, lv = tag_strip(tlv)
4214         except DecodeError as err:
4215             raise err.__class__(
4216                 msg=err.msg,
4217                 klass=self.__class__,
4218                 decode_path=decode_path,
4219                 offset=offset,
4220             )
4221         if t != self.tag:
4222             raise TagMismatch(
4223                 klass=self.__class__,
4224                 decode_path=decode_path,
4225                 offset=offset,
4226             )
4227         if tag_only:  # pragma: no cover
4228             yield None
4229             return
4230         try:
4231             l, _, v = len_decode(lv)
4232         except DecodeError as err:
4233             raise err.__class__(
4234                 msg=err.msg,
4235                 klass=self.__class__,
4236                 decode_path=decode_path,
4237                 offset=offset,
4238             )
4239         if l != 0:
4240             raise InvalidLength(
4241                 "Null must have zero length",
4242                 klass=self.__class__,
4243                 decode_path=decode_path,
4244                 offset=offset,
4245             )
4246         obj = self.__class__(
4247             impl=self.tag,
4248             expl=self._expl,
4249             optional=self.optional,
4250             _decoded=(offset, 1, 0),
4251         )
4252         yield decode_path, obj, v
4253
4254     def __repr__(self):
4255         return pp_console_row(next(self.pps()))
4256
4257     def pps(self, decode_path=()):
4258         yield _pp(
4259             obj=self,
4260             asn1_type_name=self.asn1_type_name,
4261             obj_name=self.__class__.__name__,
4262             decode_path=decode_path,
4263             optional=self.optional,
4264             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4265             expl=None if self._expl is None else tag_decode(self._expl),
4266             offset=self.offset,
4267             tlen=self.tlen,
4268             llen=self.llen,
4269             vlen=self.vlen,
4270             expl_offset=self.expl_offset if self.expled else None,
4271             expl_tlen=self.expl_tlen if self.expled else None,
4272             expl_llen=self.expl_llen if self.expled else None,
4273             expl_vlen=self.expl_vlen if self.expled else None,
4274             expl_lenindef=self.expl_lenindef,
4275             bered=self.bered,
4276         )
4277         for pp in self.pps_lenindef(decode_path):
4278             yield pp
4279
4280
4281 ObjectIdentifierState = namedtuple(
4282     "ObjectIdentifierState",
4283     BasicState._fields + ("value", "defines"),
4284     **NAMEDTUPLE_KWARGS
4285 )
4286
4287
4288 class ObjectIdentifier(Obj):
4289     """``OBJECT IDENTIFIER`` OID type
4290
4291     >>> oid = ObjectIdentifier((1, 2, 3))
4292     OBJECT IDENTIFIER 1.2.3
4293     >>> oid == ObjectIdentifier("1.2.3")
4294     True
4295     >>> tuple(oid)
4296     (1, 2, 3)
4297     >>> str(oid)
4298     '1.2.3'
4299     >>> oid + (4, 5) + ObjectIdentifier("1.7")
4300     OBJECT IDENTIFIER 1.2.3.4.5.1.7
4301
4302     >>> str(ObjectIdentifier((3, 1)))
4303     Traceback (most recent call last):
4304     pyderasn.InvalidOID: unacceptable first arc value
4305     """
4306     __slots__ = ("defines",)
4307     tag_default = tag_encode(6)
4308     asn1_type_name = "OBJECT IDENTIFIER"
4309
4310     def __init__(
4311             self,
4312             value=None,
4313             defines=(),
4314             impl=None,
4315             expl=None,
4316             default=None,
4317             optional=False,
4318             _decoded=(0, 0, 0),
4319     ):
4320         """
4321         :param value: set the value. Either tuples of integers,
4322                       string of "."-concatenated integers, or
4323                       :py:class:`pyderasn.ObjectIdentifier` object
4324         :param defines: sequence of tuples. Each tuple has two elements.
4325                         First one is relative to current one decode
4326                         path, aiming to the field defined by that OID.
4327                         Read about relative path in
4328                         :py:func:`pyderasn.abs_decode_path`. Second
4329                         tuple element is ``{OID: pyderasn.Obj()}``
4330                         dictionary, mapping between current OID value
4331                         and structure applied to defined field.
4332
4333                         .. seealso:: :ref:`definedby`
4334
4335         :param bytes impl: override default tag with ``IMPLICIT`` one
4336         :param bytes expl: override default tag with ``EXPLICIT`` one
4337         :param default: set default value. Type same as in ``value``
4338         :param bool optional: is object ``OPTIONAL`` in sequence
4339         """
4340         super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4341         self._value = value
4342         if value is not None:
4343             self._value = self._value_sanitize(value)
4344         if default is not None:
4345             default = self._value_sanitize(default)
4346             self.default = self.__class__(
4347                 value=default,
4348                 impl=self.tag,
4349                 expl=self._expl,
4350             )
4351             if self._value is None:
4352                 self._value = default
4353         self.defines = defines
4354
4355     def __add__(self, their):
4356         if their.__class__ == tuple:
4357             return self.__class__(self._value + array("L", their))
4358         if isinstance(their, self.__class__):
4359             return self.__class__(self._value + their._value)
4360         raise InvalidValueType((self.__class__, tuple))
4361
4362     def _value_sanitize(self, value):
4363         if issubclass(value.__class__, ObjectIdentifier):
4364             return value._value
4365         if isinstance(value, string_types):
4366             try:
4367                 value = array("L", (pureint(arc) for arc in value.split(".")))
4368             except ValueError:
4369                 raise InvalidOID("unacceptable arcs values")
4370         if value.__class__ == tuple:
4371             try:
4372                 value = array("L", value)
4373             except OverflowError as err:
4374                 raise InvalidOID(repr(err))
4375         if value.__class__ is array:
4376             if len(value) < 2:
4377                 raise InvalidOID("less than 2 arcs")
4378             first_arc = value[0]
4379             if first_arc in (0, 1):
4380                 if not (0 <= value[1] <= 39):
4381                     raise InvalidOID("second arc is too wide")
4382             elif first_arc == 2:
4383                 pass
4384             else:
4385                 raise InvalidOID("unacceptable first arc value")
4386             if not all(arc >= 0 for arc in value):
4387                 raise InvalidOID("negative arc value")
4388             return value
4389         raise InvalidValueType((self.__class__, str, tuple))
4390
4391     @property
4392     def ready(self):
4393         return self._value is not None
4394
4395     def __getstate__(self):
4396         return ObjectIdentifierState(
4397             __version__,
4398             self.tag,
4399             self._tag_order,
4400             self._expl,
4401             self.default,
4402             self.optional,
4403             self.offset,
4404             self.llen,
4405             self.vlen,
4406             self.expl_lenindef,
4407             self.lenindef,
4408             self.ber_encoded,
4409             self._value,
4410             self.defines,
4411         )
4412
4413     def __setstate__(self, state):
4414         super(ObjectIdentifier, self).__setstate__(state)
4415         self._value = state.value
4416         self.defines = state.defines
4417
4418     def __iter__(self):
4419         self._assert_ready()
4420         return iter(self._value)
4421
4422     def __str__(self):
4423         return ".".join(str(arc) for arc in self._value or ())
4424
4425     def __hash__(self):
4426         self._assert_ready()
4427         return hash(b"".join((
4428             self.tag,
4429             bytes(self._expl or b""),
4430             str(self._value).encode("ascii"),
4431         )))
4432
4433     def __eq__(self, their):
4434         if their.__class__ == tuple:
4435             return self._value == array("L", their)
4436         if not issubclass(their.__class__, ObjectIdentifier):
4437             return False
4438         return (
4439             self.tag == their.tag and
4440             self._expl == their._expl and
4441             self._value == their._value
4442         )
4443
4444     def __lt__(self, their):
4445         return self._value < their._value
4446
4447     def __call__(
4448             self,
4449             value=None,
4450             defines=None,
4451             impl=None,
4452             expl=None,
4453             default=None,
4454             optional=None,
4455     ):
4456         return self.__class__(
4457             value=value,
4458             defines=self.defines if defines is None else defines,
4459             impl=self.tag if impl is None else impl,
4460             expl=self._expl if expl is None else expl,
4461             default=self.default if default is None else default,
4462             optional=self.optional if optional is None else optional,
4463         )
4464
4465     def _encode_octets(self):
4466         self._assert_ready()
4467         value = self._value
4468         first_value = value[1]
4469         first_arc = value[0]
4470         if first_arc == 0:
4471             pass
4472         elif first_arc == 1:
4473             first_value += 40
4474         elif first_arc == 2:
4475             first_value += 80
4476         else:  # pragma: no cover
4477             raise RuntimeError("invalid arc is stored")
4478         octets = [zero_ended_encode(first_value)]
4479         for arc in value[2:]:
4480             octets.append(zero_ended_encode(arc))
4481         return b"".join(octets)
4482
4483     def _encode(self):
4484         v = self._encode_octets()
4485         return b"".join((self.tag, len_encode(len(v)), v))
4486
4487     def _encode1st(self, state):
4488         l = len(self._encode_octets())
4489         return len(self.tag) + len_size(l) + l, state
4490
4491     def _encode2nd(self, writer, state_iter):
4492         write_full(writer, self._encode())
4493
4494     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4495         try:
4496             t, _, lv = tag_strip(tlv)
4497         except DecodeError as err:
4498             raise err.__class__(
4499                 msg=err.msg,
4500                 klass=self.__class__,
4501                 decode_path=decode_path,
4502                 offset=offset,
4503             )
4504         if t != self.tag:
4505             raise TagMismatch(
4506                 klass=self.__class__,
4507                 decode_path=decode_path,
4508                 offset=offset,
4509             )
4510         if tag_only:  # pragma: no cover
4511             yield None
4512             return
4513         try:
4514             l, llen, v = len_decode(lv)
4515         except DecodeError as err:
4516             raise err.__class__(
4517                 msg=err.msg,
4518                 klass=self.__class__,
4519                 decode_path=decode_path,
4520                 offset=offset,
4521             )
4522         if l > len(v):
4523             raise NotEnoughData(
4524                 "encoded length is longer than data",
4525                 klass=self.__class__,
4526                 decode_path=decode_path,
4527                 offset=offset,
4528             )
4529         if l == 0:
4530             raise NotEnoughData(
4531                 "zero length",
4532                 klass=self.__class__,
4533                 decode_path=decode_path,
4534                 offset=offset,
4535             )
4536         v, tail = v[:l], v[l:]
4537         arcs = array("L")
4538         ber_encoded = False
4539         while len(v) > 0:
4540             i = 0
4541             arc = 0
4542             while True:
4543                 octet = indexbytes(v, i)
4544                 if i == 0 and octet == 0x80:
4545                     if ctx.get("bered", False):
4546                         ber_encoded = True
4547                     else:
4548                         raise DecodeError(
4549                             "non normalized arc encoding",
4550                             klass=self.__class__,
4551                             decode_path=decode_path,
4552                             offset=offset,
4553                         )
4554                 arc = (arc << 7) | (octet & 0x7F)
4555                 if octet & 0x80 == 0:
4556                     try:
4557                         arcs.append(arc)
4558                     except OverflowError:
4559                         raise DecodeError(
4560                             "too huge value for local unsigned long",
4561                             klass=self.__class__,
4562                             decode_path=decode_path,
4563                             offset=offset,
4564                         )
4565                     v = v[i + 1:]
4566                     break
4567                 i += 1
4568                 if i == len(v):
4569                     raise DecodeError(
4570                         "unfinished OID",
4571                         klass=self.__class__,
4572                         decode_path=decode_path,
4573                         offset=offset,
4574                     )
4575         first_arc = 0
4576         second_arc = arcs[0]
4577         if 0 <= second_arc <= 39:
4578             first_arc = 0
4579         elif 40 <= second_arc <= 79:
4580             first_arc = 1
4581             second_arc -= 40
4582         else:
4583             first_arc = 2
4584             second_arc -= 80
4585         obj = self.__class__(
4586             value=array("L", (first_arc, second_arc)) + arcs[1:],
4587             impl=self.tag,
4588             expl=self._expl,
4589             default=self.default,
4590             optional=self.optional,
4591             _decoded=(offset, llen, l),
4592         )
4593         if ber_encoded:
4594             obj.ber_encoded = True
4595         yield decode_path, obj, tail
4596
4597     def __repr__(self):
4598         return pp_console_row(next(self.pps()))
4599
4600     def pps(self, decode_path=()):
4601         yield _pp(
4602             obj=self,
4603             asn1_type_name=self.asn1_type_name,
4604             obj_name=self.__class__.__name__,
4605             decode_path=decode_path,
4606             value=str(self) if self.ready else None,
4607             optional=self.optional,
4608             default=self == self.default,
4609             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4610             expl=None if self._expl is None else tag_decode(self._expl),
4611             offset=self.offset,
4612             tlen=self.tlen,
4613             llen=self.llen,
4614             vlen=self.vlen,
4615             expl_offset=self.expl_offset if self.expled else None,
4616             expl_tlen=self.expl_tlen if self.expled else None,
4617             expl_llen=self.expl_llen if self.expled else None,
4618             expl_vlen=self.expl_vlen if self.expled else None,
4619             expl_lenindef=self.expl_lenindef,
4620             ber_encoded=self.ber_encoded,
4621             bered=self.bered,
4622         )
4623         for pp in self.pps_lenindef(decode_path):
4624             yield pp
4625
4626
4627 class Enumerated(Integer):
4628     """``ENUMERATED`` integer type
4629
4630     This type is identical to :py:class:`pyderasn.Integer`, but requires
4631     schema to be specified and does not accept values missing from it.
4632     """
4633     __slots__ = ()
4634     tag_default = tag_encode(10)
4635     asn1_type_name = "ENUMERATED"
4636
4637     def __init__(
4638             self,
4639             value=None,
4640             impl=None,
4641             expl=None,
4642             default=None,
4643             optional=False,
4644             _specs=None,
4645             _decoded=(0, 0, 0),
4646             bounds=None,  # dummy argument, workability for Integer.decode
4647     ):
4648         super(Enumerated, self).__init__(
4649             value, bounds, impl, expl, default, optional, _specs, _decoded,
4650         )
4651         if len(self.specs) == 0:
4652             raise ValueError("schema must be specified")
4653
4654     def _value_sanitize(self, value):
4655         if isinstance(value, self.__class__):
4656             value = value._value
4657         elif isinstance(value, integer_types):
4658             for _value in itervalues(self.specs):
4659                 if _value == value:
4660                     break
4661             else:
4662                 raise DecodeError(
4663                     "unknown integer value: %s" % value,
4664                     klass=self.__class__,
4665                 )
4666         elif isinstance(value, string_types):
4667             value = self.specs.get(value)
4668             if value is None:
4669                 raise ObjUnknown("integer value: %s" % value)
4670         else:
4671             raise InvalidValueType((self.__class__, int, str))
4672         return value
4673
4674     def __call__(
4675             self,
4676             value=None,
4677             impl=None,
4678             expl=None,
4679             default=None,
4680             optional=None,
4681             _specs=None,
4682     ):
4683         return self.__class__(
4684             value=value,
4685             impl=self.tag if impl is None else impl,
4686             expl=self._expl if expl is None else expl,
4687             default=self.default if default is None else default,
4688             optional=self.optional if optional is None else optional,
4689             _specs=self.specs,
4690         )
4691
4692
4693 def escape_control_unicode(c):
4694     if unicat(c)[0] == "C":
4695         c = repr(c).lstrip("u").strip("'")
4696     return c
4697
4698
4699 class CommonString(OctetString):
4700     """Common class for all strings
4701
4702     Everything resembles :py:class:`pyderasn.OctetString`, except
4703     ability to deal with unicode text strings.
4704
4705     >>> hexenc("привет Ð¼Ð¸Ñ€".encode("utf-8"))
4706     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4707     >>> UTF8String("привет Ð¼Ð¸Ñ€") == UTF8String(hexdec("d0...80"))
4708     True
4709     >>> s = UTF8String("привет Ð¼Ð¸Ñ€")
4710     UTF8String UTF8String Ð¿Ñ€Ð¸Ð²ÐµÑ‚ Ð¼Ð¸Ñ€
4711     >>> str(s)
4712     'привет Ð¼Ð¸Ñ€'
4713     >>> hexenc(bytes(s))
4714     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4715
4716     >>> PrintableString("привет Ð¼Ð¸Ñ€")
4717     Traceback (most recent call last):
4718     pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4719
4720     >>> BMPString("ада", bounds=(2, 2))
4721     Traceback (most recent call last):
4722     pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4723     >>> s = BMPString("ад", bounds=(2, 2))
4724     >>> s.encoding
4725     'utf-16-be'
4726     >>> hexenc(bytes(s))
4727     '04300434'
4728
4729     .. list-table::
4730        :header-rows: 1
4731
4732        * - Class
4733          - Text Encoding, validation
4734        * - :py:class:`pyderasn.UTF8String`
4735          - utf-8
4736        * - :py:class:`pyderasn.NumericString`
4737          - proper alphabet validation
4738        * - :py:class:`pyderasn.PrintableString`
4739          - proper alphabet validation
4740        * - :py:class:`pyderasn.TeletexString`
4741          - iso-8859-1
4742        * - :py:class:`pyderasn.T61String`
4743          - iso-8859-1
4744        * - :py:class:`pyderasn.VideotexString`
4745          - iso-8859-1
4746        * - :py:class:`pyderasn.IA5String`
4747          - proper alphabet validation
4748        * - :py:class:`pyderasn.GraphicString`
4749          - iso-8859-1
4750        * - :py:class:`pyderasn.VisibleString`, :py:class:`pyderasn.ISO646String`
4751          - proper alphabet validation
4752        * - :py:class:`pyderasn.GeneralString`
4753          - iso-8859-1
4754        * - :py:class:`pyderasn.UniversalString`
4755          - utf-32-be
4756        * - :py:class:`pyderasn.BMPString`
4757          - utf-16-be
4758     """
4759     __slots__ = ()
4760
4761     def _value_sanitize(self, value):
4762         value_raw = None
4763         value_decoded = None
4764         if isinstance(value, self.__class__):
4765             value_raw = value._value
4766         elif value.__class__ == text_type:
4767             value_decoded = value
4768         elif value.__class__ == binary_type:
4769             value_raw = value
4770         else:
4771             raise InvalidValueType((self.__class__, text_type, binary_type))
4772         try:
4773             value_raw = (
4774                 value_decoded.encode(self.encoding)
4775                 if value_raw is None else value_raw
4776             )
4777             value_decoded = (
4778                 value_raw.decode(self.encoding)
4779                 if value_decoded is None else value_decoded
4780             )
4781         except (UnicodeEncodeError, UnicodeDecodeError) as err:
4782             raise DecodeError(str(err))
4783         if not self._bound_min <= len(value_decoded) <= self._bound_max:
4784             raise BoundsError(
4785                 self._bound_min,
4786                 len(value_decoded),
4787                 self._bound_max,
4788             )
4789         return value_raw
4790
4791     def __eq__(self, their):
4792         if their.__class__ == binary_type:
4793             return self._value == their
4794         if their.__class__ == text_type:
4795             return self._value == their.encode(self.encoding)
4796         if not isinstance(their, self.__class__):
4797             return False
4798         return (
4799             self._value == their._value and
4800             self.tag == their.tag and
4801             self._expl == their._expl
4802         )
4803
4804     def __unicode__(self):
4805         if self.ready:
4806             return self._value.decode(self.encoding)
4807         return text_type(self._value)
4808
4809     def __repr__(self):
4810         return pp_console_row(next(self.pps(no_unicode=PY2)))
4811
4812     def pps(self, decode_path=(), no_unicode=False):
4813         value = None
4814         if self.ready:
4815             value = (
4816                 hexenc(bytes(self)) if no_unicode else
4817                 "".join(escape_control_unicode(c) for c in self.__unicode__())
4818             )
4819         yield _pp(
4820             obj=self,
4821             asn1_type_name=self.asn1_type_name,
4822             obj_name=self.__class__.__name__,
4823             decode_path=decode_path,
4824             value=value,
4825             optional=self.optional,
4826             default=self == self.default,
4827             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4828             expl=None if self._expl is None else tag_decode(self._expl),
4829             offset=self.offset,
4830             tlen=self.tlen,
4831             llen=self.llen,
4832             vlen=self.vlen,
4833             expl_offset=self.expl_offset if self.expled else None,
4834             expl_tlen=self.expl_tlen if self.expled else None,
4835             expl_llen=self.expl_llen if self.expled else None,
4836             expl_vlen=self.expl_vlen if self.expled else None,
4837             expl_lenindef=self.expl_lenindef,
4838             ber_encoded=self.ber_encoded,
4839             bered=self.bered,
4840         )
4841         for pp in self.pps_lenindef(decode_path):
4842             yield pp
4843
4844
4845 class UTF8String(CommonString):
4846     __slots__ = ()
4847     tag_default = tag_encode(12)
4848     encoding = "utf-8"
4849     asn1_type_name = "UTF8String"
4850
4851
4852 class AllowableCharsMixin(object):
4853     @property
4854     def allowable_chars(self):
4855         if PY2:
4856             return self._allowable_chars
4857         return frozenset(six_unichr(c) for c in self._allowable_chars)
4858
4859     def _value_sanitize(self, value):
4860         value = super(AllowableCharsMixin, self)._value_sanitize(value)
4861         if not frozenset(value) <= self._allowable_chars:
4862             raise DecodeError("non satisfying alphabet value")
4863         return value
4864
4865
4866 class NumericString(AllowableCharsMixin, CommonString):
4867     """Numeric string
4868
4869     Its value is properly sanitized: only ASCII digits with spaces can
4870     be stored.
4871
4872     >>> NumericString().allowable_chars
4873     frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4874     """
4875     __slots__ = ()
4876     tag_default = tag_encode(18)
4877     encoding = "ascii"
4878     asn1_type_name = "NumericString"
4879     _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4880
4881
4882 PrintableStringState = namedtuple(
4883     "PrintableStringState",
4884     OctetStringState._fields + ("allowable_chars",),
4885     **NAMEDTUPLE_KWARGS
4886 )
4887
4888
4889 class PrintableString(AllowableCharsMixin, CommonString):
4890     """Printable string
4891
4892     Its value is properly sanitized: see X.680 41.4 table 10.
4893
4894     >>> PrintableString().allowable_chars
4895     frozenset([' ', "'", ..., 'z'])
4896     >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4897     PrintableString PrintableString foo*bar
4898     >>> obj.allow_asterisk, obj.allow_ampersand
4899     (True, False)
4900     """
4901     __slots__ = ()
4902     tag_default = tag_encode(19)
4903     encoding = "ascii"
4904     asn1_type_name = "PrintableString"
4905     _allowable_chars = frozenset(
4906         (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4907     )
4908     _asterisk = frozenset("*".encode("ascii"))
4909     _ampersand = frozenset("&".encode("ascii"))
4910
4911     def __init__(
4912             self,
4913             value=None,
4914             bounds=None,
4915             impl=None,
4916             expl=None,
4917             default=None,
4918             optional=False,
4919             _decoded=(0, 0, 0),
4920             ctx=None,
4921             allow_asterisk=False,
4922             allow_ampersand=False,
4923     ):
4924         """
4925         :param allow_asterisk: allow asterisk character
4926         :param allow_ampersand: allow ampersand character
4927         """
4928         if allow_asterisk:
4929             self._allowable_chars |= self._asterisk
4930         if allow_ampersand:
4931             self._allowable_chars |= self._ampersand
4932         super(PrintableString, self).__init__(
4933             value, bounds, impl, expl, default, optional, _decoded, ctx,
4934         )
4935
4936     @property
4937     def allow_asterisk(self):
4938         """Is asterisk character allowed?
4939         """
4940         return self._asterisk <= self._allowable_chars
4941
4942     @property
4943     def allow_ampersand(self):
4944         """Is ampersand character allowed?
4945         """
4946         return self._ampersand <= self._allowable_chars
4947
4948     def __getstate__(self):
4949         return PrintableStringState(
4950             *super(PrintableString, self).__getstate__(),
4951             **{"allowable_chars": self._allowable_chars}
4952         )
4953
4954     def __setstate__(self, state):
4955         super(PrintableString, self).__setstate__(state)
4956         self._allowable_chars = state.allowable_chars
4957
4958     def __call__(
4959             self,
4960             value=None,
4961             bounds=None,
4962             impl=None,
4963             expl=None,
4964             default=None,
4965             optional=None,
4966     ):
4967         return self.__class__(
4968             value=value,
4969             bounds=(
4970                 (self._bound_min, self._bound_max)
4971                 if bounds is None else bounds
4972             ),
4973             impl=self.tag if impl is None else impl,
4974             expl=self._expl if expl is None else expl,
4975             default=self.default if default is None else default,
4976             optional=self.optional if optional is None else optional,
4977             allow_asterisk=self.allow_asterisk,
4978             allow_ampersand=self.allow_ampersand,
4979         )
4980
4981
4982 class TeletexString(CommonString):
4983     __slots__ = ()
4984     tag_default = tag_encode(20)
4985     encoding = "iso-8859-1"
4986     asn1_type_name = "TeletexString"
4987
4988
4989 class T61String(TeletexString):
4990     __slots__ = ()
4991     asn1_type_name = "T61String"
4992
4993
4994 class VideotexString(CommonString):
4995     __slots__ = ()
4996     tag_default = tag_encode(21)
4997     encoding = "iso-8859-1"
4998     asn1_type_name = "VideotexString"
4999
5000
5001 class IA5String(AllowableCharsMixin, CommonString):
5002     """IA5 string
5003
5004     Its value is properly sanitized: it is a mix of
5005
5006     * http://www.itscj.ipsj.or.jp/iso-ir/006.pdf (G)
5007     * http://www.itscj.ipsj.or.jp/iso-ir/001.pdf (C0)
5008     * DEL character (0x7F)
5009
5010     It is just 7-bit ASCII.
5011
5012     >>> IA5String().allowable_chars
5013     frozenset(["NUL", ... "DEL"])
5014     """
5015     __slots__ = ()
5016     tag_default = tag_encode(22)
5017     encoding = "ascii"
5018     asn1_type_name = "IA5"
5019     _allowable_chars = frozenset(b"".join(
5020         six_unichr(c).encode("ascii") for c in six_xrange(128)
5021     ))
5022
5023
5024 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
5025 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
5026 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
5027 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
5028 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
5029 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
5030
5031
5032 class VisibleString(AllowableCharsMixin, CommonString):
5033     """Visible string
5034
5035     Its value is properly sanitized. ASCII subset from space to tilde is
5036     allowed: http://www.itscj.ipsj.or.jp/iso-ir/006.pdf
5037
5038     >>> VisibleString().allowable_chars
5039     frozenset([" ", ... "~"])
5040     """
5041     __slots__ = ()
5042     tag_default = tag_encode(26)
5043     encoding = "ascii"
5044     asn1_type_name = "VisibleString"
5045     _allowable_chars = frozenset(b"".join(
5046         six_unichr(c).encode("ascii") for c in six_xrange(ord(" "), ord("~") + 1)
5047     ))
5048
5049
5050 class ISO646String(VisibleString):
5051     __slots__ = ()
5052     asn1_type_name = "ISO646String"
5053
5054
5055 UTCTimeState = namedtuple(
5056     "UTCTimeState",
5057     OctetStringState._fields + ("ber_raw",),
5058     **NAMEDTUPLE_KWARGS
5059 )
5060
5061
5062 def str_to_time_fractions(value):
5063     v = pureint(value)
5064     year, v = (v // 10**10), (v % 10**10)
5065     month, v = (v // 10**8), (v % 10**8)
5066     day, v = (v // 10**6), (v % 10**6)
5067     hour, v = (v // 10**4), (v % 10**4)
5068     minute, second = (v // 100), (v % 100)
5069     return year, month, day, hour, minute, second
5070
5071
5072 class UTCTime(VisibleString):
5073     """``UTCTime`` datetime type
5074
5075     >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5076     UTCTime UTCTime 2017-09-30T22:07:50
5077     >>> str(t)
5078     '170930220750Z'
5079     >>> bytes(t)
5080     b'170930220750Z'
5081     >>> t.todatetime()
5082     datetime.datetime(2017, 9, 30, 22, 7, 50)
5083     >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5084     datetime.datetime(1957, 9, 30, 22, 7, 50)
5085
5086     If BER encoded value was met, then ``ber_raw`` attribute will hold
5087     its raw representation.
5088
5089     .. warning::
5090
5091        Only **naive** ``datetime`` objects are supported.
5092        Library assumes that all work is done in UTC.
5093
5094     .. warning::
5095
5096        Pay attention that ``UTCTime`` can not hold full year, so all years
5097        having < 50 years are treated as 20xx, 19xx otherwise, according to
5098        X.509 recommendation. Use ``GeneralizedTime`` instead for
5099        removing ambiguity.
5100
5101     .. warning::
5102
5103        No strict validation of UTC offsets are made (only applicable to
5104        **BER**), but very crude:
5105
5106        * minutes are not exceeding 60
5107        * offset value is not exceeding 14 hours
5108     """
5109     __slots__ = ("ber_raw",)
5110     tag_default = tag_encode(23)
5111     encoding = "ascii"
5112     asn1_type_name = "UTCTime"
5113     evgen_mode_skip_value = False
5114
5115     def __init__(
5116             self,
5117             value=None,
5118             impl=None,
5119             expl=None,
5120             default=None,
5121             optional=False,
5122             _decoded=(0, 0, 0),
5123             bounds=None,  # dummy argument, workability for OctetString.decode
5124             ctx=None,
5125     ):
5126         """
5127         :param value: set the value. Either datetime type, or
5128                       :py:class:`pyderasn.UTCTime` object
5129         :param bytes impl: override default tag with ``IMPLICIT`` one
5130         :param bytes expl: override default tag with ``EXPLICIT`` one
5131         :param default: set default value. Type same as in ``value``
5132         :param bool optional: is object ``OPTIONAL`` in sequence
5133         """
5134         super(UTCTime, self).__init__(
5135             None, None, impl, expl, None, optional, _decoded, ctx,
5136         )
5137         self._value = value
5138         self.ber_raw = None
5139         if value is not None:
5140             self._value, self.ber_raw = self._value_sanitize(value, ctx)
5141             self.ber_encoded = self.ber_raw is not None
5142         if default is not None:
5143             default, _ = self._value_sanitize(default)
5144             self.default = self.__class__(
5145                 value=default,
5146                 impl=self.tag,
5147                 expl=self._expl,
5148             )
5149             if self._value is None:
5150                 self._value = default
5151             optional = True
5152         self.optional = optional
5153
5154     def _strptime_bered(self, value):
5155         year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5156         value = value[10:]
5157         if len(value) == 0:
5158             raise ValueError("no timezone")
5159         year += 2000 if year < 50 else 1900
5160         decoded = datetime(year, month, day, hour, minute)
5161         offset = 0
5162         if value[-1] == "Z":
5163             value = value[:-1]
5164         else:
5165             if len(value) < 5:
5166                 raise ValueError("invalid UTC offset")
5167             if value[-5] == "-":
5168                 sign = -1
5169             elif value[-5] == "+":
5170                 sign = 1
5171             else:
5172                 raise ValueError("invalid UTC offset")
5173             v = pureint(value[-4:])
5174             offset, v = (60 * (v % 100)), v // 100
5175             if offset >= 3600:
5176                 raise ValueError("invalid UTC offset minutes")
5177             offset += 3600 * v
5178             if offset > 14 * 3600:
5179                 raise ValueError("too big UTC offset")
5180             offset *= sign
5181             value = value[:-5]
5182         if len(value) == 0:
5183             return offset, decoded
5184         if len(value) != 2:
5185             raise ValueError("invalid UTC offset seconds")
5186         seconds = pureint(value)
5187         if seconds >= 60:
5188             raise ValueError("invalid seconds value")
5189         return offset, decoded + timedelta(seconds=seconds)
5190
5191     def _strptime(self, value):
5192         # datetime.strptime's format: %y%m%d%H%M%SZ
5193         if len(value) != LEN_YYMMDDHHMMSSZ:
5194             raise ValueError("invalid UTCTime length")
5195         if value[-1] != "Z":
5196             raise ValueError("non UTC timezone")
5197         year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
5198         year += 2000 if year < 50 else 1900
5199         return datetime(year, month, day, hour, minute, second)
5200
5201     def _dt_sanitize(self, value):
5202         if value.year < 1950 or value.year > 2049:
5203             raise ValueError("UTCTime can hold only 1950-2049 years")
5204         return value.replace(microsecond=0)
5205
5206     def _value_sanitize(self, value, ctx=None):
5207         if value.__class__ == binary_type:
5208             try:
5209                 value_decoded = value.decode("ascii")
5210             except (UnicodeEncodeError, UnicodeDecodeError) as err:
5211                 raise DecodeError("invalid UTCTime encoding: %r" % err)
5212             err = None
5213             try:
5214                 return self._strptime(value_decoded), None
5215             except (TypeError, ValueError) as _err:
5216                 err = _err
5217                 if (ctx is not None) and ctx.get("bered", False):
5218                     try:
5219                         offset, _value = self._strptime_bered(value_decoded)
5220                         _value = _value - timedelta(seconds=offset)
5221                         return self._dt_sanitize(_value), value
5222                     except (TypeError, ValueError, OverflowError) as _err:
5223                         err = _err
5224             raise DecodeError(
5225                 "invalid %s format: %r" % (self.asn1_type_name, err),
5226                 klass=self.__class__,
5227             )
5228         if isinstance(value, self.__class__):
5229             return value._value, None
5230         if value.__class__ == datetime:
5231             if value.tzinfo is not None:
5232                 raise ValueError("only naive datetime supported")
5233             return self._dt_sanitize(value), None
5234         raise InvalidValueType((self.__class__, datetime))
5235
5236     def _pp_value(self):
5237         if self.ready:
5238             value = self._value.isoformat()
5239             if self.ber_encoded:
5240                 value += " (%s)" % self.ber_raw
5241             return value
5242         return None
5243
5244     def __unicode__(self):
5245         if self.ready:
5246             value = self._value.isoformat()
5247             if self.ber_encoded:
5248                 value += " (%s)" % self.ber_raw
5249             return value
5250         return text_type(self._pp_value())
5251
5252     def __getstate__(self):
5253         return UTCTimeState(
5254             *super(UTCTime, self).__getstate__(),
5255             **{"ber_raw": self.ber_raw}
5256         )
5257
5258     def __setstate__(self, state):
5259         super(UTCTime, self).__setstate__(state)
5260         self.ber_raw = state.ber_raw
5261
5262     def __bytes__(self):
5263         self._assert_ready()
5264         return self._encode_time()
5265
5266     def __eq__(self, their):
5267         if their.__class__ == binary_type:
5268             return self._encode_time() == their
5269         if their.__class__ == datetime:
5270             return self.todatetime() == their
5271         if not isinstance(their, self.__class__):
5272             return False
5273         return (
5274             self._value == their._value and
5275             self.tag == their.tag and
5276             self._expl == their._expl
5277         )
5278
5279     def _encode_time(self):
5280         return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5281
5282     def _encode(self):
5283         self._assert_ready()
5284         return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5285
5286     def _encode1st(self, state):
5287         return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
5288
5289     def _encode2nd(self, writer, state_iter):
5290         self._assert_ready()
5291         write_full(writer, self._encode())
5292
5293     def _encode_cer(self, writer):
5294         write_full(writer, self._encode())
5295
5296     def todatetime(self):
5297         return self._value
5298
5299     def __repr__(self):
5300         return pp_console_row(next(self.pps()))
5301
5302     def pps(self, decode_path=()):
5303         yield _pp(
5304             obj=self,
5305             asn1_type_name=self.asn1_type_name,
5306             obj_name=self.__class__.__name__,
5307             decode_path=decode_path,
5308             value=self._pp_value(),
5309             optional=self.optional,
5310             default=self == self.default,
5311             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5312             expl=None if self._expl is None else tag_decode(self._expl),
5313             offset=self.offset,
5314             tlen=self.tlen,
5315             llen=self.llen,
5316             vlen=self.vlen,
5317             expl_offset=self.expl_offset if self.expled else None,
5318             expl_tlen=self.expl_tlen if self.expled else None,
5319             expl_llen=self.expl_llen if self.expled else None,
5320             expl_vlen=self.expl_vlen if self.expled else None,
5321             expl_lenindef=self.expl_lenindef,
5322             ber_encoded=self.ber_encoded,
5323             bered=self.bered,
5324         )
5325         for pp in self.pps_lenindef(decode_path):
5326             yield pp
5327
5328
5329 class GeneralizedTime(UTCTime):
5330     """``GeneralizedTime`` datetime type
5331
5332     This type is similar to :py:class:`pyderasn.UTCTime`.
5333
5334     >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5335     GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5336     >>> str(t)
5337     '20170930220750.000123Z'
5338     >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5339     GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5340
5341     .. warning::
5342
5343        Only **naive** datetime objects are supported.
5344        Library assumes that all work is done in UTC.
5345
5346     .. warning::
5347
5348        Only **microsecond** fractions are supported in DER encoding.
5349        :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5350        higher precision values.
5351
5352     .. warning::
5353
5354        **BER** encoded data can loss information (accuracy) during
5355        decoding because of float transformations.
5356
5357     .. warning::
5358
5359        **Zero** year is unsupported.
5360     """
5361     __slots__ = ()
5362     tag_default = tag_encode(24)
5363     asn1_type_name = "GeneralizedTime"
5364
5365     def _dt_sanitize(self, value):
5366         return value
5367
5368     def _strptime_bered(self, value):
5369         if len(value) < 4 + 3 * 2:
5370             raise ValueError("invalid GeneralizedTime")
5371         year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5372         decoded = datetime(year, month, day, hour)
5373         offset, value = 0, value[10:]
5374         if len(value) == 0:
5375             return offset, decoded
5376         if value[-1] == "Z":
5377             value = value[:-1]
5378         else:
5379             for char, sign in (("-", -1), ("+", 1)):
5380                 idx = value.rfind(char)
5381                 if idx == -1:
5382                     continue
5383                 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5384                 v = pureint(offset_raw)
5385                 if len(offset_raw) == 4:
5386                     offset, v = (60 * (v % 100)), v // 100
5387                     if offset >= 3600:
5388                         raise ValueError("invalid UTC offset minutes")
5389                 elif len(offset_raw) == 2:
5390                     pass
5391                 else:
5392                     raise ValueError("invalid UTC offset")
5393                 offset += 3600 * v
5394                 if offset > 14 * 3600:
5395                     raise ValueError("too big UTC offset")
5396                 offset *= sign
5397                 break
5398         if len(value) == 0:
5399             return offset, decoded
5400         if value[0] in DECIMAL_SIGNS:
5401             return offset, (
5402                 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5403             )
5404         if len(value) < 2:
5405             raise ValueError("stripped minutes")
5406         decoded += timedelta(seconds=60 * pureint(value[:2]))
5407         value = value[2:]
5408         if len(value) == 0:
5409             return offset, decoded
5410         if value[0] in DECIMAL_SIGNS:
5411             return offset, (
5412                 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5413             )
5414         if len(value) < 2:
5415             raise ValueError("stripped seconds")
5416         decoded += timedelta(seconds=pureint(value[:2]))
5417         value = value[2:]
5418         if len(value) == 0:
5419             return offset, decoded
5420         if value[0] not in DECIMAL_SIGNS:
5421             raise ValueError("invalid format after seconds")
5422         return offset, (
5423             decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5424         )
5425
5426     def _strptime(self, value):
5427         l = len(value)
5428         if l == LEN_YYYYMMDDHHMMSSZ:
5429             # datetime.strptime's format: %Y%m%d%H%M%SZ
5430             if value[-1] != "Z":
5431                 raise ValueError("non UTC timezone")
5432             return datetime(*str_to_time_fractions(value[:-1]))
5433         if l >= LEN_YYYYMMDDHHMMSSDMZ:
5434             # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5435             if value[-1] != "Z":
5436                 raise ValueError("non UTC timezone")
5437             if value[14] != ".":
5438                 raise ValueError("no fractions separator")
5439             us = value[15:-1]
5440             if us[-1] == "0":
5441                 raise ValueError("trailing zero")
5442             us_len = len(us)
5443             if us_len > 6:
5444                 raise ValueError("only microsecond fractions are supported")
5445             us = pureint(us + ("0" * (6 - us_len)))
5446             year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5447             return datetime(year, month, day, hour, minute, second, us)
5448         raise ValueError("invalid GeneralizedTime length")
5449
5450     def _encode_time(self):
5451         value = self._value
5452         encoded = value.strftime("%Y%m%d%H%M%S")
5453         if value.microsecond > 0:
5454             encoded += (".%06d" % value.microsecond).rstrip("0")
5455         return (encoded + "Z").encode("ascii")
5456
5457     def _encode(self):
5458         self._assert_ready()
5459         value = self._value
5460         if value.microsecond > 0:
5461             encoded = self._encode_time()
5462             return b"".join((self.tag, len_encode(len(encoded)), encoded))
5463         return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5464
5465     def _encode1st(self, state):
5466         self._assert_ready()
5467         vlen = len(self._encode_time())
5468         return len(self.tag) + len_size(vlen) + vlen, state
5469
5470     def _encode2nd(self, writer, state_iter):
5471         write_full(writer, self._encode())
5472
5473
5474 class GraphicString(CommonString):
5475     __slots__ = ()
5476     tag_default = tag_encode(25)
5477     encoding = "iso-8859-1"
5478     asn1_type_name = "GraphicString"
5479
5480
5481 class GeneralString(CommonString):
5482     __slots__ = ()
5483     tag_default = tag_encode(27)
5484     encoding = "iso-8859-1"
5485     asn1_type_name = "GeneralString"
5486
5487
5488 class UniversalString(CommonString):
5489     __slots__ = ()
5490     tag_default = tag_encode(28)
5491     encoding = "utf-32-be"
5492     asn1_type_name = "UniversalString"
5493
5494
5495 class BMPString(CommonString):
5496     __slots__ = ()
5497     tag_default = tag_encode(30)
5498     encoding = "utf-16-be"
5499     asn1_type_name = "BMPString"
5500
5501
5502 ChoiceState = namedtuple(
5503     "ChoiceState",
5504     BasicState._fields + ("specs", "value",),
5505     **NAMEDTUPLE_KWARGS
5506 )
5507
5508
5509 class Choice(Obj):
5510     """``CHOICE`` special type
5511
5512     ::
5513
5514         class GeneralName(Choice):
5515             schema = (
5516                 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5517                 ("dNSName", IA5String(impl=tag_ctxp(2))),
5518             )
5519
5520     >>> gn = GeneralName()
5521     GeneralName CHOICE
5522     >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5523     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5524     >>> gn["dNSName"] = IA5String("bar.baz")
5525     GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5526     >>> gn["rfc822Name"]
5527     None
5528     >>> gn["dNSName"]
5529     [2] IA5String IA5 bar.baz
5530     >>> gn.choice
5531     'dNSName'
5532     >>> gn.value == gn["dNSName"]
5533     True
5534     >>> gn.specs
5535     OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5536
5537     >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5538     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5539     """
5540     __slots__ = ("specs",)
5541     tag_default = None
5542     asn1_type_name = "CHOICE"
5543
5544     def __init__(
5545             self,
5546             value=None,
5547             schema=None,
5548             impl=None,
5549             expl=None,
5550             default=None,
5551             optional=False,
5552             _decoded=(0, 0, 0),
5553     ):
5554         """
5555         :param value: set the value. Either ``(choice, value)`` tuple, or
5556                       :py:class:`pyderasn.Choice` object
5557         :param bytes impl: can not be set, do **not** use it
5558         :param bytes expl: override default tag with ``EXPLICIT`` one
5559         :param default: set default value. Type same as in ``value``
5560         :param bool optional: is object ``OPTIONAL`` in sequence
5561         """
5562         if impl is not None:
5563             raise ValueError("no implicit tag allowed for CHOICE")
5564         super(Choice, self).__init__(None, expl, default, optional, _decoded)
5565         if schema is None:
5566             schema = getattr(self, "schema", ())
5567         if len(schema) == 0:
5568             raise ValueError("schema must be specified")
5569         self.specs = (
5570             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5571         )
5572         self._value = None
5573         if value is not None:
5574             self._value = self._value_sanitize(value)
5575         if default is not None:
5576             default_value = self._value_sanitize(default)
5577             default_obj = self.__class__(impl=self.tag, expl=self._expl)
5578             default_obj.specs = self.specs
5579             default_obj._value = default_value
5580             self.default = default_obj
5581             if value is None:
5582                 self._value = copy(default_obj._value)
5583         if self._expl is not None:
5584             tag_class, _, tag_num = tag_decode(self._expl)
5585             self._tag_order = (tag_class, tag_num)
5586
5587     def _value_sanitize(self, value):
5588         if (value.__class__ == tuple) and len(value) == 2:
5589             choice, obj = value
5590             spec = self.specs.get(choice)
5591             if spec is None:
5592                 raise ObjUnknown(choice)
5593             if not isinstance(obj, spec.__class__):
5594                 raise InvalidValueType((spec,))
5595             return (choice, spec(obj))
5596         if isinstance(value, self.__class__):
5597             return value._value
5598         raise InvalidValueType((self.__class__, tuple))
5599
5600     @property
5601     def ready(self):
5602         return self._value is not None and self._value[1].ready
5603
5604     @property
5605     def bered(self):
5606         return self.expl_lenindef or (
5607             (self._value is not None) and
5608             self._value[1].bered
5609         )
5610
5611     def __getstate__(self):
5612         return ChoiceState(
5613             __version__,
5614             self.tag,
5615             self._tag_order,
5616             self._expl,
5617             self.default,
5618             self.optional,
5619             self.offset,
5620             self.llen,
5621             self.vlen,
5622             self.expl_lenindef,
5623             self.lenindef,
5624             self.ber_encoded,
5625             self.specs,
5626             copy(self._value),
5627         )
5628
5629     def __setstate__(self, state):
5630         super(Choice, self).__setstate__(state)
5631         self.specs = state.specs
5632         self._value = state.value
5633
5634     def __eq__(self, their):
5635         if (their.__class__ == tuple) and len(their) == 2:
5636             return self._value == their
5637         if not isinstance(their, self.__class__):
5638             return False
5639         return (
5640             self.specs == their.specs and
5641             self._value == their._value
5642         )
5643
5644     def __call__(
5645             self,
5646             value=None,
5647             expl=None,
5648             default=None,
5649             optional=None,
5650     ):
5651         return self.__class__(
5652             value=value,
5653             schema=self.specs,
5654             expl=self._expl if expl is None else expl,
5655             default=self.default if default is None else default,
5656             optional=self.optional if optional is None else optional,
5657         )
5658
5659     @property
5660     def choice(self):
5661         """Name of the choice
5662         """
5663         self._assert_ready()
5664         return self._value[0]
5665
5666     @property
5667     def value(self):
5668         """Value of underlying choice
5669         """
5670         self._assert_ready()
5671         return self._value[1]
5672
5673     @property
5674     def tag_order(self):
5675         self._assert_ready()
5676         return self._value[1].tag_order if self._tag_order is None else self._tag_order
5677
5678     @property
5679     def tag_order_cer(self):
5680         return min(v.tag_order_cer for v in itervalues(self.specs))
5681
5682     def __getitem__(self, key):
5683         if key not in self.specs:
5684             raise ObjUnknown(key)
5685         if self._value is None:
5686             return None
5687         choice, value = self._value
5688         if choice != key:
5689             return None
5690         return value
5691
5692     def __setitem__(self, key, value):
5693         spec = self.specs.get(key)
5694         if spec is None:
5695             raise ObjUnknown(key)
5696         if not isinstance(value, spec.__class__):
5697             raise InvalidValueType((spec.__class__,))
5698         self._value = (key, spec(value))
5699
5700     @property
5701     def tlen(self):
5702         return 0
5703
5704     @property
5705     def decoded(self):
5706         return self._value[1].decoded if self.ready else False
5707
5708     def _encode(self):
5709         self._assert_ready()
5710         return self._value[1].encode()
5711
5712     def _encode1st(self, state):
5713         self._assert_ready()
5714         return self._value[1].encode1st(state)
5715
5716     def _encode2nd(self, writer, state_iter):
5717         self._value[1].encode2nd(writer, state_iter)
5718
5719     def _encode_cer(self, writer):
5720         self._assert_ready()
5721         self._value[1].encode_cer(writer)
5722
5723     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5724         for choice, spec in iteritems(self.specs):
5725             sub_decode_path = decode_path + (choice,)
5726             try:
5727                 spec.decode(
5728                     tlv,
5729                     offset=offset,
5730                     leavemm=True,
5731                     decode_path=sub_decode_path,
5732                     ctx=ctx,
5733                     tag_only=True,
5734                     _ctx_immutable=False,
5735                 )
5736             except TagMismatch:
5737                 continue
5738             break
5739         else:
5740             raise TagMismatch(
5741                 klass=self.__class__,
5742                 decode_path=decode_path,
5743                 offset=offset,
5744             )
5745         if tag_only:  # pragma: no cover
5746             yield None
5747             return
5748         if evgen_mode:
5749             for _decode_path, value, tail in spec.decode_evgen(
5750                     tlv,
5751                     offset=offset,
5752                     leavemm=True,
5753                     decode_path=sub_decode_path,
5754                     ctx=ctx,
5755                     _ctx_immutable=False,
5756             ):
5757                 yield _decode_path, value, tail
5758         else:
5759             _, value, tail = next(spec.decode_evgen(
5760                 tlv,
5761                 offset=offset,
5762                 leavemm=True,
5763                 decode_path=sub_decode_path,
5764                 ctx=ctx,
5765                 _ctx_immutable=False,
5766                 _evgen_mode=False,
5767             ))
5768         obj = self.__class__(
5769             schema=self.specs,
5770             expl=self._expl,
5771             default=self.default,
5772             optional=self.optional,
5773             _decoded=(offset, 0, value.fulllen),
5774         )
5775         obj._value = (choice, value)
5776         yield decode_path, obj, tail
5777
5778     def __repr__(self):
5779         value = pp_console_row(next(self.pps()))
5780         if self.ready:
5781             value = "%s[%r]" % (value, self.value)
5782         return value
5783
5784     def pps(self, decode_path=()):
5785         yield _pp(
5786             obj=self,
5787             asn1_type_name=self.asn1_type_name,
5788             obj_name=self.__class__.__name__,
5789             decode_path=decode_path,
5790             value=self.choice if self.ready else None,
5791             optional=self.optional,
5792             default=self == self.default,
5793             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5794             expl=None if self._expl is None else tag_decode(self._expl),
5795             offset=self.offset,
5796             tlen=self.tlen,
5797             llen=self.llen,
5798             vlen=self.vlen,
5799             expl_lenindef=self.expl_lenindef,
5800             bered=self.bered,
5801         )
5802         if self.ready:
5803             yield self.value.pps(decode_path=decode_path + (self.choice,))
5804         for pp in self.pps_lenindef(decode_path):
5805             yield pp
5806
5807
5808 class PrimitiveTypes(Choice):
5809     """Predefined ``CHOICE`` for all generic primitive types
5810
5811     It could be useful for general decoding of some unspecified values:
5812
5813     >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5814     OCTET STRING 3 bytes 666f6f
5815     >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5816     INTEGER 1193046
5817     """
5818     __slots__ = ()
5819     schema = tuple((klass.__name__, klass()) for klass in (
5820         Boolean,
5821         Integer,
5822         BitString,
5823         OctetString,
5824         Null,
5825         ObjectIdentifier,
5826         UTF8String,
5827         NumericString,
5828         PrintableString,
5829         TeletexString,
5830         VideotexString,
5831         IA5String,
5832         UTCTime,
5833         GeneralizedTime,
5834         GraphicString,
5835         VisibleString,
5836         ISO646String,
5837         GeneralString,
5838         UniversalString,
5839         BMPString,
5840     ))
5841
5842
5843 AnyState = namedtuple(
5844     "AnyState",
5845     BasicState._fields + ("value", "defined"),
5846     **NAMEDTUPLE_KWARGS
5847 )
5848
5849
5850 class Any(Obj):
5851     """``ANY`` special type
5852
5853     >>> Any(Integer(-123))
5854     ANY INTEGER -123 (0X:7B)
5855     >>> a = Any(OctetString(b"hello world").encode())
5856     ANY 040b68656c6c6f20776f726c64
5857     >>> hexenc(bytes(a))
5858     b'0x040x0bhello world'
5859     """
5860     __slots__ = ("defined",)
5861     tag_default = tag_encode(0)
5862     asn1_type_name = "ANY"
5863
5864     def __init__(
5865             self,
5866             value=None,
5867             expl=None,
5868             optional=False,
5869             _decoded=(0, 0, 0),
5870     ):
5871         """
5872         :param value: set the value. Either any kind of pyderasn's
5873                       **ready** object, or bytes. Pay attention that
5874                       **no** validation is performed if raw binary value
5875                       is valid TLV, except just tag decoding
5876         :param bytes expl: override default tag with ``EXPLICIT`` one
5877         :param bool optional: is object ``OPTIONAL`` in sequence
5878         """
5879         super(Any, self).__init__(None, expl, None, optional, _decoded)
5880         if value is None:
5881             self._value = None
5882         else:
5883             value = self._value_sanitize(value)
5884             self._value = value
5885             if self._expl is None:
5886                 if value.__class__ == binary_type:
5887                     tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5888                 else:
5889                     tag_class, tag_num = value.tag_order
5890             else:
5891                 tag_class, _, tag_num = tag_decode(self._expl)
5892             self._tag_order = (tag_class, tag_num)
5893         self.defined = None
5894
5895     def _value_sanitize(self, value):
5896         if value.__class__ == binary_type:
5897             if len(value) == 0:
5898                 raise ValueError("%s value can not be empty" % self.__class__.__name__)
5899             return value
5900         if isinstance(value, self.__class__):
5901             return value._value
5902         if not isinstance(value, Obj):
5903             raise InvalidValueType((self.__class__, Obj, binary_type))
5904         return value
5905
5906     @property
5907     def ready(self):
5908         return self._value is not None
5909
5910     @property
5911     def tag_order(self):
5912         self._assert_ready()
5913         return self._tag_order
5914
5915     @property
5916     def bered(self):
5917         if self.expl_lenindef or self.lenindef:
5918             return True
5919         if self.defined is None:
5920             return False
5921         return self.defined[1].bered
5922
5923     def __getstate__(self):
5924         return AnyState(
5925             __version__,
5926             self.tag,
5927             self._tag_order,
5928             self._expl,
5929             None,
5930             self.optional,
5931             self.offset,
5932             self.llen,
5933             self.vlen,
5934             self.expl_lenindef,
5935             self.lenindef,
5936             self.ber_encoded,
5937             self._value,
5938             self.defined,
5939         )
5940
5941     def __setstate__(self, state):
5942         super(Any, self).__setstate__(state)
5943         self._value = state.value
5944         self.defined = state.defined
5945
5946     def __eq__(self, their):
5947         if their.__class__ == binary_type:
5948             if self._value.__class__ == binary_type:
5949                 return self._value == their
5950             return self._value.encode() == their
5951         if issubclass(their.__class__, Any):
5952             if self.ready and their.ready:
5953                 return bytes(self) == bytes(their)
5954             return self.ready == their.ready
5955         return False
5956
5957     def __call__(
5958             self,
5959             value=None,
5960             expl=None,
5961             optional=None,
5962     ):
5963         return self.__class__(
5964             value=value,
5965             expl=self._expl if expl is None else expl,
5966             optional=self.optional if optional is None else optional,
5967         )
5968
5969     def __bytes__(self):
5970         self._assert_ready()
5971         value = self._value
5972         if value.__class__ == binary_type:
5973             return value
5974         return self._value.encode()
5975
5976     @property
5977     def tlen(self):
5978         return 0
5979
5980     def _encode(self):
5981         self._assert_ready()
5982         value = self._value
5983         if value.__class__ == binary_type:
5984             return value
5985         return value.encode()
5986
5987     def _encode1st(self, state):
5988         self._assert_ready()
5989         value = self._value
5990         if value.__class__ == binary_type:
5991             return len(value), state
5992         return value.encode1st(state)
5993
5994     def _encode2nd(self, writer, state_iter):
5995         value = self._value
5996         if value.__class__ == binary_type:
5997             write_full(writer, value)
5998         else:
5999             value.encode2nd(writer, state_iter)
6000
6001     def _encode_cer(self, writer):
6002         self._assert_ready()
6003         value = self._value
6004         if value.__class__ == binary_type:
6005             write_full(writer, value)
6006         else:
6007             value.encode_cer(writer)
6008
6009     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6010         try:
6011             t, tlen, lv = tag_strip(tlv)
6012         except DecodeError as err:
6013             raise err.__class__(
6014                 msg=err.msg,
6015                 klass=self.__class__,
6016                 decode_path=decode_path,
6017                 offset=offset,
6018             )
6019         try:
6020             l, llen, v = len_decode(lv)
6021         except LenIndefForm as err:
6022             if not ctx.get("bered", False):
6023                 raise err.__class__(
6024                     msg=err.msg,
6025                     klass=self.__class__,
6026                     decode_path=decode_path,
6027                     offset=offset,
6028                 )
6029             llen, vlen, v = 1, 0, lv[1:]
6030             sub_offset = offset + tlen + llen
6031             chunk_i = 0
6032             while v[:EOC_LEN].tobytes() != EOC:
6033                 chunk, v = Any().decode(
6034                     v,
6035                     offset=sub_offset,
6036                     decode_path=decode_path + (str(chunk_i),),
6037                     leavemm=True,
6038                     ctx=ctx,
6039                     _ctx_immutable=False,
6040                 )
6041                 vlen += chunk.tlvlen
6042                 sub_offset += chunk.tlvlen
6043                 chunk_i += 1
6044             tlvlen = tlen + llen + vlen + EOC_LEN
6045             obj = self.__class__(
6046                 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
6047                 expl=self._expl,
6048                 optional=self.optional,
6049                 _decoded=(offset, 0, tlvlen),
6050             )
6051             obj.lenindef = True
6052             obj.tag = t.tobytes()
6053             yield decode_path, obj, v[EOC_LEN:]
6054             return
6055         except DecodeError as err:
6056             raise err.__class__(
6057                 msg=err.msg,
6058                 klass=self.__class__,
6059                 decode_path=decode_path,
6060                 offset=offset,
6061             )
6062         if l > len(v):
6063             raise NotEnoughData(
6064                 "encoded length is longer than data",
6065                 klass=self.__class__,
6066                 decode_path=decode_path,
6067                 offset=offset,
6068             )
6069         tlvlen = tlen + llen + l
6070         v, tail = tlv[:tlvlen], v[l:]
6071         obj = self.__class__(
6072             value=None if evgen_mode else v.tobytes(),
6073             expl=self._expl,
6074             optional=self.optional,
6075             _decoded=(offset, 0, tlvlen),
6076         )
6077         obj.tag = t.tobytes()
6078         yield decode_path, obj, tail
6079
6080     def __repr__(self):
6081         return pp_console_row(next(self.pps()))
6082
6083     def pps(self, decode_path=()):
6084         value = self._value
6085         if value is None:
6086             pass
6087         elif value.__class__ == binary_type:
6088             value = None
6089         else:
6090             value = repr(value)
6091         yield _pp(
6092             obj=self,
6093             asn1_type_name=self.asn1_type_name,
6094             obj_name=self.__class__.__name__,
6095             decode_path=decode_path,
6096             value=value,
6097             blob=self._value if self._value.__class__ == binary_type else None,
6098             optional=self.optional,
6099             default=self == self.default,
6100             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6101             expl=None if self._expl is None else tag_decode(self._expl),
6102             offset=self.offset,
6103             tlen=self.tlen,
6104             llen=self.llen,
6105             vlen=self.vlen,
6106             expl_offset=self.expl_offset if self.expled else None,
6107             expl_tlen=self.expl_tlen if self.expled else None,
6108             expl_llen=self.expl_llen if self.expled else None,
6109             expl_vlen=self.expl_vlen if self.expled else None,
6110             expl_lenindef=self.expl_lenindef,
6111             lenindef=self.lenindef,
6112             bered=self.bered,
6113         )
6114         defined_by, defined = self.defined or (None, None)
6115         if defined_by is not None:
6116             yield defined.pps(
6117                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6118             )
6119         for pp in self.pps_lenindef(decode_path):
6120             yield pp
6121
6122
6123 ########################################################################
6124 # ASN.1 constructed types
6125 ########################################################################
6126
6127 def abs_decode_path(decode_path, rel_path):
6128     """Create an absolute decode path from current and relative ones
6129
6130     :param decode_path: current decode path, starting point. Tuple of strings
6131     :param rel_path: relative path to ``decode_path``. Tuple of strings.
6132                      If first tuple's element is "/", then treat it as
6133                      an absolute path, ignoring ``decode_path`` as
6134                      starting point. Also this tuple can contain ".."
6135                      elements, stripping the leading element from
6136                      ``decode_path``
6137
6138     >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
6139     ("foo", "bar", "baz", "whatever")
6140     >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
6141     ("foo", "whatever")
6142     >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
6143     ("baz", "whatever")
6144     """
6145     if rel_path[0] == "/":
6146         return rel_path[1:]
6147     if rel_path[0] == "..":
6148         return abs_decode_path(decode_path[:-1], rel_path[1:])
6149     return decode_path + rel_path
6150
6151
6152 SequenceState = namedtuple(
6153     "SequenceState",
6154     BasicState._fields + ("specs", "value",),
6155     **NAMEDTUPLE_KWARGS
6156 )
6157
6158
6159 class SequenceEncode1stMixing(object):
6160     def _encode1st(self, state):
6161         state.append(0)
6162         idx = len(state) - 1
6163         vlen = 0
6164         for v in self._values_for_encoding():
6165             l, _ = v.encode1st(state)
6166             vlen += l
6167         state[idx] = vlen
6168         return len(self.tag) + len_size(vlen) + vlen, state
6169
6170
6171 class Sequence(SequenceEncode1stMixing, Obj):
6172     """``SEQUENCE`` structure type
6173
6174     You have to make specification of sequence::
6175
6176         class Extension(Sequence):
6177             schema = (
6178                 ("extnID", ObjectIdentifier()),
6179                 ("critical", Boolean(default=False)),
6180                 ("extnValue", OctetString()),
6181             )
6182
6183     Then, you can work with it as with dictionary.
6184
6185     >>> ext = Extension()
6186     >>> Extension().specs
6187     OrderedDict([
6188         ('extnID', OBJECT IDENTIFIER),
6189         ('critical', BOOLEAN False OPTIONAL DEFAULT),
6190         ('extnValue', OCTET STRING),
6191     ])
6192     >>> ext["extnID"] = "1.2.3"
6193     Traceback (most recent call last):
6194     pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6195     >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6196
6197     You can determine if sequence is ready to be encoded:
6198
6199     >>> ext.ready
6200     False
6201     >>> ext.encode()
6202     Traceback (most recent call last):
6203     pyderasn.ObjNotReady: object is not ready: extnValue
6204     >>> ext["extnValue"] = OctetString(b"foobar")
6205     >>> ext.ready
6206     True
6207
6208     Value you want to assign, must have the same **type** as in
6209     corresponding specification, but it can have different tags,
6210     optional/default attributes -- they will be taken from specification
6211     automatically::
6212
6213         class TBSCertificate(Sequence):
6214             schema = (
6215                 ("version", Version(expl=tag_ctxc(0), default="v1")),
6216             [...]
6217
6218     >>> tbs = TBSCertificate()
6219     >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6220
6221     Assign ``None`` to remove value from sequence.
6222
6223     You can set values in Sequence during its initialization:
6224
6225     >>> AlgorithmIdentifier((
6226         ("algorithm", ObjectIdentifier("1.2.3")),
6227         ("parameters", Any(Null()))
6228     ))
6229     AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6230
6231     You can determine if value exists/set in the sequence and take its value:
6232
6233     >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6234     (True, True, False)
6235     >>> ext["extnID"]
6236     OBJECT IDENTIFIER 1.2.3
6237
6238     But pay attention that if value has default, then it won't be (not
6239     in) in the sequence (because ``DEFAULT`` must not be encoded in
6240     DER), but you can read its value:
6241
6242     >>> "critical" in ext, ext["critical"]
6243     (False, BOOLEAN False)
6244     >>> ext["critical"] = Boolean(True)
6245     >>> "critical" in ext, ext["critical"]
6246     (True, BOOLEAN True)
6247
6248     All defaulted values are always optional.
6249
6250     .. _allow_default_values_ctx:
6251
6252     DER prohibits default value encoding and will raise an error if
6253     default value is unexpectedly met during decode.
6254     If :ref:`bered <bered_ctx>` context option is set, then no error
6255     will be raised, but ``bered`` attribute set. You can disable strict
6256     defaulted values existence validation by setting
6257     ``"allow_default_values": True`` :ref:`context <ctx>` option.
6258
6259     All values with DEFAULT specified are decoded atomically in
6260     :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6261     SEQUENCE, then it will be yielded as a single element, not
6262     disassembled. That is required for DEFAULT existence check.
6263
6264     Two sequences are equal if they have equal specification (schema),
6265     implicit/explicit tagging and the same values.
6266     """
6267     __slots__ = ("specs",)
6268     tag_default = tag_encode(form=TagFormConstructed, num=16)
6269     asn1_type_name = "SEQUENCE"
6270
6271     def __init__(
6272             self,
6273             value=None,
6274             schema=None,
6275             impl=None,
6276             expl=None,
6277             default=None,
6278             optional=False,
6279             _decoded=(0, 0, 0),
6280     ):
6281         super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6282         if schema is None:
6283             schema = getattr(self, "schema", ())
6284         self.specs = (
6285             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6286         )
6287         self._value = {}
6288         if value is not None:
6289             if issubclass(value.__class__, Sequence):
6290                 self._value = value._value
6291             elif hasattr(value, "__iter__"):
6292                 for seq_key, seq_value in value:
6293                     self[seq_key] = seq_value
6294             else:
6295                 raise InvalidValueType((Sequence,))
6296         if default is not None:
6297             if not issubclass(default.__class__, Sequence):
6298                 raise InvalidValueType((Sequence,))
6299             default_value = default._value
6300             default_obj = self.__class__(impl=self.tag, expl=self._expl)
6301             default_obj.specs = self.specs
6302             default_obj._value = default_value
6303             self.default = default_obj
6304             if value is None:
6305                 self._value = copy(default_obj._value)
6306
6307     @property
6308     def ready(self):
6309         for name, spec in iteritems(self.specs):
6310             value = self._value.get(name)
6311             if value is None:
6312                 if spec.optional:
6313                     continue
6314                 return False
6315             if not value.ready:
6316                 return False
6317         return True
6318
6319     @property
6320     def bered(self):
6321         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6322             return True
6323         return any(value.bered for value in itervalues(self._value))
6324
6325     def __getstate__(self):
6326         return SequenceState(
6327             __version__,
6328             self.tag,
6329             self._tag_order,
6330             self._expl,
6331             self.default,
6332             self.optional,
6333             self.offset,
6334             self.llen,
6335             self.vlen,
6336             self.expl_lenindef,
6337             self.lenindef,
6338             self.ber_encoded,
6339             self.specs,
6340             {k: copy(v) for k, v in iteritems(self._value)},
6341         )
6342
6343     def __setstate__(self, state):
6344         super(Sequence, self).__setstate__(state)
6345         self.specs = state.specs
6346         self._value = state.value
6347
6348     def __eq__(self, their):
6349         if not isinstance(their, self.__class__):
6350             return False
6351         return (
6352             self.specs == their.specs and
6353             self.tag == their.tag and
6354             self._expl == their._expl and
6355             self._value == their._value
6356         )
6357
6358     def __call__(
6359             self,
6360             value=None,
6361             impl=None,
6362             expl=None,
6363             default=None,
6364             optional=None,
6365     ):
6366         return self.__class__(
6367             value=value,
6368             schema=self.specs,
6369             impl=self.tag if impl is None else impl,
6370             expl=self._expl if expl is None else expl,
6371             default=self.default if default is None else default,
6372             optional=self.optional if optional is None else optional,
6373         )
6374
6375     def __contains__(self, key):
6376         return key in self._value
6377
6378     def __setitem__(self, key, value):
6379         spec = self.specs.get(key)
6380         if spec is None:
6381             raise ObjUnknown(key)
6382         if value is None:
6383             self._value.pop(key, None)
6384             return
6385         if not isinstance(value, spec.__class__):
6386             raise InvalidValueType((spec.__class__,))
6387         value = spec(value=value)
6388         if spec.default is not None and value == spec.default:
6389             self._value.pop(key, None)
6390             return
6391         self._value[key] = value
6392
6393     def __getitem__(self, key):
6394         value = self._value.get(key)
6395         if value is not None:
6396             return value
6397         spec = self.specs.get(key)
6398         if spec is None:
6399             raise ObjUnknown(key)
6400         if spec.default is not None:
6401             return spec.default
6402         return None
6403
6404     def _values_for_encoding(self):
6405         for name, spec in iteritems(self.specs):
6406             value = self._value.get(name)
6407             if value is None:
6408                 if spec.optional:
6409                     continue
6410                 raise ObjNotReady(name)
6411             yield value
6412
6413     def _encode(self):
6414         v = b"".join(v.encode() for v in self._values_for_encoding())
6415         return b"".join((self.tag, len_encode(len(v)), v))
6416
6417     def _encode2nd(self, writer, state_iter):
6418         write_full(writer, self.tag + len_encode(next(state_iter)))
6419         for v in self._values_for_encoding():
6420             v.encode2nd(writer, state_iter)
6421
6422     def _encode_cer(self, writer):
6423         write_full(writer, self.tag + LENINDEF)
6424         for v in self._values_for_encoding():
6425             v.encode_cer(writer)
6426         write_full(writer, EOC)
6427
6428     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6429         try:
6430             t, tlen, lv = tag_strip(tlv)
6431         except DecodeError as err:
6432             raise err.__class__(
6433                 msg=err.msg,
6434                 klass=self.__class__,
6435                 decode_path=decode_path,
6436                 offset=offset,
6437             )
6438         if t != self.tag:
6439             raise TagMismatch(
6440                 klass=self.__class__,
6441                 decode_path=decode_path,
6442                 offset=offset,
6443             )
6444         if tag_only:  # pragma: no cover
6445             yield None
6446             return
6447         lenindef = False
6448         ctx_bered = ctx.get("bered", False)
6449         try:
6450             l, llen, v = len_decode(lv)
6451         except LenIndefForm as err:
6452             if not ctx_bered:
6453                 raise err.__class__(
6454                     msg=err.msg,
6455                     klass=self.__class__,
6456                     decode_path=decode_path,
6457                     offset=offset,
6458                 )
6459             l, llen, v = 0, 1, lv[1:]
6460             lenindef = True
6461         except DecodeError as err:
6462             raise err.__class__(
6463                 msg=err.msg,
6464                 klass=self.__class__,
6465                 decode_path=decode_path,
6466                 offset=offset,
6467             )
6468         if l > len(v):
6469             raise NotEnoughData(
6470                 "encoded length is longer than data",
6471                 klass=self.__class__,
6472                 decode_path=decode_path,
6473                 offset=offset,
6474             )
6475         if not lenindef:
6476             v, tail = v[:l], v[l:]
6477         vlen = 0
6478         sub_offset = offset + tlen + llen
6479         values = {}
6480         ber_encoded = False
6481         ctx_allow_default_values = ctx.get("allow_default_values", False)
6482         for name, spec in iteritems(self.specs):
6483             if spec.optional and (
6484                     (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6485                     len(v) == 0
6486             ):
6487                 continue
6488             spec_defaulted = spec.default is not None
6489             sub_decode_path = decode_path + (name,)
6490             try:
6491                 if evgen_mode and not spec_defaulted:
6492                     for _decode_path, value, v_tail in spec.decode_evgen(
6493                             v,
6494                             sub_offset,
6495                             leavemm=True,
6496                             decode_path=sub_decode_path,
6497                             ctx=ctx,
6498                             _ctx_immutable=False,
6499                     ):
6500                         yield _decode_path, value, v_tail
6501                 else:
6502                     _, value, v_tail = next(spec.decode_evgen(
6503                         v,
6504                         sub_offset,
6505                         leavemm=True,
6506                         decode_path=sub_decode_path,
6507                         ctx=ctx,
6508                         _ctx_immutable=False,
6509                         _evgen_mode=False,
6510                     ))
6511             except TagMismatch as err:
6512                 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6513                     continue
6514                 raise
6515
6516             defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6517             if not evgen_mode and defined is not None:
6518                 defined_by, defined_spec = defined
6519                 if issubclass(value.__class__, SequenceOf):
6520                     for i, _value in enumerate(value):
6521                         sub_sub_decode_path = sub_decode_path + (
6522                             str(i),
6523                             DecodePathDefBy(defined_by),
6524                         )
6525                         defined_value, defined_tail = defined_spec.decode(
6526                             memoryview(bytes(_value)),
6527                             sub_offset + (
6528                                 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6529                                 if value.expled else (value.tlen + value.llen)
6530                             ),
6531                             leavemm=True,
6532                             decode_path=sub_sub_decode_path,
6533                             ctx=ctx,
6534                             _ctx_immutable=False,
6535                         )
6536                         if len(defined_tail) > 0:
6537                             raise DecodeError(
6538                                 "remaining data",
6539                                 klass=self.__class__,
6540                                 decode_path=sub_sub_decode_path,
6541                                 offset=offset,
6542                             )
6543                         _value.defined = (defined_by, defined_value)
6544                 else:
6545                     defined_value, defined_tail = defined_spec.decode(
6546                         memoryview(bytes(value)),
6547                         sub_offset + (
6548                             (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6549                             if value.expled else (value.tlen + value.llen)
6550                         ),
6551                         leavemm=True,
6552                         decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6553                         ctx=ctx,
6554                         _ctx_immutable=False,
6555                     )
6556                     if len(defined_tail) > 0:
6557                         raise DecodeError(
6558                             "remaining data",
6559                             klass=self.__class__,
6560                             decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6561                             offset=offset,
6562                         )
6563                     value.defined = (defined_by, defined_value)
6564
6565             value_len = value.fulllen
6566             vlen += value_len
6567             sub_offset += value_len
6568             v = v_tail
6569             if spec_defaulted:
6570                 if evgen_mode:
6571                     yield sub_decode_path, value, v_tail
6572                 if value == spec.default:
6573                     if ctx_bered or ctx_allow_default_values:
6574                         ber_encoded = True
6575                     else:
6576                         raise DecodeError(
6577                             "DEFAULT value met",
6578                             klass=self.__class__,
6579                             decode_path=sub_decode_path,
6580                             offset=sub_offset,
6581                         )
6582             if not evgen_mode:
6583                 values[name] = value
6584                 spec_defines = getattr(spec, "defines", ())
6585                 if len(spec_defines) == 0:
6586                     defines_by_path = ctx.get("defines_by_path", ())
6587                     if len(defines_by_path) > 0:
6588                         spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6589                 if spec_defines is not None and len(spec_defines) > 0:
6590                     for rel_path, schema in spec_defines:
6591                         defined = schema.get(value, None)
6592                         if defined is not None:
6593                             ctx.setdefault("_defines", []).append((
6594                                 abs_decode_path(sub_decode_path[:-1], rel_path),
6595                                 (value, defined),
6596                             ))
6597         if lenindef:
6598             if v[:EOC_LEN].tobytes() != EOC:
6599                 raise DecodeError(
6600                     "no EOC",
6601                     klass=self.__class__,
6602                     decode_path=decode_path,
6603                     offset=offset,
6604                 )
6605             tail = v[EOC_LEN:]
6606             vlen += EOC_LEN
6607         elif len(v) > 0:
6608             raise DecodeError(
6609                 "remaining data",
6610                 klass=self.__class__,
6611                 decode_path=decode_path,
6612                 offset=offset,
6613             )
6614         obj = self.__class__(
6615             schema=self.specs,
6616             impl=self.tag,
6617             expl=self._expl,
6618             default=self.default,
6619             optional=self.optional,
6620             _decoded=(offset, llen, vlen),
6621         )
6622         obj._value = values
6623         obj.lenindef = lenindef
6624         obj.ber_encoded = ber_encoded
6625         yield decode_path, obj, tail
6626
6627     def __repr__(self):
6628         value = pp_console_row(next(self.pps()))
6629         cols = []
6630         for name in self.specs:
6631             _value = self._value.get(name)
6632             if _value is None:
6633                 continue
6634             cols.append("%s: %s" % (name, repr(_value)))
6635         return "%s[%s]" % (value, "; ".join(cols))
6636
6637     def pps(self, decode_path=()):
6638         yield _pp(
6639             obj=self,
6640             asn1_type_name=self.asn1_type_name,
6641             obj_name=self.__class__.__name__,
6642             decode_path=decode_path,
6643             optional=self.optional,
6644             default=self == self.default,
6645             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6646             expl=None if self._expl is None else tag_decode(self._expl),
6647             offset=self.offset,
6648             tlen=self.tlen,
6649             llen=self.llen,
6650             vlen=self.vlen,
6651             expl_offset=self.expl_offset if self.expled else None,
6652             expl_tlen=self.expl_tlen if self.expled else None,
6653             expl_llen=self.expl_llen if self.expled else None,
6654             expl_vlen=self.expl_vlen if self.expled else None,
6655             expl_lenindef=self.expl_lenindef,
6656             lenindef=self.lenindef,
6657             ber_encoded=self.ber_encoded,
6658             bered=self.bered,
6659         )
6660         for name in self.specs:
6661             value = self._value.get(name)
6662             if value is None:
6663                 continue
6664             yield value.pps(decode_path=decode_path + (name,))
6665         for pp in self.pps_lenindef(decode_path):
6666             yield pp
6667
6668
6669 class Set(Sequence, SequenceEncode1stMixing):
6670     """``SET`` structure type
6671
6672     Its usage is identical to :py:class:`pyderasn.Sequence`.
6673
6674     .. _allow_unordered_set_ctx:
6675
6676     DER prohibits unordered values encoding and will raise an error
6677     during decode. If :ref:`bered <bered_ctx>` context option is set,
6678     then no error will occur. Also you can disable strict values
6679     ordering check by setting ``"allow_unordered_set": True``
6680     :ref:`context <ctx>` option.
6681     """
6682     __slots__ = ()
6683     tag_default = tag_encode(form=TagFormConstructed, num=17)
6684     asn1_type_name = "SET"
6685
6686     def _values_for_encoding(self):
6687         return sorted(
6688             super(Set, self)._values_for_encoding(),
6689             key=attrgetter("tag_order"),
6690         )
6691
6692     def _encode_cer(self, writer):
6693         write_full(writer, self.tag + LENINDEF)
6694         for v in sorted(
6695                 super(Set, self)._values_for_encoding(),
6696                 key=attrgetter("tag_order_cer"),
6697         ):
6698             v.encode_cer(writer)
6699         write_full(writer, EOC)
6700
6701     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6702         try:
6703             t, tlen, lv = tag_strip(tlv)
6704         except DecodeError as err:
6705             raise err.__class__(
6706                 msg=err.msg,
6707                 klass=self.__class__,
6708                 decode_path=decode_path,
6709                 offset=offset,
6710             )
6711         if t != self.tag:
6712             raise TagMismatch(
6713                 klass=self.__class__,
6714                 decode_path=decode_path,
6715                 offset=offset,
6716             )
6717         if tag_only:
6718             yield None
6719             return
6720         lenindef = False
6721         ctx_bered = ctx.get("bered", False)
6722         try:
6723             l, llen, v = len_decode(lv)
6724         except LenIndefForm as err:
6725             if not ctx_bered:
6726                 raise err.__class__(
6727                     msg=err.msg,
6728                     klass=self.__class__,
6729                     decode_path=decode_path,
6730                     offset=offset,
6731                 )
6732             l, llen, v = 0, 1, lv[1:]
6733             lenindef = True
6734         except DecodeError as err:
6735             raise err.__class__(
6736                 msg=err.msg,
6737                 klass=self.__class__,
6738                 decode_path=decode_path,
6739                 offset=offset,
6740             )
6741         if l > len(v):
6742             raise NotEnoughData(
6743                 "encoded length is longer than data",
6744                 klass=self.__class__,
6745                 offset=offset,
6746             )
6747         if not lenindef:
6748             v, tail = v[:l], v[l:]
6749         vlen = 0
6750         sub_offset = offset + tlen + llen
6751         values = {}
6752         ber_encoded = False
6753         ctx_allow_default_values = ctx.get("allow_default_values", False)
6754         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6755         tag_order_prev = (0, 0)
6756         _specs_items = copy(self.specs)
6757
6758         while len(v) > 0:
6759             if lenindef and v[:EOC_LEN].tobytes() == EOC:
6760                 break
6761             for name, spec in iteritems(_specs_items):
6762                 sub_decode_path = decode_path + (name,)
6763                 try:
6764                     spec.decode(
6765                         v,
6766                         sub_offset,
6767                         leavemm=True,
6768                         decode_path=sub_decode_path,
6769                         ctx=ctx,
6770                         tag_only=True,
6771                         _ctx_immutable=False,
6772                     )
6773                 except TagMismatch:
6774                     continue
6775                 break
6776             else:
6777                 raise TagMismatch(
6778                     klass=self.__class__,
6779                     decode_path=decode_path,
6780                     offset=offset,
6781                 )
6782             spec_defaulted = spec.default is not None
6783             if evgen_mode and not spec_defaulted:
6784                 for _decode_path, value, v_tail in spec.decode_evgen(
6785                         v,
6786                         sub_offset,
6787                         leavemm=True,
6788                         decode_path=sub_decode_path,
6789                         ctx=ctx,
6790                         _ctx_immutable=False,
6791                 ):
6792                     yield _decode_path, value, v_tail
6793             else:
6794                 _, value, v_tail = next(spec.decode_evgen(
6795                     v,
6796                     sub_offset,
6797                     leavemm=True,
6798                     decode_path=sub_decode_path,
6799                     ctx=ctx,
6800                     _ctx_immutable=False,
6801                     _evgen_mode=False,
6802                 ))
6803             value_tag_order = value.tag_order
6804             value_len = value.fulllen
6805             if tag_order_prev >= value_tag_order:
6806                 if ctx_bered or ctx_allow_unordered_set:
6807                     ber_encoded = True
6808                 else:
6809                     raise DecodeError(
6810                         "unordered " + self.asn1_type_name,
6811                         klass=self.__class__,
6812                         decode_path=sub_decode_path,
6813                         offset=sub_offset,
6814                     )
6815             if spec_defaulted:
6816                 if evgen_mode:
6817                     yield sub_decode_path, value, v_tail
6818                 if value != spec.default:
6819                     pass
6820                 elif ctx_bered or ctx_allow_default_values:
6821                     ber_encoded = True
6822                 else:
6823                     raise DecodeError(
6824                         "DEFAULT value met",
6825                         klass=self.__class__,
6826                         decode_path=sub_decode_path,
6827                         offset=sub_offset,
6828                     )
6829             values[name] = value
6830             del _specs_items[name]
6831             tag_order_prev = value_tag_order
6832             sub_offset += value_len
6833             vlen += value_len
6834             v = v_tail
6835
6836         obj = self.__class__(
6837             schema=self.specs,
6838             impl=self.tag,
6839             expl=self._expl,
6840             default=self.default,
6841             optional=self.optional,
6842             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6843         )
6844         if lenindef:
6845             if v[:EOC_LEN].tobytes() != EOC:
6846                 raise DecodeError(
6847                     "no EOC",
6848                     klass=self.__class__,
6849                     decode_path=decode_path,
6850                     offset=offset,
6851                 )
6852             tail = v[EOC_LEN:]
6853             obj.lenindef = True
6854         for name, spec in iteritems(self.specs):
6855             if name not in values and not spec.optional:
6856                 raise DecodeError(
6857                     "%s value is not ready" % name,
6858                     klass=self.__class__,
6859                     decode_path=decode_path,
6860                     offset=offset,
6861                 )
6862         if not evgen_mode:
6863             obj._value = values
6864         obj.ber_encoded = ber_encoded
6865         yield decode_path, obj, tail
6866
6867
6868 SequenceOfState = namedtuple(
6869     "SequenceOfState",
6870     BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6871     **NAMEDTUPLE_KWARGS
6872 )
6873
6874
6875 class SequenceOf(SequenceEncode1stMixing, Obj):
6876     """``SEQUENCE OF`` sequence type
6877
6878     For that kind of type you must specify the object it will carry on
6879     (bounds are for example here, not required)::
6880
6881         class Ints(SequenceOf):
6882             schema = Integer()
6883             bounds = (0, 2)
6884
6885     >>> ints = Ints()
6886     >>> ints.append(Integer(123))
6887     >>> ints.append(Integer(234))
6888     >>> ints
6889     Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6890     >>> [int(i) for i in ints]
6891     [123, 234]
6892     >>> ints.append(Integer(345))
6893     Traceback (most recent call last):
6894     pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6895     >>> ints[1]
6896     INTEGER 234
6897     >>> ints[1] = Integer(345)
6898     >>> ints
6899     Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6900
6901     You can initialize sequence with preinitialized values:
6902
6903     >>> ints = Ints([Integer(123), Integer(234)])
6904
6905     Also you can use iterator as a value:
6906
6907     >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6908
6909     And it won't be iterated until encoding process. Pay attention that
6910     bounds and required schema checks are done only during the encoding
6911     process in that case! After encode was called, then value is zeroed
6912     back to empty list and you have to set it again. That mode is useful
6913     mainly with CER encoding mode, where all objects from the iterable
6914     will be streamed to the buffer, without copying all of them to
6915     memory first.
6916     """
6917     __slots__ = ("spec", "_bound_min", "_bound_max")
6918     tag_default = tag_encode(form=TagFormConstructed, num=16)
6919     asn1_type_name = "SEQUENCE OF"
6920
6921     def __init__(
6922             self,
6923             value=None,
6924             schema=None,
6925             bounds=None,
6926             impl=None,
6927             expl=None,
6928             default=None,
6929             optional=False,
6930             _decoded=(0, 0, 0),
6931     ):
6932         super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6933         if schema is None:
6934             schema = getattr(self, "schema", None)
6935         if schema is None:
6936             raise ValueError("schema must be specified")
6937         self.spec = schema
6938         self._bound_min, self._bound_max = getattr(
6939             self,
6940             "bounds",
6941             (0, float("+inf")),
6942         ) if bounds is None else bounds
6943         self._value = []
6944         if value is not None:
6945             self._value = self._value_sanitize(value)
6946         if default is not None:
6947             default_value = self._value_sanitize(default)
6948             default_obj = self.__class__(
6949                 schema=schema,
6950                 impl=self.tag,
6951                 expl=self._expl,
6952             )
6953             default_obj._value = default_value
6954             self.default = default_obj
6955             if value is None:
6956                 self._value = copy(default_obj._value)
6957
6958     def _value_sanitize(self, value):
6959         iterator = False
6960         if issubclass(value.__class__, SequenceOf):
6961             value = value._value
6962         elif hasattr(value, NEXT_ATTR_NAME):
6963             iterator = True
6964         elif hasattr(value, "__iter__"):
6965             value = list(value)
6966         else:
6967             raise InvalidValueType((self.__class__, iter, "iterator"))
6968         if not iterator:
6969             if not self._bound_min <= len(value) <= self._bound_max:
6970                 raise BoundsError(self._bound_min, len(value), self._bound_max)
6971             class_expected = self.spec.__class__
6972             for v in value:
6973                 if not isinstance(v, class_expected):
6974                     raise InvalidValueType((class_expected,))
6975         return value
6976
6977     @property
6978     def ready(self):
6979         if hasattr(self._value, NEXT_ATTR_NAME):
6980             return True
6981         if self._bound_min > 0 and len(self._value) == 0:
6982             return False
6983         return all(v.ready for v in self._value)
6984
6985     @property
6986     def bered(self):
6987         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6988             return True
6989         return any(v.bered for v in self._value)
6990
6991     def __getstate__(self):
6992         if hasattr(self._value, NEXT_ATTR_NAME):
6993             raise ValueError("can not pickle SequenceOf with iterator")
6994         return SequenceOfState(
6995             __version__,
6996             self.tag,
6997             self._tag_order,
6998             self._expl,
6999             self.default,
7000             self.optional,
7001             self.offset,
7002             self.llen,
7003             self.vlen,
7004             self.expl_lenindef,
7005             self.lenindef,
7006             self.ber_encoded,
7007             self.spec,
7008             [copy(v) for v in self._value],
7009             self._bound_min,
7010             self._bound_max,
7011         )
7012
7013     def __setstate__(self, state):
7014         super(SequenceOf, self).__setstate__(state)
7015         self.spec = state.spec
7016         self._value = state.value
7017         self._bound_min = state.bound_min
7018         self._bound_max = state.bound_max
7019
7020     def __eq__(self, their):
7021         if isinstance(their, self.__class__):
7022             return (
7023                 self.spec == their.spec and
7024                 self.tag == their.tag and
7025                 self._expl == their._expl and
7026                 self._value == their._value
7027             )
7028         if hasattr(their, "__iter__"):
7029             return self._value == list(their)
7030         return False
7031
7032     def __call__(
7033             self,
7034             value=None,
7035             bounds=None,
7036             impl=None,
7037             expl=None,
7038             default=None,
7039             optional=None,
7040     ):
7041         return self.__class__(
7042             value=value,
7043             schema=self.spec,
7044             bounds=(
7045                 (self._bound_min, self._bound_max)
7046                 if bounds is None else bounds
7047             ),
7048             impl=self.tag if impl is None else impl,
7049             expl=self._expl if expl is None else expl,
7050             default=self.default if default is None else default,
7051             optional=self.optional if optional is None else optional,
7052         )
7053
7054     def __contains__(self, key):
7055         return key in self._value
7056
7057     def append(self, value):
7058         if not isinstance(value, self.spec.__class__):
7059             raise InvalidValueType((self.spec.__class__,))
7060         if len(self._value) + 1 > self._bound_max:
7061             raise BoundsError(
7062                 self._bound_min,
7063                 len(self._value) + 1,
7064                 self._bound_max,
7065             )
7066         self._value.append(value)
7067
7068     def __iter__(self):
7069         return iter(self._value)
7070
7071     def __len__(self):
7072         return len(self._value)
7073
7074     def __setitem__(self, key, value):
7075         if not isinstance(value, self.spec.__class__):
7076             raise InvalidValueType((self.spec.__class__,))
7077         self._value[key] = self.spec(value=value)
7078
7079     def __getitem__(self, key):
7080         return self._value[key]
7081
7082     def _values_for_encoding(self):
7083         return iter(self._value)
7084
7085     def _encode(self):
7086         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7087         if iterator:
7088             values = []
7089             values_append = values.append
7090             class_expected = self.spec.__class__
7091             values_for_encoding = self._values_for_encoding()
7092             self._value = []
7093             for v in values_for_encoding:
7094                 if not isinstance(v, class_expected):
7095                     raise InvalidValueType((class_expected,))
7096                 values_append(v.encode())
7097             if not self._bound_min <= len(values) <= self._bound_max:
7098                 raise BoundsError(self._bound_min, len(values), self._bound_max)
7099             value = b"".join(values)
7100         else:
7101             value = b"".join(v.encode() for v in self._values_for_encoding())
7102         return b"".join((self.tag, len_encode(len(value)), value))
7103
7104     def _encode1st(self, state):
7105         state = super(SequenceOf, self)._encode1st(state)
7106         if hasattr(self._value, NEXT_ATTR_NAME):
7107             self._value = []
7108         return state
7109
7110     def _encode2nd(self, writer, state_iter):
7111         write_full(writer, self.tag + len_encode(next(state_iter)))
7112         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7113         if iterator:
7114             values_count = 0
7115             class_expected = self.spec.__class__
7116             values_for_encoding = self._values_for_encoding()
7117             self._value = []
7118             for v in values_for_encoding:
7119                 if not isinstance(v, class_expected):
7120                     raise InvalidValueType((class_expected,))
7121                 v.encode2nd(writer, state_iter)
7122                 values_count += 1
7123             if not self._bound_min <= values_count <= self._bound_max:
7124                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7125         else:
7126             for v in self._values_for_encoding():
7127                 v.encode2nd(writer, state_iter)
7128
7129     def _encode_cer(self, writer):
7130         write_full(writer, self.tag + LENINDEF)
7131         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7132         if iterator:
7133             class_expected = self.spec.__class__
7134             values_count = 0
7135             values_for_encoding = self._values_for_encoding()
7136             self._value = []
7137             for v in values_for_encoding:
7138                 if not isinstance(v, class_expected):
7139                     raise InvalidValueType((class_expected,))
7140                 v.encode_cer(writer)
7141                 values_count += 1
7142             if not self._bound_min <= values_count <= self._bound_max:
7143                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7144         else:
7145             for v in self._values_for_encoding():
7146                 v.encode_cer(writer)
7147         write_full(writer, EOC)
7148
7149     def _decode(
7150             self,
7151             tlv,
7152             offset,
7153             decode_path,
7154             ctx,
7155             tag_only,
7156             evgen_mode,
7157             ordering_check=False,
7158     ):
7159         try:
7160             t, tlen, lv = tag_strip(tlv)
7161         except DecodeError as err:
7162             raise err.__class__(
7163                 msg=err.msg,
7164                 klass=self.__class__,
7165                 decode_path=decode_path,
7166                 offset=offset,
7167             )
7168         if t != self.tag:
7169             raise TagMismatch(
7170                 klass=self.__class__,
7171                 decode_path=decode_path,
7172                 offset=offset,
7173             )
7174         if tag_only:
7175             yield None
7176             return
7177         lenindef = False
7178         ctx_bered = ctx.get("bered", False)
7179         try:
7180             l, llen, v = len_decode(lv)
7181         except LenIndefForm as err:
7182             if not ctx_bered:
7183                 raise err.__class__(
7184                     msg=err.msg,
7185                     klass=self.__class__,
7186                     decode_path=decode_path,
7187                     offset=offset,
7188                 )
7189             l, llen, v = 0, 1, lv[1:]
7190             lenindef = True
7191         except DecodeError as err:
7192             raise err.__class__(
7193                 msg=err.msg,
7194                 klass=self.__class__,
7195                 decode_path=decode_path,
7196                 offset=offset,
7197             )
7198         if l > len(v):
7199             raise NotEnoughData(
7200                 "encoded length is longer than data",
7201                 klass=self.__class__,
7202                 decode_path=decode_path,
7203                 offset=offset,
7204             )
7205         if not lenindef:
7206             v, tail = v[:l], v[l:]
7207         vlen = 0
7208         sub_offset = offset + tlen + llen
7209         _value = []
7210         _value_count = 0
7211         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7212         value_prev = memoryview(v[:0])
7213         ber_encoded = False
7214         spec = self.spec
7215         while len(v) > 0:
7216             if lenindef and v[:EOC_LEN].tobytes() == EOC:
7217                 break
7218             sub_decode_path = decode_path + (str(_value_count),)
7219             if evgen_mode:
7220                 for _decode_path, value, v_tail in spec.decode_evgen(
7221                         v,
7222                         sub_offset,
7223                         leavemm=True,
7224                         decode_path=sub_decode_path,
7225                         ctx=ctx,
7226                         _ctx_immutable=False,
7227                 ):
7228                     yield _decode_path, value, v_tail
7229             else:
7230                 _, value, v_tail = next(spec.decode_evgen(
7231                     v,
7232                     sub_offset,
7233                     leavemm=True,
7234                     decode_path=sub_decode_path,
7235                     ctx=ctx,
7236                     _ctx_immutable=False,
7237                     _evgen_mode=False,
7238                 ))
7239             value_len = value.fulllen
7240             if ordering_check:
7241                 if value_prev.tobytes() > v[:value_len].tobytes():
7242                     if ctx_bered or ctx_allow_unordered_set:
7243                         ber_encoded = True
7244                     else:
7245                         raise DecodeError(
7246                             "unordered " + self.asn1_type_name,
7247                             klass=self.__class__,
7248                             decode_path=sub_decode_path,
7249                             offset=sub_offset,
7250                         )
7251                 value_prev = v[:value_len]
7252             _value_count += 1
7253             if not evgen_mode:
7254                 _value.append(value)
7255             sub_offset += value_len
7256             vlen += value_len
7257             v = v_tail
7258         if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7259             raise DecodeError(
7260                 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7261                 klass=self.__class__,
7262                 decode_path=decode_path,
7263                 offset=offset,
7264             )
7265         try:
7266             obj = self.__class__(
7267                 value=None if evgen_mode else _value,
7268                 schema=spec,
7269                 bounds=(self._bound_min, self._bound_max),
7270                 impl=self.tag,
7271                 expl=self._expl,
7272                 default=self.default,
7273                 optional=self.optional,
7274                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7275             )
7276         except BoundsError as err:
7277             raise DecodeError(
7278                 msg=str(err),
7279                 klass=self.__class__,
7280                 decode_path=decode_path,
7281                 offset=offset,
7282             )
7283         if lenindef:
7284             if v[:EOC_LEN].tobytes() != EOC:
7285                 raise DecodeError(
7286                     "no EOC",
7287                     klass=self.__class__,
7288                     decode_path=decode_path,
7289                     offset=offset,
7290                 )
7291             obj.lenindef = True
7292             tail = v[EOC_LEN:]
7293         obj.ber_encoded = ber_encoded
7294         yield decode_path, obj, tail
7295
7296     def __repr__(self):
7297         return "%s[%s]" % (
7298             pp_console_row(next(self.pps())),
7299             ", ".join(repr(v) for v in self._value),
7300         )
7301
7302     def pps(self, decode_path=()):
7303         yield _pp(
7304             obj=self,
7305             asn1_type_name=self.asn1_type_name,
7306             obj_name=self.__class__.__name__,
7307             decode_path=decode_path,
7308             optional=self.optional,
7309             default=self == self.default,
7310             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7311             expl=None if self._expl is None else tag_decode(self._expl),
7312             offset=self.offset,
7313             tlen=self.tlen,
7314             llen=self.llen,
7315             vlen=self.vlen,
7316             expl_offset=self.expl_offset if self.expled else None,
7317             expl_tlen=self.expl_tlen if self.expled else None,
7318             expl_llen=self.expl_llen if self.expled else None,
7319             expl_vlen=self.expl_vlen if self.expled else None,
7320             expl_lenindef=self.expl_lenindef,
7321             lenindef=self.lenindef,
7322             ber_encoded=self.ber_encoded,
7323             bered=self.bered,
7324         )
7325         for i, value in enumerate(self._value):
7326             yield value.pps(decode_path=decode_path + (str(i),))
7327         for pp in self.pps_lenindef(decode_path):
7328             yield pp
7329
7330
7331 class SetOf(SequenceOf):
7332     """``SET OF`` sequence type
7333
7334     Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7335     """
7336     __slots__ = ()
7337     tag_default = tag_encode(form=TagFormConstructed, num=17)
7338     asn1_type_name = "SET OF"
7339
7340     def _value_sanitize(self, value):
7341         value = super(SetOf, self)._value_sanitize(value)
7342         if hasattr(value, NEXT_ATTR_NAME):
7343             raise ValueError(
7344                 "SetOf does not support iterator values, as no sense in them"
7345             )
7346         return value
7347
7348     def _encode(self):
7349         v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7350         return b"".join((self.tag, len_encode(len(v)), v))
7351
7352     def _encode2nd(self, writer, state_iter):
7353         write_full(writer, self.tag + len_encode(next(state_iter)))
7354         values = []
7355         for v in self._values_for_encoding():
7356             buf = BytesIO()
7357             v.encode2nd(buf.write, state_iter)
7358             values.append(buf.getvalue())
7359         values.sort()
7360         for v in values:
7361             write_full(writer, v)
7362
7363     def _encode_cer(self, writer):
7364         write_full(writer, self.tag + LENINDEF)
7365         for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7366             write_full(writer, v)
7367         write_full(writer, EOC)
7368
7369     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7370         return super(SetOf, self)._decode(
7371             tlv,
7372             offset,
7373             decode_path,
7374             ctx,
7375             tag_only,
7376             evgen_mode,
7377             ordering_check=True,
7378         )
7379
7380
7381 def obj_by_path(pypath):  # pragma: no cover
7382     """Import object specified as string Python path
7383
7384     Modules must be separated from classes/functions with ``:``.
7385
7386     >>> obj_by_path("foo.bar:Baz")
7387     <class 'foo.bar.Baz'>
7388     >>> obj_by_path("foo.bar:Baz.boo")
7389     <classmethod 'foo.bar.Baz.boo'>
7390     """
7391     mod, objs = pypath.rsplit(":", 1)
7392     from importlib import import_module
7393     obj = import_module(mod)
7394     for obj_name in objs.split("."):
7395         obj = getattr(obj, obj_name)
7396     return obj
7397
7398
7399 def generic_decoder():  # pragma: no cover
7400     # All of this below is a big hack with self references
7401     choice = PrimitiveTypes()
7402     choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7403     choice.specs["SetOf"] = SetOf(schema=choice)
7404     for i in six_xrange(31):
7405         choice.specs["SequenceOf%d" % i] = SequenceOf(
7406             schema=choice,
7407             expl=tag_ctxc(i),
7408         )
7409     choice.specs["Any"] = Any()
7410
7411     # Class name equals to type name, to omit it from output
7412     class SEQUENCEOF(SequenceOf):
7413         __slots__ = ()
7414         schema = choice
7415
7416     def pprint_any(
7417             obj,
7418             oid_maps=(),
7419             with_colours=False,
7420             with_decode_path=False,
7421             decode_path_only=(),
7422             decode_path=(),
7423     ):
7424         def _pprint_pps(pps):
7425             for pp in pps:
7426                 if hasattr(pp, "_fields"):
7427                     if (
7428                             decode_path_only != () and
7429                             pp.decode_path[:len(decode_path_only)] != decode_path_only
7430                     ):
7431                         continue
7432                     if pp.asn1_type_name == Choice.asn1_type_name:
7433                         continue
7434                     pp_kwargs = pp._asdict()
7435                     pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7436                     pp = _pp(**pp_kwargs)
7437                     yield pp_console_row(
7438                         pp,
7439                         oid_maps=oid_maps,
7440                         with_offsets=True,
7441                         with_blob=False,
7442                         with_colours=with_colours,
7443                         with_decode_path=with_decode_path,
7444                         decode_path_len_decrease=len(decode_path_only),
7445                     )
7446                     for row in pp_console_blob(
7447                             pp,
7448                             decode_path_len_decrease=len(decode_path_only),
7449                     ):
7450                         yield row
7451                 else:
7452                     for row in _pprint_pps(pp):
7453                         yield row
7454         return "\n".join(_pprint_pps(obj.pps(decode_path)))
7455     return SEQUENCEOF(), pprint_any
7456
7457
7458 def ascii_visualize(ba):
7459     """Output only ASCII printable characters, like in hexdump -C
7460
7461     Example output for given binary string (right part)::
7462
7463         92 2b 39 20 65 91 e6 8e  95 93 1a 58 df 02 78 ea  |.+9 e......X..x.|
7464                                                            ^^^^^^^^^^^^^^^^
7465     """
7466     return "".join((six_unichr(b) if 0x20 <= b <= 0x7E else ".") for b in ba)
7467
7468
7469 def hexdump(raw):
7470     """Generate ``hexdump -C`` like output
7471
7472     Rendered example::
7473
7474         00000000  30 80 30 80 a0 80 02 01  02 00 00 02 14 54 a5 18  |0.0..........T..|
7475         00000010  69 ef 8b 3f 15 fd ea ad  bd 47 e0 94 81 6b 06 6a  |i..?.....G...k.j|
7476
7477     Result of that function is a generator of lines, where each line is
7478     a list of columns::
7479
7480         [
7481             [...],
7482             ["00000010 ", " 69", " ef", " 8b", " 3f", " 15", " fd", " ea", " ad ",
7483                           " bd", " 47", " e0", " 94", " 81", " 6b", " 06", " 6a ",
7484                           " |i..?.....G...k.j|"]
7485             [...],
7486         ]
7487     """
7488     hexed = hexenc(raw).upper()
7489     addr, cols = 0, ["%08x " % 0]
7490     for i in six_xrange(0, len(hexed), 2):
7491         if i != 0 and i // 2 % 8 == 0:
7492             cols[-1] += " "
7493         if i != 0 and i // 2 % 16 == 0:
7494             cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:addr + 16])))
7495             yield cols
7496             addr += 16
7497             cols = ["%08x " % addr]
7498         cols.append(" " + hexed[i:i + 2])
7499     if len(cols) > 0:
7500         cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:])))
7501         yield cols
7502
7503
7504 def browse(raw, obj, oid_maps=()):
7505     """Interactive browser
7506
7507     :param bytes raw: binary data you decoded
7508     :param obj: decoded :py:class:`pyderasn.Obj`
7509     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
7510                      Its human readable form is printed when OID is met
7511
7512     .. note:: `urwid <http://urwid.org/>`__ dependency required
7513
7514     This browser is an interactive terminal application for browsing
7515     structures of your decoded ASN.1 objects. You can quit it with **q**
7516     key. It consists of three windows:
7517
7518     :tree:
7519      View of ASN.1 elements hierarchy. You can navigate it using **Up**,
7520      **Down**, **PageUp**, **PageDown**, **Home**, **End** keys.
7521      **Left** key goes to constructed element above. **Plus**/**Minus**
7522      keys collapse/uncollapse constructed elements. **Space** toggles it
7523     :info:
7524      window with various information about element. You can scroll it
7525      with **h**/**l** (down, up) (**H**/**L** for triple speed) keys
7526     :hexdump:
7527      window with raw data hexdump and highlighted current element's
7528      contents. It automatically focuses on element's data. You can
7529      scroll it with **j**/**k** (down, up) (**J**/**K** for triple
7530      speed) keys. If element has explicit tag, then it also will be
7531      highlighted with different colour
7532
7533     Window's header contains current decode path and progress bars with
7534     position in *info* and *hexdump* windows.
7535
7536     If you press **d**, then current element will be saved in the
7537     current directory under its decode path name (adding ".0", ".1", etc
7538     suffix if such file already exists). **D** will save it with explicit tag.
7539
7540     You can also invoke it with ``--browse`` command line argument.
7541     """
7542     from copy import deepcopy
7543     from os.path import exists as path_exists
7544     import urwid
7545
7546     class TW(urwid.TreeWidget):
7547         def __init__(self, state, *args, **kwargs):
7548             self.state = state
7549             self.scrolled = {"info": False, "hexdump": False}
7550             super(TW, self).__init__(*args, **kwargs)
7551
7552         def _get_pp(self):
7553             pp = self.get_node().get_value()
7554             constructed = len(pp) > 1
7555             return (pp if hasattr(pp, "_fields") else pp[0]), constructed
7556
7557         def _state_update(self):
7558             pp, _ = self._get_pp()
7559             self.state["decode_path"].set_text(
7560                 ":".join(str(p) for p in pp.decode_path)
7561             )
7562             lines = deepcopy(self.state["hexed"])
7563
7564             def attr_set(i, attr):
7565                 line = lines[i // 16]
7566                 idx = 1 + (i - 16 * (i // 16))
7567                 line[idx] = (attr, line[idx])
7568
7569             if pp.expl_offset is not None:
7570                 for i in six_xrange(
7571                         pp.expl_offset,
7572                         pp.expl_offset + pp.expl_tlen + pp.expl_llen,
7573                 ):
7574                     attr_set(i, "select-expl")
7575             for i in six_xrange(pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen):
7576                 attr_set(i, "select-value")
7577             self.state["hexdump"]._set_body([urwid.Text(line) for line in lines])
7578             self.state["hexdump"].set_focus(pp.offset // 16)
7579             self.state["hexdump"].set_focus_valign("middle")
7580             self.state["hexdump_bar"].set_completion(
7581                 (100 * pp.offset // 16) //
7582                 len(self.state["hexdump"]._body.positions())
7583             )
7584
7585             lines = [
7586                 [("header", "Name: "), pp.obj_name],
7587                 [("header", "Type: "), pp.asn1_type_name],
7588                 [("header", "Offset: "), "%d (0x%x)" % (pp.offset, pp.offset)],
7589                 [("header", "[TLV]len: "), "%d/%d/%d" % (
7590                     pp.tlen, pp.llen, pp.vlen,
7591                 )],
7592                 [("header", "TLVlen: "), "%d" % sum((
7593                     pp.tlen, pp.llen, pp.vlen,
7594                 ))],
7595                 [("header", "Slice: "), "[%d:%d]" % (
7596                     pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen,
7597                 )],
7598             ]
7599             if pp.lenindef:
7600                 lines.append([("warning", "LENINDEF")])
7601             if pp.ber_encoded:
7602                 lines.append([("warning", "BER encoded")])
7603             if pp.bered:
7604                 lines.append([("warning", "BERed")])
7605             if pp.expl is not None:
7606                 lines.append([("header", "EXPLICIT")])
7607                 klass, _, num = pp.expl
7608                 lines.append(["  Tag: %s%d" % (TagClassReprs[klass], num)])
7609                 if pp.expl_offset is not None:
7610                     lines.append(["  Offset: %d" % pp.expl_offset])
7611                     lines.append(["  [TLV]len: %d/%d/%d" % (
7612                         pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7613                     )])
7614                     lines.append(["  TLVlen: %d" % sum((
7615                         pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7616                     ))])
7617                     lines.append(["  Slice: [%d:%d]" % (
7618                         pp.expl_offset,
7619                         pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen,
7620                     )])
7621             if pp.impl is not None:
7622                 klass, _, num = pp.impl
7623                 lines.append([
7624                     ("header", "IMPLICIT: "), "%s%d" % (TagClassReprs[klass], num),
7625                 ])
7626             if pp.optional:
7627                 lines.append(["OPTIONAL"])
7628             if pp.default:
7629                 lines.append(["DEFAULT"])
7630             if len(pp.decode_path) > 0:
7631                 ent = pp.decode_path[-1]
7632                 if isinstance(ent, DecodePathDefBy):
7633                     lines.append([""])
7634                     value = str(ent.defined_by)
7635                     oid_name = find_oid_name(
7636                         ent.defined_by.asn1_type_name, oid_maps, value,
7637                     )
7638                     lines.append([("header", "DEFINED BY: "), "%s" % (
7639                         value if oid_name is None
7640                         else "%s (%s)" % (oid_name, value)
7641                     )])
7642             lines.append([""])
7643             if pp.value is not None:
7644                 lines.append([("header", "Value: "), pp.value])
7645                 if (
7646                         len(oid_maps) > 0 and
7647                         pp.asn1_type_name == ObjectIdentifier.asn1_type_name
7648                 ):
7649                     for oid_map in oid_maps:
7650                         oid_name = oid_map.get(pp.value)
7651                         if oid_name is not None:
7652                             lines.append([("header", "Human: "), oid_name])
7653                             break
7654                 if pp.asn1_type_name == Integer.asn1_type_name:
7655                     lines.append([
7656                         ("header", "Decimal: "), "%d" % int(pp.obj),
7657                     ])
7658                     lines.append([
7659                         ("header", "Hexadecimal: "), colonize_hex(pp.obj.tohex()),
7660                     ])
7661             if pp.blob.__class__ == binary_type:
7662                 blob = hexenc(pp.blob).upper()
7663                 for i in six_xrange(0, len(blob), 32):
7664                     lines.append([colonize_hex(blob[i:i + 32])])
7665             elif pp.blob.__class__ == tuple:
7666                 lines.append([", ".join(pp.blob)])
7667             self.state["info"]._set_body([urwid.Text(line) for line in lines])
7668             self.state["info_bar"].set_completion(0)
7669
7670         def selectable(self):
7671             if self.state["widget_current"] != self:
7672                 self.state["widget_current"] = self
7673                 self.scrolled["info"] = False
7674                 self.scrolled["hexdump"] = False
7675                 self._state_update()
7676             return super(TW, self).selectable()
7677
7678         def get_display_text(self):
7679             pp, constructed = self._get_pp()
7680             style = "constructed" if constructed else ""
7681             if len(pp.decode_path) == 0:
7682                 return (style, pp.obj_name)
7683             if pp.asn1_type_name == "EOC":
7684                 return ("eoc", "EOC")
7685             ent = pp.decode_path[-1]
7686             if isinstance(ent, DecodePathDefBy):
7687                 value = str(ent.defined_by)
7688                 oid_name = find_oid_name(
7689                     ent.defined_by.asn1_type_name, oid_maps, value,
7690                 )
7691                 return ("defby", "DEFBY:" + (
7692                     value if oid_name is None else oid_name
7693                 ))
7694             return (style, ent)
7695
7696         def _scroll(self, what, step):
7697             self.state[what]._invalidate()
7698             pos = self.state[what].focus_position
7699             if not self.scrolled[what]:
7700                 self.scrolled[what] = True
7701                 pos -= 2
7702             pos = max(0, pos + step)
7703             pos = min(pos, len(self.state[what]._body.positions()) - 1)
7704             self.state[what].set_focus(pos)
7705             self.state[what].set_focus_valign("top")
7706             self.state[what + "_bar"].set_completion(
7707                 (100 * pos) // len(self.state[what]._body.positions())
7708             )
7709
7710         def keypress(self, size, key):
7711             if key == "q":
7712                 raise urwid.ExitMainLoop()
7713
7714             if key == " ":
7715                 self.expanded = not self.expanded
7716                 self.update_expanded_icon()
7717                 return None
7718
7719             hexdump_steps = {"j": 1, "k": -1, "J": 5, "K": -5}
7720             if key in hexdump_steps:
7721                 self._scroll("hexdump", hexdump_steps[key])
7722                 return None
7723
7724             info_steps = {"h": 1, "l": -1, "H": 5, "L": -5}
7725             if key in info_steps:
7726                 self._scroll("info", info_steps[key])
7727                 return None
7728
7729             if key in ("d", "D"):
7730                 pp, _ = self._get_pp()
7731                 dp = ":".join(str(p) for p in pp.decode_path)
7732                 dp = dp.replace(" ", "_")
7733                 if dp == "":
7734                     dp = "root"
7735                 if key == "d" or pp.expl_offset is None:
7736                     data = self.state["raw"][pp.offset:(
7737                         pp.offset + pp.tlen + pp.llen + pp.vlen
7738                     )]
7739                 else:
7740                     data = self.state["raw"][pp.expl_offset:(
7741                         pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen
7742                     )]
7743                 ctr = 0
7744
7745                 def duplicate_path(dp, ctr):
7746                     if ctr == 0:
7747                         return dp
7748                     return "%s.%d" % (dp, ctr)
7749
7750                 while True:
7751                     if not path_exists(duplicate_path(dp, ctr)):
7752                         break
7753                     ctr += 1
7754                 dp = duplicate_path(dp, ctr)
7755                 with open(dp, "wb") as fd:
7756                     fd.write(data)
7757                 self.state["decode_path"].set_text(
7758                     ("warning", "Saved to: " + dp)
7759                 )
7760                 return None
7761             return super(TW, self).keypress(size, key)
7762
7763     class PN(urwid.ParentNode):
7764         def __init__(self, state, value, *args, **kwargs):
7765             self.state = state
7766             if not hasattr(value, "_fields"):
7767                 value = list(value)
7768             super(PN, self).__init__(value, *args, **kwargs)
7769
7770         def load_widget(self):
7771             return TW(self.state, self)
7772
7773         def load_child_keys(self):
7774             value = self.get_value()
7775             if hasattr(value, "_fields"):
7776                 return []
7777             return range(len(value[1:]))
7778
7779         def load_child_node(self, key):
7780             return PN(
7781                 self.state,
7782                 self.get_value()[key + 1],
7783                 parent=self,
7784                 key=key,
7785                 depth=self.get_depth() + 1,
7786             )
7787
7788     class LabeledPG(urwid.ProgressBar):
7789         def __init__(self, label, *args, **kwargs):
7790             self.label = label
7791             super(LabeledPG, self).__init__(*args, **kwargs)
7792
7793         def get_text(self):
7794             return "%s: %s" % (self.label, super(LabeledPG, self).get_text())
7795
7796     WinHexdump = urwid.ListBox([urwid.Text("")])
7797     WinInfo = urwid.ListBox([urwid.Text("")])
7798     WinDecodePath = urwid.Text("", "center")
7799     WinInfoBar = LabeledPG("info", "pg-normal", "pg-complete")
7800     WinHexdumpBar = LabeledPG("hexdump", "pg-normal", "pg-complete")
7801     WinTree = urwid.TreeListBox(urwid.TreeWalker(PN(
7802         {
7803             "raw": raw,
7804             "hexed": list(hexdump(raw)),
7805             "widget_current": None,
7806             "info": WinInfo,
7807             "info_bar": WinInfoBar,
7808             "hexdump": WinHexdump,
7809             "hexdump_bar": WinHexdumpBar,
7810             "decode_path": WinDecodePath,
7811         },
7812         list(obj.pps()),
7813     )))
7814     help_text = " ".join((
7815         "q:quit",
7816         "space:(un)collapse",
7817         "(pg)up/down/home/end:nav",
7818         "jkJK:hexdump hlHL:info",
7819         "dD:dump",
7820     ))
7821     urwid.MainLoop(
7822         urwid.Frame(
7823             urwid.Columns([
7824                 ("weight", 1, WinTree),
7825                 ("weight", 2, urwid.Pile([
7826                     urwid.LineBox(WinInfo),
7827                     urwid.LineBox(WinHexdump),
7828                 ])),
7829             ]),
7830             header=urwid.Columns([
7831                 ("weight", 2, urwid.AttrWrap(WinDecodePath, "header")),
7832                 ("weight", 1, WinInfoBar),
7833                 ("weight", 1, WinHexdumpBar),
7834             ]),
7835             footer=urwid.AttrWrap(urwid.Text(help_text), "help")
7836         ),
7837         [
7838             ("header", "bold", ""),
7839             ("constructed", "bold", ""),
7840             ("help", "light magenta", ""),
7841             ("warning", "light red", ""),
7842             ("defby", "light red", ""),
7843             ("eoc", "dark red", ""),
7844             ("select-value", "light green", ""),
7845             ("select-expl", "light red", ""),
7846             ("pg-normal", "", "light blue"),
7847             ("pg-complete", "black", "yellow"),
7848         ],
7849     ).run()
7850
7851
7852 def main():  # pragma: no cover
7853     import argparse
7854     parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7855     parser.add_argument(
7856         "--skip",
7857         type=int,
7858         default=0,
7859         help="Skip that number of bytes from the beginning",
7860     )
7861     parser.add_argument(
7862         "--oids",
7863         help="Python paths to dictionary with OIDs, comma separated",
7864     )
7865     parser.add_argument(
7866         "--schema",
7867         help="Python path to schema definition to use",
7868     )
7869     parser.add_argument(
7870         "--defines-by-path",
7871         help="Python path to decoder's defines_by_path",
7872     )
7873     parser.add_argument(
7874         "--nobered",
7875         action="store_true",
7876         help="Disallow BER encoding",
7877     )
7878     parser.add_argument(
7879         "--print-decode-path",
7880         action="store_true",
7881         help="Print decode paths",
7882     )
7883     parser.add_argument(
7884         "--decode-path-only",
7885         help="Print only specified decode path",
7886     )
7887     parser.add_argument(
7888         "--allow-expl-oob",
7889         action="store_true",
7890         help="Allow explicit tag out-of-bound",
7891     )
7892     parser.add_argument(
7893         "--evgen",
7894         action="store_true",
7895         help="Turn on event generation mode",
7896     )
7897     parser.add_argument(
7898         "--browse",
7899         action="store_true",
7900         help="Start ASN.1 browser",
7901     )
7902     parser.add_argument(
7903         "RAWFile",
7904         type=argparse.FileType("rb"),
7905         help="Path to BER/CER/DER file you want to decode",
7906     )
7907     args = parser.parse_args()
7908     try:
7909         raw = file_mmaped(args.RAWFile)[args.skip:]
7910     except:
7911         args.RAWFile.seek(args.skip)
7912         raw = memoryview(args.RAWFile.read())
7913         args.RAWFile.close()
7914     oid_maps = (
7915         [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7916         if args.oids else ()
7917     )
7918     from functools import partial
7919     if args.schema:
7920         schema = obj_by_path(args.schema)
7921         pprinter = partial(pprint, big_blobs=True)
7922     else:
7923         schema, pprinter = generic_decoder()
7924     ctx = {
7925         "bered": not args.nobered,
7926         "allow_expl_oob": args.allow_expl_oob,
7927     }
7928     if args.defines_by_path is not None:
7929         ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7930     if args.browse:
7931         obj, _ = schema().decode(raw, ctx=ctx)
7932         browse(raw, obj, oid_maps)
7933         from sys import exit as sys_exit
7934         sys_exit(0)
7935     from os import environ
7936     pprinter = partial(
7937         pprinter,
7938         oid_maps=oid_maps,
7939         with_colours=environ.get("NO_COLOR") is None,
7940         with_decode_path=args.print_decode_path,
7941         decode_path_only=(
7942             () if args.decode_path_only is None else
7943             tuple(args.decode_path_only.split(":"))
7944         ),
7945     )
7946     if args.evgen:
7947         for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7948             print(pprinter(obj, decode_path=decode_path))
7949     else:
7950         obj, tail = schema().decode(raw, ctx=ctx)
7951         print(pprinter(obj))
7952     if tail != b"":
7953         print("\nTrailing data: %s" % hexenc(tail))
7954
7955
7956 if __name__ == "__main__":
7957     from pyderasn import *
7958     main()