]> Cypherpunks.ru repositories - pyderasn.git/blob - pyderasn.py
DER 2pass encoding
[pyderasn.git] / pyderasn.py
1 #!/usr/bin/env python
2 # coding: utf-8
3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU Lesser General Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
21
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal them in BER/CER/DER ones.
24
25     >>> i = Integer(123)
26     >>> raw = i.encode()
27     >>> Integer().decod(raw) == i
28     True
29
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
62
63 Common for most types
64 ---------------------
65
66 Tags
67 ____
68
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
75 tags simultaneously.
76
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
80 number.
81
82 .. note::
83
84    EXPLICIT tags always have **constructed** tag. PyDERASN does not
85    explicitly check correctness of schema input here.
86
87 .. note::
88
89    Implicit tags have **primitive** (``tag_ctxp``) encoding for
90    primitive values.
91
92 ::
93
94     >>> Integer(impl=tag_ctxp(1))
95     [1] INTEGER
96     >>> Integer(expl=tag_ctxc(2))
97     [2] EXPLICIT INTEGER
98
99 Implicit tag is not explicitly shown.
100
101 Two objects of the same type, but with different implicit/explicit tags
102 are **not** equal.
103
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
106 function::
107
108     >>> tag_decode(tag_ctxc(123))
109     (128, 32, 123)
110     >>> klass, form, num = tag_decode(tag_ctxc(123))
111     >>> klass == TagClassContext
112     True
113     >>> form == TagFormConstructed
114     True
115
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
118
119 Default/optional
120 ________________
121
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
125
126     >>> Integer(optional=True, default=123)
127     INTEGER 123 OPTIONAL DEFAULT
128
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
132 ``version`` field::
133
134     class Version(Integer):
135         schema = (
136             ("v1", 0),
137             ("v2", 1),
138             ("v3", 2),
139         )
140     class TBSCertificate(Sequence):
141         schema = (
142             ("version", Version(expl=tag_ctxc(0), default="v1")),
143         [...]
144
145 When default argument is used and value is not specified, then it equals
146 to default one.
147
148 .. _bounds:
149
150 Size constraints
151 ________________
152
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
156
157     class X(...):
158         bounds = (MIN, MAX)
159
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
161
162 For simplicity you can also set bounds the following way::
163
164     bounded_x = X(bounds=(MIN, MAX))
165
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
167 raised.
168
169 Common methods
170 ______________
171
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
175
176 All objects are friendly to ``copy.copy()`` and copied objects can be
177 safely mutated.
178
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
181
182 .. _decoding:
183
184 Decoding
185 --------
186
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
193
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
196
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
199
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
205
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
208
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
211 * ``expl_tlen``,
212 * ``expl_llen``
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
215   ``offset`` otherwise
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
217   ``tlvlen`` otherwise
218
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
220
221 .. _ctx:
222
223 Context
224 _______
225
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
229
230 Currently available context options:
231
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
238
239 .. _pprinting:
240
241 Pretty printing
242 ---------------
243
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
248
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
252
253     >>> from pyderasn import pprint
254     >>> encoded = Integer(-12345).encode()
255     >>> obj, tail = Integer().decode(encoded)
256     >>> print(pprint(obj))
257         0   [1,1,   2] INTEGER -12345
258
259 .. _pprint_example:
260
261 Example certificate::
262
263     >>> print(pprint(crt))
264         0   [1,3,1604] Certificate SEQUENCE
265         4   [1,3,1453]  . tbsCertificate: TBSCertificate SEQUENCE
266        10-2 [1,1,   1]  . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267        13   [1,1,   3]  . . serialNumber: CertificateSerialNumber INTEGER 61595
268        18   [1,1,  13]  . . signature: AlgorithmIdentifier SEQUENCE
269        20   [1,1,   9]  . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270        31   [0,0,   2]  . . . parameters: [UNIV 5] ANY OPTIONAL
271                         . . . . 05:00
272        33   [0,0, 278]  . . issuer: Name CHOICE rdnSequence
273        33   [1,3, 274]  . . . rdnSequence: RDNSequence SEQUENCE OF
274        37   [1,1,  11]  . . . . 0: RelativeDistinguishedName SET OF
275        39   [1,1,   9]  . . . . . 0: AttributeTypeAndValue SEQUENCE
276        41   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277        46   [0,0,   4]  . . . . . . value: [UNIV 19] AttributeValue ANY
278                         . . . . . . . 13:02:45:53
279     [...]
280      1461   [1,1,  13]  . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281      1463   [1,1,   9]  . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282      1474   [0,0,   2]  . . parameters: [UNIV 5] ANY OPTIONAL
283                         . . . 05:00
284      1476   [1,2, 129]  . signatureValue: BIT STRING 1024 bits
285                         . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286                         . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
287      [...]
288
289     Trailing data: 0a
290
291 Let's parse that output, human::
292
293        10-2 [1,1,   1]    . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294        ^  ^  ^ ^    ^     ^   ^        ^            ^       ^       ^  ^
295        0  1  2 3    4     5   6        7            8       9       10 11
296
297 ::
298
299        20   [1,1,   9]    . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
300        ^     ^ ^    ^     ^     ^          ^                 ^
301        0     2 3    4     5     6          9                 10
302
303 ::
304
305        33   [0,0, 278]    . . issuer: Name CHOICE rdnSequence
306        ^     ^ ^    ^     ^   ^       ^    ^      ^
307        0     2 3    4     5   6       8    9      10
308
309 ::
310
311        52-2∞ B [1,1,1054]∞  . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
312              ^           ^                                 ^   ^            ^
313             12          13                                14   9            10
314
315 :0:
316  Offset of the object, where its DER/BER encoding begins.
317  Pay attention that it does **not** include explicit tag.
318 :1:
319  If explicit tag exists, then this is its length (tag + encoded length).
320 :2:
321  Length of object's tag. For example CHOICE does not have its own tag,
322  so it is zero.
323 :3:
324  Length of encoded length.
325 :4:
326  Length of encoded value.
327 :5:
328  Visual indentation to show the depth of object in the hierarchy.
329 :6:
330  Object's name inside SEQUENCE/CHOICE.
331 :7:
332  If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333  here. "IMPLICIT" is omitted.
334 :8:
335  Object's class name, if set. Omitted if it is just an ordinary simple
336  value (like with ``algorithm`` in example above).
337 :9:
338  Object's ASN.1 type.
339 :10:
340  Object's value, if set. Can consist of multiple words (like OCTET/BIT
341  STRINGs above). We see ``v3`` value in Version, because it is named.
342  ``rdnSequence`` is the choice of CHOICE type.
343 :11:
344  Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345  default one, specified in the schema.
346 :12:
347  Shows does object contains any kind of BER encoded data (possibly
348  Sequence holding BER-encoded underlying value).
349 :13:
350  Only applicable to BER encoded data. Indefinite length encoding mark.
351 :14:
352  Only applicable to BER encoded data. If object has BER-specific
353  encoding, then ``BER`` will be shown. It does not depend on indefinite
354  length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355  (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
356  could be BERed.
357
358 .. _definedby:
359
360 DEFINED BY
361 ----------
362
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
367
368 .. _defines:
369
370 defines kwarg
371 _____________
372
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
376 container::
377
378     class ContentInfo(Sequence):
379         schema = (
380             ("contentType", ContentType(defines=((("content",), {
381                 id_digestedData: DigestedData(),
382                 id_signedData: SignedData(),
383             }),))),
384             ("content", Any(expl=tag_ctxc(0))),
385         )
386
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
391 done.
392
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
398
399         (
400             (("parameters",), {
401                 id_ecPublicKey: ECParameters(),
402                 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
403             }),
404             (("..", "subjectPublicKey"), {
405                 id_rsaEncryption: RSAPublicKey(),
406                 id_GostR3410_2001: OctetString(),
407             }),
408         ),
409
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
412 itself.
413
414 Following types can be automatically decoded (DEFINED BY):
415
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420   ``Any``/``BitString``/``OctetString``-s
421
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
426
427 .. _defines_by_path_ctx:
428
429 defines_by_path context option
430 ______________________________
431
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
435
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
438
439     (decode_path, defines)
440
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
445
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
449 of ``PKIResponse``::
450
451     content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
452         (
453             ("contentType",),
454             ((("content",), {id_signedData: SignedData()}),),
455         ),
456         (
457             (
458                 "content",
459                 DecodePathDefBy(id_signedData),
460                 "encapContentInfo",
461                 "eContentType",
462             ),
463             ((("eContent",), {
464                 id_cct_PKIData: PKIData(),
465                 id_cct_PKIResponse: PKIResponse(),
466             })),
467         ),
468         (
469             (
470                 "content",
471                 DecodePathDefBy(id_signedData),
472                 "encapContentInfo",
473                 "eContent",
474                 DecodePathDefBy(id_cct_PKIResponse),
475                 "controlSequence",
476                 any,
477                 "attrType",
478             ),
479             ((("attrValues",), {
480                 id_cmc_recipientNonce: RecipientNonce(),
481                 id_cmc_senderNonce: SenderNonce(),
482                 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483                 id_cmc_transactionId: TransactionId(),
484             })),
485         ),
486     )})
487
488 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 First function is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meet --
491 useful for SEQUENCE/SET OF-s.
492
493 .. _bered_ctx:
494
495 BER encoding
496 ------------
497
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
502
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504   attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505   STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506   ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508   attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509   ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
510   contain it.
511 * If object has an indefinite length encoded explicit tag, then
512   ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514   indefinite length, object's indefinite length, BER-encoding) or any
515   underlying component has that kind of encoding, then ``bered``
516   attribute is set to True. For example SignedData CMS can have
517   ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518   ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
519
520 EOC (end-of-contents) token's length is taken in advance in object's
521 value length.
522
523 .. _allow_expl_oob_ctx:
524
525 Allow explicit tag out-of-bound
526 -------------------------------
527
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
533
534 .. warning::
535
536    This option should be used only for skipping some decode errors, just
537    to see the decoded structure somehow.
538
539 .. _streaming:
540
541 Streaming and dealing with huge structures
542 ------------------------------------------
543
544 .. _evgen_mode:
545
546 evgen mode
547 __________
548
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
553 structure.
554
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
558
559     class TBSCertListFast(Sequence):
560         schema = (
561             [...]
562             ("revokedCertificates", OctetString(
563                 impl=SequenceOf.tag_default,
564                 optional=True,
565             )),
566             [...]
567         )
568
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
571
572 But how can you get all certificate's serial number from it, after you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
576
577     $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578          10     [1,1,   1]   . . version: Version INTEGER v2 (01) OPTIONAL
579          15     [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580          26     [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL
581          13     [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE
582          34     [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583          39     [0,0,   9]   . . . . . . value: [UNIV 19] AttributeValue ANY
584          32     [1,1,  14]   . . . . . 0: AttributeTypeAndValue SEQUENCE
585          30     [1,1,  16]   . . . . 0: RelativeDistinguishedName SET OF
586     [...]
587         188     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589         191     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
590         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591         186     [1,1,  18]   . . . 0: RevokedCertificate SEQUENCE
592         208     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594         211     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
595         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596         206     [1,1,  18]   . . . 1: RevokedCertificate SEQUENCE
597     [...]
598     9144992     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
599     9144992     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600     9144985     [1,1,  20]   . . . 415755: RevokedCertificate SEQUENCE
601       181     [1,4,9144821]   . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602         5     [1,4,9144997]   . tbsCertList: TBSCertList SEQUENCE
603     9145009     [1,1,   9]   . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604     9145020     [0,0,   2]   . . parameters: [UNIV 5] ANY OPTIONAL
605     9145007     [1,1,  13]   . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606     9145022     [1,3, 513]   . signatureValue: BIT STRING 4096 bits
607         0     [1,4,9145534]  CertificateList SEQUENCE
608
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as a successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
618 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
619
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
626
627     for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
628         if (
629             len(decode_path) == 4 and
630             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
631             decode_path[3] == "userCertificate"
632         ):
633             print("serial number:", int(obj))
634
635 Virtually it does not take any memory except at least needed for single
636 object storage. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
639 ``.tlvlen``.
640
641 .. _evgen_mode_upto_ctx:
642
643 evgen_mode_upto
644 _______________
645
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 do not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
655
656     for decode_path, obj, _ in CertificateList().decode_evgen(
657         crl_raw,
658         ctx={"evgen_mode_upto": (
659             (("tbsCertList", "revokedCertificates", any), True),
660         )},
661     ):
662         if (
663             len(decode_path) == 3 and
664             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
665         ):
666             print("serial number:", int(obj["userCertificate"]))
667
668 .. _mmap:
669
670 mmap-ed file
671 ____________
672
673 POSIX compliant systems have ``mmap`` syscall, giving ability to work
674 the memory mapped file. You can deal with the file like it was an
675 ordinary binary string, allowing you not to load it to the memory first.
676 Also you can use them as an input for OCTET STRING, taking no Python
677 memory for their storage.
678
679 There is convenient :py:func:`pyderasn.file_mmaped` function that
680 creates read-only memoryview on the file contents::
681
682     with open("huge", "rb") as fd:
683         raw = file_mmaped(fd)
684         obj = Something.decode(raw)
685
686 .. warning::
687
688    mmap-ed files in Python2.7 does not implement buffer protocol, so
689    memoryview won't work on them.
690
691 .. warning::
692
693    mmap maps the **whole** file. So it plays no role if you seek-ed it
694    before. Take the slice of the resulting memoryview with required
695    offset instead.
696
697 .. note::
698
699    If you use ZFS as underlying storage, then pay attention that
700    currently most platforms does not deal good with ZFS ARC and ordinary
701    page cache used for mmaps. It can take twice the necessary size in
702    the memory: both in page cache and ZFS ARC.
703
704 CER encoding
705 ____________
706
707 We can parse any kind of data now, but how can we produce files
708 streamingly, without storing their encoded representation in memory?
709 SEQUENCE by default encodes in memory all its values, joins them in huge
710 binary string, just to know the exact size of SEQUENCE's value for
711 encoding it in TLV. DER requires you to know all exact sizes of the
712 objects.
713
714 You can use CER encoding mode, that slightly differs from the DER, but
715 does not require exact sizes knowledge, allowing streaming encoding
716 directly to some writer/buffer. Just use
717 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
718 encoded data will flow::
719
720     opener = io.open if PY2 else open
721     with opener("result", "wb") as fd:
722         obj.encode_cer(fd.write)
723
724 ::
725
726     buf = io.BytesIO()
727     obj.encode_cer(buf.write)
728
729 If you do not want to create in-memory buffer every time, then you can
730 use :py:func:`pyderasn.encode_cer` function::
731
732     data = encode_cer(obj)
733
734 Remember that CER is **not valid** DER in most cases, so you **have to**
735 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
736 decoding. Also currently there is **no** validation that provided CER is
737 valid one -- you are sure that it has only valid BER encoding.
738
739 .. warning::
740
741    SET OF values can not be streamingly encoded, because they are
742    required to be sorted byte-by-byte. Big SET OF values still will take
743    much memory. Use neither SET nor SET OF values, as modern ASN.1
744    also recommends too.
745
746 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
747 OCTET STRINGs! They will be streamingly copied from underlying file to
748 the buffer using 1 KB chunks.
749
750 Some structures require that some of the elements have to be forcefully
751 DER encoded. For example ``SignedData`` CMS requires you to encode
752 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
753 encode everything else in BER. You can tell any of the structures to be
754 forcefully encoded in DER during CER encoding, by specifying
755 ``der_forced=True`` attribute::
756
757     class Certificate(Sequence):
758         schema = (...)
759         der_forced = True
760
761     class SignedAttributes(SetOf):
762         schema = Attribute()
763         bounds = (1, 32)
764         der_forced = True
765
766 agg_octet_string
767 ________________
768
769 In most cases, huge quantity of binary data is stored as OCTET STRING.
770 CER encoding splits it on 1 KB chunks. BER allows splitting on various
771 levels of chunks inclusion::
772
773     SOME STRING[CONSTRUCTED]
774         OCTET STRING[CONSTRUCTED]
775             OCTET STRING[PRIMITIVE]
776                 DATA CHUNK
777             OCTET STRING[PRIMITIVE]
778                 DATA CHUNK
779             OCTET STRING[PRIMITIVE]
780                 DATA CHUNK
781         OCTET STRING[PRIMITIVE]
782             DATA CHUNK
783         OCTET STRING[CONSTRUCTED]
784             OCTET STRING[PRIMITIVE]
785                 DATA CHUNK
786             OCTET STRING[PRIMITIVE]
787                 DATA CHUNK
788         OCTET STRING[CONSTRUCTED]
789             OCTET STRING[CONSTRUCTED]
790                 OCTET STRING[PRIMITIVE]
791                     DATA CHUNK
792
793 You can not just take the offset and some ``.vlen`` of the STRING and
794 treat it as the payload. If you decode it without
795 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
796 and ``bytes()`` will give the whole payload contents.
797
798 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
799 small memory footprint. There is convenient
800 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
801 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
802 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
803 SHA512 digest of its ``eContent``::
804
805     fd = open("data.p7m", "rb")
806     raw = file_mmaped(fd)
807     ctx = {"bered": True}
808     for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
809         if decode_path == ("content",):
810             content = obj
811             break
812     else:
813         raise ValueError("no content found")
814     hasher_state = sha512()
815     def hasher(data):
816         hasher_state.update(data)
817         return len(data)
818     evgens = SignedData().decode_evgen(
819         raw[content.offset:],
820         offset=content.offset,
821         ctx=ctx,
822     )
823     agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
824     fd.close()
825     digest = hasher_state.digest()
826
827 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
828 copy the payload (without BER/CER encoding interleaved overhead) in it.
829 Virtually it won't take memory more than for keeping small structures
830 and 1 KB binary chunks.
831
832 .. _seqof-iterators:
833
834 SEQUENCE OF iterators
835 _____________________
836
837 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
838 classes. The only difference with providing the full list of objects, is
839 that type and bounds checking is done during encoding process. Also
840 sequence's value will be emptied after encoding, forcing you to set its
841 value again.
842
843 This is very useful when you have to create some huge objects, like
844 CRLs, with thousands and millions of entities inside. You can write the
845 generator taking necessary data from the database and giving the
846 ``RevokedCertificate`` objects. Only binary representation of that
847 objects will take memory during DER encoding.
848
849 2-pass DER encoding
850 -------------------
851
852 There is ability to do 2-pass encoding to DER, writing results directly
853 to specified writer (buffer, file, whatever). It could be 1.5+ times
854 slower than ordinary encoding, but it takes little memory for 1st pass
855 state storing. For example, 1st pass state for CACert.org's CRL with
856 ~416K of certificate entries takes nearly 3.5 MB of memory.
857 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
858 nearly 0.5 KB of memory.
859
860 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
861 iterators <seqof-iterators>` and write directly to opened file, then
862 there is very small memory footprint.
863
864 1st pass traverses through all the objects of the structure and returns
865 the size of DER encoded structure, together with 1st pass state object.
866 That state contains precalculated lengths for various objects inside the
867 structure.
868
869 ::
870
871     fulllen, state = obj.encode1st()
872
873 2nd pass takes the writer and 1st pass state. It traverses through all
874 the objects again, but writes their encoded representation to the writer.
875
876 ::
877
878     opener = io.open if PY2 else open
879     with opener("result", "wb") as fd:
880         obj.encode2nd(fd.write, iter(state))
881
882 .. warning::
883
884    You **MUST NOT** use 1st pass state if anything is changed in the
885    objects. It is intended to be used immediately after 1st pass is
886    done!
887
888 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
889 have to reinitialize the values after the 1st pass. And you **have to**
890 be sure that the iterator gives exactly the same values as previously.
891 Yes, you have to run your iterator twice -- because this is two pass
892 encoding mode.
893
894 If you want to encode to the memory, then you can use convenient
895 :py:func:`pyderasn.encode2pass` helper.
896
897 Base Obj
898 --------
899 .. autoclass:: pyderasn.Obj
900    :members:
901
902 Primitive types
903 ---------------
904
905 Boolean
906 _______
907 .. autoclass:: pyderasn.Boolean
908    :members: __init__
909
910 Integer
911 _______
912 .. autoclass:: pyderasn.Integer
913    :members: __init__, named
914
915 BitString
916 _________
917 .. autoclass:: pyderasn.BitString
918    :members: __init__, bit_len, named
919
920 OctetString
921 ___________
922 .. autoclass:: pyderasn.OctetString
923    :members: __init__
924
925 Null
926 ____
927 .. autoclass:: pyderasn.Null
928    :members: __init__
929
930 ObjectIdentifier
931 ________________
932 .. autoclass:: pyderasn.ObjectIdentifier
933    :members: __init__
934
935 Enumerated
936 __________
937 .. autoclass:: pyderasn.Enumerated
938
939 CommonString
940 ____________
941 .. autoclass:: pyderasn.CommonString
942
943 NumericString
944 _____________
945 .. autoclass:: pyderasn.NumericString
946
947 PrintableString
948 _______________
949 .. autoclass:: pyderasn.PrintableString
950    :members: __init__, allow_asterisk, allow_ampersand
951
952 UTCTime
953 _______
954 .. autoclass:: pyderasn.UTCTime
955    :members: __init__, todatetime
956
957 GeneralizedTime
958 _______________
959 .. autoclass:: pyderasn.GeneralizedTime
960    :members: __init__, todatetime
961
962 Special types
963 -------------
964
965 Choice
966 ______
967 .. autoclass:: pyderasn.Choice
968    :members: __init__, choice, value
969
970 PrimitiveTypes
971 ______________
972 .. autoclass:: PrimitiveTypes
973
974 Any
975 ___
976 .. autoclass:: pyderasn.Any
977    :members: __init__
978
979 Constructed types
980 -----------------
981
982 Sequence
983 ________
984 .. autoclass:: pyderasn.Sequence
985    :members: __init__
986
987 Set
988 ___
989 .. autoclass:: pyderasn.Set
990    :members: __init__
991
992 SequenceOf
993 __________
994 .. autoclass:: pyderasn.SequenceOf
995    :members: __init__
996
997 SetOf
998 _____
999 .. autoclass:: pyderasn.SetOf
1000    :members: __init__
1001
1002 Various
1003 -------
1004
1005 .. autofunction:: pyderasn.abs_decode_path
1006 .. autofunction:: pyderasn.agg_octet_string
1007 .. autofunction:: pyderasn.colonize_hex
1008 .. autofunction:: pyderasn.encode2pass
1009 .. autofunction:: pyderasn.encode_cer
1010 .. autofunction:: pyderasn.file_mmaped
1011 .. autofunction:: pyderasn.hexenc
1012 .. autofunction:: pyderasn.hexdec
1013 .. autofunction:: pyderasn.tag_encode
1014 .. autofunction:: pyderasn.tag_decode
1015 .. autofunction:: pyderasn.tag_ctxp
1016 .. autofunction:: pyderasn.tag_ctxc
1017 .. autoclass:: pyderasn.DecodeError
1018    :members: __init__
1019 .. autoclass:: pyderasn.NotEnoughData
1020 .. autoclass:: pyderasn.ExceedingData
1021 .. autoclass:: pyderasn.LenIndefForm
1022 .. autoclass:: pyderasn.TagMismatch
1023 .. autoclass:: pyderasn.InvalidLength
1024 .. autoclass:: pyderasn.InvalidOID
1025 .. autoclass:: pyderasn.ObjUnknown
1026 .. autoclass:: pyderasn.ObjNotReady
1027 .. autoclass:: pyderasn.InvalidValueType
1028 .. autoclass:: pyderasn.BoundsError
1029
1030 .. _cmdline:
1031
1032 Command-line usage
1033 ------------------
1034
1035 You can decode DER/BER files using command line abilities::
1036
1037     $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1038
1039 If there is no schema for your file, then you can try parsing it without,
1040 but of course IMPLICIT tags will often make it impossible. But result is
1041 good enough for the certificate above::
1042
1043     $ python -m pyderasn path/to/file
1044         0   [1,3,1604]  . >: SEQUENCE OF
1045         4   [1,3,1453]  . . >: SEQUENCE OF
1046         8   [0,0,   5]  . . . . >: [0] ANY
1047                         . . . . . A0:03:02:01:02
1048        13   [1,1,   3]  . . . . >: INTEGER 61595
1049        18   [1,1,  13]  . . . . >: SEQUENCE OF
1050        20   [1,1,   9]  . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1051        31   [1,1,   0]  . . . . . . >: NULL
1052        33   [1,3, 274]  . . . . >: SEQUENCE OF
1053        37   [1,1,  11]  . . . . . . >: SET OF
1054        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1055        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1056        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1057     [...]
1058      1409   [1,1,  50]  . . . . . . >: SEQUENCE OF
1059      1411   [1,1,   8]  . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1060      1421   [1,1,  38]  . . . . . . . . >: OCTET STRING 38 bytes
1061                         . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1062                         . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1063                         . . . . . . . . . 61:2E:63:6F:6D:2F
1064      1461   [1,1,  13]  . . >: SEQUENCE OF
1065      1463   [1,1,   9]  . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1066      1474   [1,1,   0]  . . . . >: NULL
1067      1476   [1,2, 129]  . . >: BIT STRING 1024 bits
1068                         . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1069                         . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1070     [...]
1071
1072 Human readable OIDs
1073 ___________________
1074
1075 If you have got dictionaries with ObjectIdentifiers, like example one
1076 from ``tests/test_crts.py``::
1077
1078     stroid2name = {
1079         "1.2.840.113549.1.1.1": "id-rsaEncryption",
1080         "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1081         [...]
1082         "2.5.4.10": "id-at-organizationName",
1083         "2.5.4.11": "id-at-organizationalUnitName",
1084     }
1085
1086 then you can pass it to pretty printer to see human readable OIDs::
1087
1088     $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1089     [...]
1090        37   [1,1,  11]  . . . . . . >: SET OF
1091        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1092        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1093        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1094        50   [1,1,  18]  . . . . . . >: SET OF
1095        52   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1096        54   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1097        59   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1098        70   [1,1,  18]  . . . . . . >: SET OF
1099        72   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1100        74   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1101        79   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1102     [...]
1103
1104 Decode paths
1105 ____________
1106
1107 Each decoded element has so-called decode path: sequence of structure
1108 names it is passing during the decode process. Each element has its own
1109 unique path inside the whole ASN.1 tree. You can print it out with
1110 ``--print-decode-path`` option::
1111
1112     $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1113        0    [1,3,1604]  Certificate SEQUENCE []
1114        4    [1,3,1453]   . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1115       10-2  [1,1,   1]   . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1116       13    [1,1,   3]   . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1117       18    [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1118       20    [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1119       31    [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1120                          . . . . 05:00
1121       33    [0,0, 278]   . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1122       33    [1,3, 274]   . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1123       37    [1,1,  11]   . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1124       39    [1,1,   9]   . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1125       41    [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1126       46    [0,0,   4]   . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1127                          . . . . . . . 13:02:45:53
1128       46    [1,1,   2]   . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1129     [...]
1130
1131 Now you can print only the specified tree, for example signature algorithm::
1132
1133     $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1134       18    [1,1,  13]  AlgorithmIdentifier SEQUENCE
1135       20    [1,1,   9]   . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1136       31    [0,0,   2]   . parameters: [UNIV 5] ANY OPTIONAL
1137                          . . 05:00
1138 """
1139
1140 from array import array
1141 from codecs import getdecoder
1142 from codecs import getencoder
1143 from collections import namedtuple
1144 from collections import OrderedDict
1145 from copy import copy
1146 from datetime import datetime
1147 from datetime import timedelta
1148 from io import BytesIO
1149 from math import ceil
1150 from mmap import mmap
1151 from mmap import PROT_READ
1152 from operator import attrgetter
1153 from string import ascii_letters
1154 from string import digits
1155 from sys import maxsize as sys_maxsize
1156 from sys import version_info
1157 from unicodedata import category as unicat
1158
1159 from six import add_metaclass
1160 from six import binary_type
1161 from six import byte2int
1162 from six import indexbytes
1163 from six import int2byte
1164 from six import integer_types
1165 from six import iterbytes
1166 from six import iteritems
1167 from six import itervalues
1168 from six import PY2
1169 from six import string_types
1170 from six import text_type
1171 from six import unichr as six_unichr
1172 from six.moves import xrange as six_xrange
1173
1174
1175 try:
1176     from termcolor import colored
1177 except ImportError:  # pragma: no cover
1178     def colored(what, *args, **kwargs):
1179         return what
1180
1181 __version__ = "7.2"
1182
1183 __all__ = (
1184     "agg_octet_string",
1185     "Any",
1186     "BitString",
1187     "BMPString",
1188     "Boolean",
1189     "BoundsError",
1190     "Choice",
1191     "DecodeError",
1192     "DecodePathDefBy",
1193     "encode2pass",
1194     "encode_cer",
1195     "Enumerated",
1196     "ExceedingData",
1197     "file_mmaped",
1198     "GeneralizedTime",
1199     "GeneralString",
1200     "GraphicString",
1201     "hexdec",
1202     "hexenc",
1203     "IA5String",
1204     "Integer",
1205     "InvalidLength",
1206     "InvalidOID",
1207     "InvalidValueType",
1208     "ISO646String",
1209     "LenIndefForm",
1210     "NotEnoughData",
1211     "Null",
1212     "NumericString",
1213     "obj_by_path",
1214     "ObjectIdentifier",
1215     "ObjNotReady",
1216     "ObjUnknown",
1217     "OctetString",
1218     "PrimitiveTypes",
1219     "PrintableString",
1220     "Sequence",
1221     "SequenceOf",
1222     "Set",
1223     "SetOf",
1224     "T61String",
1225     "tag_ctxc",
1226     "tag_ctxp",
1227     "tag_decode",
1228     "TagClassApplication",
1229     "TagClassContext",
1230     "TagClassPrivate",
1231     "TagClassUniversal",
1232     "TagFormConstructed",
1233     "TagFormPrimitive",
1234     "TagMismatch",
1235     "TeletexString",
1236     "UniversalString",
1237     "UTCTime",
1238     "UTF8String",
1239     "VideotexString",
1240     "VisibleString",
1241 )
1242
1243 TagClassUniversal = 0
1244 TagClassApplication = 1 << 6
1245 TagClassContext = 1 << 7
1246 TagClassPrivate = 1 << 6 | 1 << 7
1247 TagFormPrimitive = 0
1248 TagFormConstructed = 1 << 5
1249 TagClassReprs = {
1250     TagClassContext: "",
1251     TagClassApplication: "APPLICATION ",
1252     TagClassPrivate: "PRIVATE ",
1253     TagClassUniversal: "UNIV ",
1254 }
1255 EOC = b"\x00\x00"
1256 EOC_LEN = len(EOC)
1257 LENINDEF = b"\x80"  # length indefinite mark
1258 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1259 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1260 SET01 = frozenset("01")
1261 DECIMALS = frozenset(digits)
1262 DECIMAL_SIGNS = ".,"
1263 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1264
1265
1266 def file_mmaped(fd):
1267     """Make mmap-ed memoryview for reading from file
1268
1269     :param fd: file object
1270     :returns: memoryview over read-only mmap-ing of the whole file
1271     """
1272     return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
1273
1274 def pureint(value):
1275     if not set(value) <= DECIMALS:
1276         raise ValueError("non-pure integer")
1277     return int(value)
1278
1279 def fractions2float(fractions_raw):
1280     pureint(fractions_raw)
1281     return float("0." + fractions_raw)
1282
1283
1284 def get_def_by_path(defines_by_path, sub_decode_path):
1285     """Get define by decode path
1286     """
1287     for path, define in defines_by_path:
1288         if len(path) != len(sub_decode_path):
1289             continue
1290         for p1, p2 in zip(path, sub_decode_path):
1291             if (not p1 is any) and (p1 != p2):
1292                 break
1293         else:
1294             return define
1295
1296
1297 ########################################################################
1298 # Errors
1299 ########################################################################
1300
1301 class ASN1Error(ValueError):
1302     pass
1303
1304
1305 class DecodeError(ASN1Error):
1306     def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1307         """
1308         :param str msg: reason of decode failing
1309         :param klass: optional exact DecodeError inherited class (like
1310                       :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1311                       :py:exc:`InvalidLength`)
1312         :param decode_path: tuple of strings. It contains human
1313                             readable names of the fields through which
1314                             decoding process has passed
1315         :param int offset: binary offset where failure happened
1316         """
1317         super(DecodeError, self).__init__()
1318         self.msg = msg
1319         self.klass = klass
1320         self.decode_path = decode_path
1321         self.offset = offset
1322
1323     def __str__(self):
1324         return " ".join(
1325             c for c in (
1326                 "" if self.klass is None else self.klass.__name__,
1327                 (
1328                     ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1329                     if len(self.decode_path) > 0 else ""
1330                 ),
1331                 ("(at %d)" % self.offset) if self.offset > 0 else "",
1332                 self.msg,
1333             ) if c != ""
1334         )
1335
1336     def __repr__(self):
1337         return "%s(%s)" % (self.__class__.__name__, self)
1338
1339
1340 class NotEnoughData(DecodeError):
1341     pass
1342
1343
1344 class ExceedingData(ASN1Error):
1345     def __init__(self, nbytes):
1346         super(ExceedingData, self).__init__()
1347         self.nbytes = nbytes
1348
1349     def __str__(self):
1350         return "%d trailing bytes" % self.nbytes
1351
1352     def __repr__(self):
1353         return "%s(%s)" % (self.__class__.__name__, self)
1354
1355
1356 class LenIndefForm(DecodeError):
1357     pass
1358
1359
1360 class TagMismatch(DecodeError):
1361     pass
1362
1363
1364 class InvalidLength(DecodeError):
1365     pass
1366
1367
1368 class InvalidOID(DecodeError):
1369     pass
1370
1371
1372 class ObjUnknown(ASN1Error):
1373     def __init__(self, name):
1374         super(ObjUnknown, self).__init__()
1375         self.name = name
1376
1377     def __str__(self):
1378         return "object is unknown: %s" % self.name
1379
1380     def __repr__(self):
1381         return "%s(%s)" % (self.__class__.__name__, self)
1382
1383
1384 class ObjNotReady(ASN1Error):
1385     def __init__(self, name):
1386         super(ObjNotReady, self).__init__()
1387         self.name = name
1388
1389     def __str__(self):
1390         return "object is not ready: %s" % self.name
1391
1392     def __repr__(self):
1393         return "%s(%s)" % (self.__class__.__name__, self)
1394
1395
1396 class InvalidValueType(ASN1Error):
1397     def __init__(self, expected_types):
1398         super(InvalidValueType, self).__init__()
1399         self.expected_types = expected_types
1400
1401     def __str__(self):
1402         return "invalid value type, expected: %s" % ", ".join(
1403             [repr(t) for t in self.expected_types]
1404         )
1405
1406     def __repr__(self):
1407         return "%s(%s)" % (self.__class__.__name__, self)
1408
1409
1410 class BoundsError(ASN1Error):
1411     def __init__(self, bound_min, value, bound_max):
1412         super(BoundsError, self).__init__()
1413         self.bound_min = bound_min
1414         self.value = value
1415         self.bound_max = bound_max
1416
1417     def __str__(self):
1418         return "unsatisfied bounds: %s <= %s <= %s" % (
1419             self.bound_min,
1420             self.value,
1421             self.bound_max,
1422         )
1423
1424     def __repr__(self):
1425         return "%s(%s)" % (self.__class__.__name__, self)
1426
1427
1428 ########################################################################
1429 # Basic coders
1430 ########################################################################
1431
1432 _hexdecoder = getdecoder("hex")
1433 _hexencoder = getencoder("hex")
1434
1435
1436 def hexdec(data):
1437     """Binary data to hexadecimal string convert
1438     """
1439     return _hexdecoder(data)[0]
1440
1441
1442 def hexenc(data):
1443     """Hexadecimal string to binary data convert
1444     """
1445     return _hexencoder(data)[0].decode("ascii")
1446
1447
1448 def int_bytes_len(num, byte_len=8):
1449     if num == 0:
1450         return 1
1451     return int(ceil(float(num.bit_length()) / byte_len))
1452
1453
1454 def zero_ended_encode(num):
1455     octets = bytearray(int_bytes_len(num, 7))
1456     i = len(octets) - 1
1457     octets[i] = num & 0x7F
1458     num >>= 7
1459     i -= 1
1460     while num > 0:
1461         octets[i] = 0x80 | (num & 0x7F)
1462         num >>= 7
1463         i -= 1
1464     return bytes(octets)
1465
1466
1467 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1468     """Encode tag to binary form
1469
1470     :param int num: tag's number
1471     :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1472                       :py:data:`pyderasn.TagClassContext`,
1473                       :py:data:`pyderasn.TagClassApplication`,
1474                       :py:data:`pyderasn.TagClassPrivate`)
1475     :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1476                      :py:data:`pyderasn.TagFormConstructed`)
1477     """
1478     if num < 31:
1479         # [XX|X|.....]
1480         return int2byte(klass | form | num)
1481     # [XX|X|11111][1.......][1.......] ... [0.......]
1482     return int2byte(klass | form | 31) + zero_ended_encode(num)
1483
1484
1485 def tag_decode(tag):
1486     """Decode tag from binary form
1487
1488     .. warning::
1489
1490        No validation is performed, assuming that it has already passed.
1491
1492     It returns tuple with three integers, as
1493     :py:func:`pyderasn.tag_encode` accepts.
1494     """
1495     first_octet = byte2int(tag)
1496     klass = first_octet & 0xC0
1497     form = first_octet & 0x20
1498     if first_octet & 0x1F < 0x1F:
1499         return (klass, form, first_octet & 0x1F)
1500     num = 0
1501     for octet in iterbytes(tag[1:]):
1502         num <<= 7
1503         num |= octet & 0x7F
1504     return (klass, form, num)
1505
1506
1507 def tag_ctxp(num):
1508     """Create CONTEXT PRIMITIVE tag
1509     """
1510     return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1511
1512
1513 def tag_ctxc(num):
1514     """Create CONTEXT CONSTRUCTED tag
1515     """
1516     return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1517
1518
1519 def tag_strip(data):
1520     """Take off tag from the data
1521
1522     :returns: (encoded tag, tag length, remaining data)
1523     """
1524     if len(data) == 0:
1525         raise NotEnoughData("no data at all")
1526     if byte2int(data) & 0x1F < 31:
1527         return data[:1], 1, data[1:]
1528     i = 0
1529     while True:
1530         i += 1
1531         if i == len(data):
1532             raise DecodeError("unfinished tag")
1533         if indexbytes(data, i) & 0x80 == 0:
1534             break
1535     i += 1
1536     return data[:i], i, data[i:]
1537
1538
1539 def len_encode(l):
1540     if l < 0x80:
1541         return int2byte(l)
1542     octets = bytearray(int_bytes_len(l) + 1)
1543     octets[0] = 0x80 | (len(octets) - 1)
1544     for i in six_xrange(len(octets) - 1, 0, -1):
1545         octets[i] = l & 0xFF
1546         l >>= 8
1547     return bytes(octets)
1548
1549
1550 def len_decode(data):
1551     """Decode length
1552
1553     :returns: (decoded length, length's length, remaining data)
1554     :raises LenIndefForm: if indefinite form encoding is met
1555     """
1556     if len(data) == 0:
1557         raise NotEnoughData("no data at all")
1558     first_octet = byte2int(data)
1559     if first_octet & 0x80 == 0:
1560         return first_octet, 1, data[1:]
1561     octets_num = first_octet & 0x7F
1562     if octets_num + 1 > len(data):
1563         raise NotEnoughData("encoded length is longer than data")
1564     if octets_num == 0:
1565         raise LenIndefForm()
1566     if byte2int(data[1:]) == 0:
1567         raise DecodeError("leading zeros")
1568     l = 0
1569     for v in iterbytes(data[1:1 + octets_num]):
1570         l = (l << 8) | v
1571     if l <= 127:
1572         raise DecodeError("long form instead of short one")
1573     return l, 1 + octets_num, data[1 + octets_num:]
1574
1575
1576 LEN0 = len_encode(0)
1577 LEN1 = len_encode(1)
1578 LEN1K = len_encode(1000)
1579
1580
1581 def len_size(l):
1582     """How many bytes length field will take
1583     """
1584     if l < 128:
1585         return 1
1586     if l < 256:  # 1 << 8
1587         return 2
1588     if l < 65536:  # 1 << 16
1589         return 3
1590     if l < 16777216:  # 1 << 24
1591         return 4
1592     if l < 4294967296:  # 1 << 32
1593         return 5
1594     if l < 1099511627776:  # 1 << 40
1595         return 6
1596     if l < 281474976710656:  # 1 << 48
1597         return 7
1598     if l < 72057594037927936:  # 1 << 56
1599         return 8
1600     raise OverflowError("too big length")
1601
1602
1603 def write_full(writer, data):
1604     """Fully write provided data
1605
1606     :param writer: must comply with ``io.RawIOBase.write`` behaviour
1607
1608     BytesIO does not guarantee that the whole data will be written at
1609     once. That function write everything provided, raising an error if
1610     ``writer`` returns None.
1611     """
1612     data = memoryview(data)
1613     written = 0
1614     while written != len(data):
1615         n = writer(data[written:])
1616         if n is None:
1617             raise ValueError("can not write to buf")
1618         written += n
1619
1620
1621 # If it is 64-bit system, then use compact 64-bit array of unsigned
1622 # longs. Use an ordinary list with universal integers otherwise, that
1623 # is slower.
1624 if sys_maxsize > 2 ** 32:
1625     def state_2pass_new():
1626         return array("L")
1627 else:
1628     def state_2pass_new():
1629         return []
1630
1631
1632 ########################################################################
1633 # Base class
1634 ########################################################################
1635
1636 class AutoAddSlots(type):
1637     def __new__(cls, name, bases, _dict):
1638         _dict["__slots__"] = _dict.get("__slots__", ())
1639         return type.__new__(cls, name, bases, _dict)
1640
1641
1642 BasicState = namedtuple("BasicState", (
1643     "version",
1644     "tag",
1645     "tag_order",
1646     "expl",
1647     "default",
1648     "optional",
1649     "offset",
1650     "llen",
1651     "vlen",
1652     "expl_lenindef",
1653     "lenindef",
1654     "ber_encoded",
1655 ), **NAMEDTUPLE_KWARGS)
1656
1657
1658 @add_metaclass(AutoAddSlots)
1659 class Obj(object):
1660     """Common ASN.1 object class
1661
1662     All ASN.1 types are inherited from it. It has metaclass that
1663     automatically adds ``__slots__`` to all inherited classes.
1664     """
1665     __slots__ = (
1666         "tag",
1667         "_tag_order",
1668         "_value",
1669         "_expl",
1670         "default",
1671         "optional",
1672         "offset",
1673         "llen",
1674         "vlen",
1675         "expl_lenindef",
1676         "lenindef",
1677         "ber_encoded",
1678     )
1679
1680     def __init__(
1681             self,
1682             impl=None,
1683             expl=None,
1684             default=None,
1685             optional=False,
1686             _decoded=(0, 0, 0),
1687     ):
1688         self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1689         self._expl = getattr(self, "expl", None) if expl is None else expl
1690         if self.tag != self.tag_default and self._expl is not None:
1691             raise ValueError("implicit and explicit tags can not be set simultaneously")
1692         if self.tag is None:
1693             self._tag_order = None
1694         else:
1695             tag_class, _, tag_num = tag_decode(
1696                 self.tag if self._expl is None else self._expl
1697             )
1698             self._tag_order = (tag_class, tag_num)
1699         if default is not None:
1700             optional = True
1701         self.optional = optional
1702         self.offset, self.llen, self.vlen = _decoded
1703         self.default = None
1704         self.expl_lenindef = False
1705         self.lenindef = False
1706         self.ber_encoded = False
1707
1708     @property
1709     def ready(self):  # pragma: no cover
1710         """Is object ready to be encoded?
1711         """
1712         raise NotImplementedError()
1713
1714     def _assert_ready(self):
1715         if not self.ready:
1716             raise ObjNotReady(self.__class__.__name__)
1717
1718     @property
1719     def bered(self):
1720         """Is either object or any elements inside is BER encoded?
1721         """
1722         return self.expl_lenindef or self.lenindef or self.ber_encoded
1723
1724     @property
1725     def decoded(self):
1726         """Is object decoded?
1727         """
1728         return (self.llen + self.vlen) > 0
1729
1730     def __getstate__(self):  # pragma: no cover
1731         """Used for making safe to be mutable pickleable copies
1732         """
1733         raise NotImplementedError()
1734
1735     def __setstate__(self, state):
1736         if state.version != __version__:
1737             raise ValueError("data is pickled by different PyDERASN version")
1738         self.tag = state.tag
1739         self._tag_order = state.tag_order
1740         self._expl = state.expl
1741         self.default = state.default
1742         self.optional = state.optional
1743         self.offset = state.offset
1744         self.llen = state.llen
1745         self.vlen = state.vlen
1746         self.expl_lenindef = state.expl_lenindef
1747         self.lenindef = state.lenindef
1748         self.ber_encoded = state.ber_encoded
1749
1750     @property
1751     def tag_order(self):
1752         """Tag's (class, number) used for DER/CER sorting
1753         """
1754         return self._tag_order
1755
1756     @property
1757     def tag_order_cer(self):
1758         return self.tag_order
1759
1760     @property
1761     def tlen(self):
1762         """See :ref:`decoding`
1763         """
1764         return len(self.tag)
1765
1766     @property
1767     def tlvlen(self):
1768         """See :ref:`decoding`
1769         """
1770         return self.tlen + self.llen + self.vlen
1771
1772     def __str__(self):  # pragma: no cover
1773         return self.__bytes__() if PY2 else self.__unicode__()
1774
1775     def __ne__(self, their):
1776         return not(self == their)
1777
1778     def __gt__(self, their):  # pragma: no cover
1779         return not(self < their)
1780
1781     def __le__(self, their):  # pragma: no cover
1782         return (self == their) or (self < their)
1783
1784     def __ge__(self, their):  # pragma: no cover
1785         return (self == their) or (self > their)
1786
1787     def _encode(self):  # pragma: no cover
1788         raise NotImplementedError()
1789
1790     def _encode_cer(self, writer):
1791         write_full(writer, self._encode())
1792
1793     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):  # pragma: no cover
1794         yield NotImplemented
1795
1796     def _encode1st(self, state):
1797         raise NotImplementedError()
1798
1799     def _encode2nd(self, writer, state_iter):
1800         raise NotImplementedError()
1801
1802     def encode(self):
1803         """DER encode the structure
1804
1805         :returns: DER representation
1806         """
1807         raw = self._encode()
1808         if self._expl is None:
1809             return raw
1810         return b"".join((self._expl, len_encode(len(raw)), raw))
1811
1812     def encode1st(self, state=None):
1813         """Do the 1st pass of 2-pass encoding
1814
1815         :rtype: (int, array("L"))
1816         :returns: full length of encoded data and precalculated various
1817                   objects lengths
1818         """
1819         if state is None:
1820             state = state_2pass_new()
1821         if self._expl is None:
1822             return self._encode1st(state)
1823         state.append(0)
1824         idx = len(state) - 1
1825         vlen, _ = self._encode1st(state)
1826         state[idx] = vlen
1827         fulllen = len(self._expl) + len_size(vlen) + vlen
1828         return fulllen, state
1829
1830     def encode2nd(self, writer, state_iter):
1831         """Do the 2nd pass of 2-pass encoding
1832
1833         :param writer: must comply with ``io.RawIOBase.write`` behaviour
1834         :param state_iter: iterator over the 1st pass state (``iter(state)``)
1835         """
1836         if self._expl is None:
1837             self._encode2nd(writer, state_iter)
1838         else:
1839             write_full(writer, self._expl + len_encode(next(state_iter)))
1840             self._encode2nd(writer, state_iter)
1841
1842     def encode_cer(self, writer):
1843         """CER encode the structure to specified writer
1844
1845         :param writer: must comply with ``io.RawIOBase.write``
1846                        behaviour. It takes slice to be written and
1847                        returns number of bytes processed. If it returns
1848                        None, then exception will be raised
1849         """
1850         if self._expl is not None:
1851             write_full(writer, self._expl + LENINDEF)
1852         if getattr(self, "der_forced", False):
1853             write_full(writer, self._encode())
1854         else:
1855             self._encode_cer(writer)
1856         if self._expl is not None:
1857             write_full(writer, EOC)
1858
1859     def hexencode(self):
1860         """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1861         """
1862         return hexenc(self.encode())
1863
1864     def decode(
1865             self,
1866             data,
1867             offset=0,
1868             leavemm=False,
1869             decode_path=(),
1870             ctx=None,
1871             tag_only=False,
1872             _ctx_immutable=True,
1873     ):
1874         """Decode the data
1875
1876         :param data: either binary or memoryview
1877         :param int offset: initial data's offset
1878         :param bool leavemm: do we need to leave memoryview of remaining
1879                     data as is, or convert it to bytes otherwise
1880         :param decode_path: current decode path (tuples of strings,
1881                             possibly with DecodePathDefBy) with will be
1882                             the root for all underlying objects
1883         :param ctx: optional :ref:`context <ctx>` governing decoding process
1884         :param bool tag_only: decode only the tag, without length and
1885                               contents (used only in Choice and Set
1886                               structures, trying to determine if tag satisfies
1887                               the schema)
1888         :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1889                                     before using it?
1890         :returns: (Obj, remaining data)
1891
1892         .. seealso:: :ref:`decoding`
1893         """
1894         result = next(self.decode_evgen(
1895             data,
1896             offset,
1897             leavemm,
1898             decode_path,
1899             ctx,
1900             tag_only,
1901             _ctx_immutable,
1902             _evgen_mode=False,
1903         ))
1904         if result is None:
1905             return None
1906         _, obj, tail = result
1907         return obj, tail
1908
1909     def decode_evgen(
1910             self,
1911             data,
1912             offset=0,
1913             leavemm=False,
1914             decode_path=(),
1915             ctx=None,
1916             tag_only=False,
1917             _ctx_immutable=True,
1918             _evgen_mode=True,
1919     ):
1920         """Decode with evgen mode on
1921
1922         That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1923         it returns the generator producing ``(decode_path, obj, tail)``
1924         values. See :ref:`evgen mode <evgen_mode>`.
1925         """
1926         if ctx is None:
1927             ctx = {}
1928         elif _ctx_immutable:
1929             ctx = copy(ctx)
1930         tlv = memoryview(data)
1931         if (
1932                 _evgen_mode and
1933                 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1934         ):
1935             _evgen_mode = False
1936         if self._expl is None:
1937             for result in self._decode(
1938                     tlv,
1939                     offset=offset,
1940                     decode_path=decode_path,
1941                     ctx=ctx,
1942                     tag_only=tag_only,
1943                     evgen_mode=_evgen_mode,
1944             ):
1945                 if tag_only:
1946                     yield None
1947                     return
1948                 _decode_path, obj, tail = result
1949                 if not _decode_path is decode_path:
1950                     yield result
1951         else:
1952             try:
1953                 t, tlen, lv = tag_strip(tlv)
1954             except DecodeError as err:
1955                 raise err.__class__(
1956                     msg=err.msg,
1957                     klass=self.__class__,
1958                     decode_path=decode_path,
1959                     offset=offset,
1960                 )
1961             if t != self._expl:
1962                 raise TagMismatch(
1963                     klass=self.__class__,
1964                     decode_path=decode_path,
1965                     offset=offset,
1966                 )
1967             try:
1968                 l, llen, v = len_decode(lv)
1969             except LenIndefForm as err:
1970                 if not ctx.get("bered", False):
1971                     raise err.__class__(
1972                         msg=err.msg,
1973                         klass=self.__class__,
1974                         decode_path=decode_path,
1975                         offset=offset,
1976                     )
1977                 llen, v = 1, lv[1:]
1978                 offset += tlen + llen
1979                 for result in self._decode(
1980                         v,
1981                         offset=offset,
1982                         decode_path=decode_path,
1983                         ctx=ctx,
1984                         tag_only=tag_only,
1985                         evgen_mode=_evgen_mode,
1986                 ):
1987                     if tag_only:  # pragma: no cover
1988                         yield None
1989                         return
1990                     _decode_path, obj, tail = result
1991                     if not _decode_path is decode_path:
1992                         yield result
1993                 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
1994                 if eoc_expected.tobytes() != EOC:
1995                     raise DecodeError(
1996                         "no EOC",
1997                         klass=self.__class__,
1998                         decode_path=decode_path,
1999                         offset=offset,
2000                     )
2001                 obj.vlen += EOC_LEN
2002                 obj.expl_lenindef = True
2003             except DecodeError as err:
2004                 raise err.__class__(
2005                     msg=err.msg,
2006                     klass=self.__class__,
2007                     decode_path=decode_path,
2008                     offset=offset,
2009                 )
2010             else:
2011                 if l > len(v):
2012                     raise NotEnoughData(
2013                         "encoded length is longer than data",
2014                         klass=self.__class__,
2015                         decode_path=decode_path,
2016                         offset=offset,
2017                     )
2018                 for result in self._decode(
2019                         v,
2020                         offset=offset + tlen + llen,
2021                         decode_path=decode_path,
2022                         ctx=ctx,
2023                         tag_only=tag_only,
2024                         evgen_mode=_evgen_mode,
2025                 ):
2026                     if tag_only:  # pragma: no cover
2027                         yield None
2028                         return
2029                     _decode_path, obj, tail = result
2030                     if not _decode_path is decode_path:
2031                         yield result
2032                 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2033                     raise DecodeError(
2034                         "explicit tag out-of-bound, longer than data",
2035                         klass=self.__class__,
2036                         decode_path=decode_path,
2037                         offset=offset,
2038                     )
2039         yield decode_path, obj, (tail if leavemm else tail.tobytes())
2040
2041     def decod(self, data, offset=0, decode_path=(), ctx=None):
2042         """Decode the data, check that tail is empty
2043
2044         :raises ExceedingData: if tail is not empty
2045
2046         This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2047         (decode without tail) that also checks that there is no
2048         trailing data left.
2049         """
2050         obj, tail = self.decode(
2051             data,
2052             offset=offset,
2053             decode_path=decode_path,
2054             ctx=ctx,
2055             leavemm=True,
2056         )
2057         if len(tail) > 0:
2058             raise ExceedingData(len(tail))
2059         return obj
2060
2061     def hexdecode(self, data, *args, **kwargs):
2062         """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2063         """
2064         return self.decode(hexdec(data), *args, **kwargs)
2065
2066     def hexdecod(self, data, *args, **kwargs):
2067         """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2068         """
2069         return self.decod(hexdec(data), *args, **kwargs)
2070
2071     @property
2072     def expled(self):
2073         """See :ref:`decoding`
2074         """
2075         return self._expl is not None
2076
2077     @property
2078     def expl_tag(self):
2079         """See :ref:`decoding`
2080         """
2081         return self._expl
2082
2083     @property
2084     def expl_tlen(self):
2085         """See :ref:`decoding`
2086         """
2087         return len(self._expl)
2088
2089     @property
2090     def expl_llen(self):
2091         """See :ref:`decoding`
2092         """
2093         if self.expl_lenindef:
2094             return 1
2095         return len(len_encode(self.tlvlen))
2096
2097     @property
2098     def expl_offset(self):
2099         """See :ref:`decoding`
2100         """
2101         return self.offset - self.expl_tlen - self.expl_llen
2102
2103     @property
2104     def expl_vlen(self):
2105         """See :ref:`decoding`
2106         """
2107         return self.tlvlen
2108
2109     @property
2110     def expl_tlvlen(self):
2111         """See :ref:`decoding`
2112         """
2113         return self.expl_tlen + self.expl_llen + self.expl_vlen
2114
2115     @property
2116     def fulloffset(self):
2117         """See :ref:`decoding`
2118         """
2119         return self.expl_offset if self.expled else self.offset
2120
2121     @property
2122     def fulllen(self):
2123         """See :ref:`decoding`
2124         """
2125         return self.expl_tlvlen if self.expled else self.tlvlen
2126
2127     def pps_lenindef(self, decode_path):
2128         if self.lenindef and not (
2129                 getattr(self, "defined", None) is not None and
2130                 self.defined[1].lenindef
2131         ):
2132             yield _pp(
2133                 asn1_type_name="EOC",
2134                 obj_name="",
2135                 decode_path=decode_path,
2136                 offset=(
2137                     self.offset + self.tlvlen -
2138                     (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2139                 ),
2140                 tlen=1,
2141                 llen=1,
2142                 vlen=0,
2143                 ber_encoded=True,
2144                 bered=True,
2145             )
2146         if self.expl_lenindef:
2147             yield _pp(
2148                 asn1_type_name="EOC",
2149                 obj_name="EXPLICIT",
2150                 decode_path=decode_path,
2151                 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2152                 tlen=1,
2153                 llen=1,
2154                 vlen=0,
2155                 ber_encoded=True,
2156                 bered=True,
2157             )
2158
2159
2160 def encode_cer(obj):
2161     """Encode to CER in memory buffer
2162
2163     :returns bytes: memory buffer contents
2164     """
2165     buf = BytesIO()
2166     obj.encode_cer(buf.write)
2167     return buf.getvalue()
2168
2169
2170 def encode2pass(obj):
2171     """Encode (2-pass mode) to DER in memory buffer
2172
2173     :returns bytes: memory buffer contents
2174     """
2175     buf = BytesIO()
2176     _, state = obj.encode1st()
2177     obj.encode2nd(buf.write, iter(state))
2178     return buf.getvalue()
2179
2180
2181 class DecodePathDefBy(object):
2182     """DEFINED BY representation inside decode path
2183     """
2184     __slots__ = ("defined_by",)
2185
2186     def __init__(self, defined_by):
2187         self.defined_by = defined_by
2188
2189     def __ne__(self, their):
2190         return not(self == their)
2191
2192     def __eq__(self, their):
2193         if not isinstance(their, self.__class__):
2194             return False
2195         return self.defined_by == their.defined_by
2196
2197     def __str__(self):
2198         return "DEFINED BY " + str(self.defined_by)
2199
2200     def __repr__(self):
2201         return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2202
2203
2204 ########################################################################
2205 # Pretty printing
2206 ########################################################################
2207
2208 PP = namedtuple("PP", (
2209     "obj",
2210     "asn1_type_name",
2211     "obj_name",
2212     "decode_path",
2213     "value",
2214     "blob",
2215     "optional",
2216     "default",
2217     "impl",
2218     "expl",
2219     "offset",
2220     "tlen",
2221     "llen",
2222     "vlen",
2223     "expl_offset",
2224     "expl_tlen",
2225     "expl_llen",
2226     "expl_vlen",
2227     "expl_lenindef",
2228     "lenindef",
2229     "ber_encoded",
2230     "bered",
2231 ), **NAMEDTUPLE_KWARGS)
2232
2233
2234 def _pp(
2235         obj=None,
2236         asn1_type_name="unknown",
2237         obj_name="unknown",
2238         decode_path=(),
2239         value=None,
2240         blob=None,
2241         optional=False,
2242         default=False,
2243         impl=None,
2244         expl=None,
2245         offset=0,
2246         tlen=0,
2247         llen=0,
2248         vlen=0,
2249         expl_offset=None,
2250         expl_tlen=None,
2251         expl_llen=None,
2252         expl_vlen=None,
2253         expl_lenindef=False,
2254         lenindef=False,
2255         ber_encoded=False,
2256         bered=False,
2257 ):
2258     return PP(
2259         obj,
2260         asn1_type_name,
2261         obj_name,
2262         decode_path,
2263         value,
2264         blob,
2265         optional,
2266         default,
2267         impl,
2268         expl,
2269         offset,
2270         tlen,
2271         llen,
2272         vlen,
2273         expl_offset,
2274         expl_tlen,
2275         expl_llen,
2276         expl_vlen,
2277         expl_lenindef,
2278         lenindef,
2279         ber_encoded,
2280         bered,
2281     )
2282
2283
2284 def _colourize(what, colour, with_colours, attrs=("bold",)):
2285     return colored(what, colour, attrs=attrs) if with_colours else what
2286
2287
2288 def colonize_hex(hexed):
2289     """Separate hexadecimal string with colons
2290     """
2291     return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2292
2293
2294 def pp_console_row(
2295         pp,
2296         oid_maps=(),
2297         with_offsets=False,
2298         with_blob=True,
2299         with_colours=False,
2300         with_decode_path=False,
2301         decode_path_len_decrease=0,
2302 ):
2303     cols = []
2304     if with_offsets:
2305         col = "%5d%s%s" % (
2306             pp.offset,
2307             (
2308                 "  " if pp.expl_offset is None else
2309                 ("-%d" % (pp.offset - pp.expl_offset))
2310             ),
2311             LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2312         )
2313         col = _colourize(col, "red", with_colours, ())
2314         col += _colourize("B", "red", with_colours) if pp.bered else " "
2315         cols.append(col)
2316         col = "[%d,%d,%4d]%s" % (
2317             pp.tlen,
2318             pp.llen,
2319             pp.vlen,
2320             LENINDEF_PP_CHAR if pp.lenindef else " "
2321         )
2322         col = _colourize(col, "green", with_colours, ())
2323         cols.append(col)
2324     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2325     if decode_path_len > 0:
2326         cols.append(" ." * decode_path_len)
2327         ent = pp.decode_path[-1]
2328         if isinstance(ent, DecodePathDefBy):
2329             cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2330             value = str(ent.defined_by)
2331             oid_name = None
2332             if (
2333                     len(oid_maps) > 0 and
2334                     ent.defined_by.asn1_type_name ==
2335                     ObjectIdentifier.asn1_type_name
2336             ):
2337                 for oid_map in oid_maps:
2338                     oid_name = oid_map.get(value)
2339                     if oid_name is not None:
2340                         cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2341                         break
2342             if oid_name is None:
2343                 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2344         else:
2345             cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2346     if pp.expl is not None:
2347         klass, _, num = pp.expl
2348         col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2349         cols.append(_colourize(col, "blue", with_colours))
2350     if pp.impl is not None:
2351         klass, _, num = pp.impl
2352         col = "[%s%d]" % (TagClassReprs[klass], num)
2353         cols.append(_colourize(col, "blue", with_colours))
2354     if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2355         cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2356     if pp.ber_encoded:
2357         cols.append(_colourize("BER", "red", with_colours))
2358     cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2359     if pp.value is not None:
2360         value = pp.value
2361         cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2362         if (
2363                 len(oid_maps) > 0 and
2364                 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
2365         ):
2366             for oid_map in oid_maps:
2367                 oid_name = oid_map.get(value)
2368                 if oid_name is not None:
2369                     cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2370                     break
2371         if pp.asn1_type_name == Integer.asn1_type_name:
2372             hex_repr = hex(int(pp.obj._value))[2:].upper()
2373             if len(hex_repr) % 2 != 0:
2374                 hex_repr = "0" + hex_repr
2375             cols.append(_colourize(
2376                 "(%s)" % colonize_hex(hex_repr),
2377                 "green",
2378                 with_colours,
2379             ))
2380     if with_blob:
2381         if pp.blob.__class__ == binary_type:
2382             cols.append(hexenc(pp.blob))
2383         elif pp.blob.__class__ == tuple:
2384             cols.append(", ".join(pp.blob))
2385     if pp.optional:
2386         cols.append(_colourize("OPTIONAL", "red", with_colours))
2387     if pp.default:
2388         cols.append(_colourize("DEFAULT", "red", with_colours))
2389     if with_decode_path:
2390         cols.append(_colourize(
2391             "[%s]" % ":".join(str(p) for p in pp.decode_path),
2392             "grey",
2393             with_colours,
2394         ))
2395     return " ".join(cols)
2396
2397
2398 def pp_console_blob(pp, decode_path_len_decrease=0):
2399     cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2400     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2401     if decode_path_len > 0:
2402         cols.append(" ." * (decode_path_len + 1))
2403     if pp.blob.__class__ == binary_type:
2404         blob = hexenc(pp.blob).upper()
2405         for i in six_xrange(0, len(blob), 32):
2406             chunk = blob[i:i + 32]
2407             yield " ".join(cols + [colonize_hex(chunk)])
2408     elif pp.blob.__class__ == tuple:
2409         yield " ".join(cols + [", ".join(pp.blob)])
2410
2411
2412 def pprint(
2413         obj,
2414         oid_maps=(),
2415         big_blobs=False,
2416         with_colours=False,
2417         with_decode_path=False,
2418         decode_path_only=(),
2419         decode_path=(),
2420 ):
2421     """Pretty print object
2422
2423     :param Obj obj: object you want to pretty print
2424     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionary.
2425                      Its human readable form is printed when OID is met
2426     :param big_blobs: if large binary objects are met (like OctetString
2427                       values), do we need to print them too, on separate
2428                       lines
2429     :param with_colours: colourize output, if ``termcolor`` library
2430                          is available
2431     :param with_decode_path: print decode path
2432     :param decode_path_only: print only that specified decode path
2433     """
2434     def _pprint_pps(pps):
2435         for pp in pps:
2436             if hasattr(pp, "_fields"):
2437                 if (
2438                         decode_path_only != () and
2439                         tuple(
2440                             str(p) for p in pp.decode_path[:len(decode_path_only)]
2441                         ) != decode_path_only
2442                 ):
2443                     continue
2444                 if big_blobs:
2445                     yield pp_console_row(
2446                         pp,
2447                         oid_maps=oid_maps,
2448                         with_offsets=True,
2449                         with_blob=False,
2450                         with_colours=with_colours,
2451                         with_decode_path=with_decode_path,
2452                         decode_path_len_decrease=len(decode_path_only),
2453                     )
2454                     for row in pp_console_blob(
2455                             pp,
2456                             decode_path_len_decrease=len(decode_path_only),
2457                     ):
2458                         yield row
2459                 else:
2460                     yield pp_console_row(
2461                         pp,
2462                         oid_maps=oid_maps,
2463                         with_offsets=True,
2464                         with_blob=True,
2465                         with_colours=with_colours,
2466                         with_decode_path=with_decode_path,
2467                         decode_path_len_decrease=len(decode_path_only),
2468                     )
2469             else:
2470                 for row in _pprint_pps(pp):
2471                     yield row
2472     return "\n".join(_pprint_pps(obj.pps(decode_path)))
2473
2474
2475 ########################################################################
2476 # ASN.1 primitive types
2477 ########################################################################
2478
2479 BooleanState = namedtuple(
2480     "BooleanState",
2481     BasicState._fields + ("value",),
2482     **NAMEDTUPLE_KWARGS
2483 )
2484
2485
2486 class Boolean(Obj):
2487     """``BOOLEAN`` boolean type
2488
2489     >>> b = Boolean(True)
2490     BOOLEAN True
2491     >>> b == Boolean(True)
2492     True
2493     >>> bool(b)
2494     True
2495     """
2496     __slots__ = ()
2497     tag_default = tag_encode(1)
2498     asn1_type_name = "BOOLEAN"
2499
2500     def __init__(
2501             self,
2502             value=None,
2503             impl=None,
2504             expl=None,
2505             default=None,
2506             optional=False,
2507             _decoded=(0, 0, 0),
2508     ):
2509         """
2510         :param value: set the value. Either boolean type, or
2511                       :py:class:`pyderasn.Boolean` object
2512         :param bytes impl: override default tag with ``IMPLICIT`` one
2513         :param bytes expl: override default tag with ``EXPLICIT`` one
2514         :param default: set default value. Type same as in ``value``
2515         :param bool optional: is object ``OPTIONAL`` in sequence
2516         """
2517         super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2518         self._value = None if value is None else self._value_sanitize(value)
2519         if default is not None:
2520             default = self._value_sanitize(default)
2521             self.default = self.__class__(
2522                 value=default,
2523                 impl=self.tag,
2524                 expl=self._expl,
2525             )
2526             if value is None:
2527                 self._value = default
2528
2529     def _value_sanitize(self, value):
2530         if value.__class__ == bool:
2531             return value
2532         if issubclass(value.__class__, Boolean):
2533             return value._value
2534         raise InvalidValueType((self.__class__, bool))
2535
2536     @property
2537     def ready(self):
2538         return self._value is not None
2539
2540     def __getstate__(self):
2541         return BooleanState(
2542             __version__,
2543             self.tag,
2544             self._tag_order,
2545             self._expl,
2546             self.default,
2547             self.optional,
2548             self.offset,
2549             self.llen,
2550             self.vlen,
2551             self.expl_lenindef,
2552             self.lenindef,
2553             self.ber_encoded,
2554             self._value,
2555         )
2556
2557     def __setstate__(self, state):
2558         super(Boolean, self).__setstate__(state)
2559         self._value = state.value
2560
2561     def __nonzero__(self):
2562         self._assert_ready()
2563         return self._value
2564
2565     def __bool__(self):
2566         self._assert_ready()
2567         return self._value
2568
2569     def __eq__(self, their):
2570         if their.__class__ == bool:
2571             return self._value == their
2572         if not issubclass(their.__class__, Boolean):
2573             return False
2574         return (
2575             self._value == their._value and
2576             self.tag == their.tag and
2577             self._expl == their._expl
2578         )
2579
2580     def __call__(
2581             self,
2582             value=None,
2583             impl=None,
2584             expl=None,
2585             default=None,
2586             optional=None,
2587     ):
2588         return self.__class__(
2589             value=value,
2590             impl=self.tag if impl is None else impl,
2591             expl=self._expl if expl is None else expl,
2592             default=self.default if default is None else default,
2593             optional=self.optional if optional is None else optional,
2594         )
2595
2596     def _encode(self):
2597         self._assert_ready()
2598         return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2599
2600     def _encode1st(self, state):
2601         return len(self.tag) + 2, state
2602
2603     def _encode2nd(self, writer, state_iter):
2604         self._assert_ready()
2605         write_full(writer, self._encode())
2606
2607     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2608         try:
2609             t, _, lv = tag_strip(tlv)
2610         except DecodeError as err:
2611             raise err.__class__(
2612                 msg=err.msg,
2613                 klass=self.__class__,
2614                 decode_path=decode_path,
2615                 offset=offset,
2616             )
2617         if t != self.tag:
2618             raise TagMismatch(
2619                 klass=self.__class__,
2620                 decode_path=decode_path,
2621                 offset=offset,
2622             )
2623         if tag_only:
2624             yield None
2625             return
2626         try:
2627             l, _, v = len_decode(lv)
2628         except DecodeError as err:
2629             raise err.__class__(
2630                 msg=err.msg,
2631                 klass=self.__class__,
2632                 decode_path=decode_path,
2633                 offset=offset,
2634             )
2635         if l != 1:
2636             raise InvalidLength(
2637                 "Boolean's length must be equal to 1",
2638                 klass=self.__class__,
2639                 decode_path=decode_path,
2640                 offset=offset,
2641             )
2642         if l > len(v):
2643             raise NotEnoughData(
2644                 "encoded length is longer than data",
2645                 klass=self.__class__,
2646                 decode_path=decode_path,
2647                 offset=offset,
2648             )
2649         first_octet = byte2int(v)
2650         ber_encoded = False
2651         if first_octet == 0:
2652             value = False
2653         elif first_octet == 0xFF:
2654             value = True
2655         elif ctx.get("bered", False):
2656             value = True
2657             ber_encoded = True
2658         else:
2659             raise DecodeError(
2660                 "unacceptable Boolean value",
2661                 klass=self.__class__,
2662                 decode_path=decode_path,
2663                 offset=offset,
2664             )
2665         obj = self.__class__(
2666             value=value,
2667             impl=self.tag,
2668             expl=self._expl,
2669             default=self.default,
2670             optional=self.optional,
2671             _decoded=(offset, 1, 1),
2672         )
2673         obj.ber_encoded = ber_encoded
2674         yield decode_path, obj, v[1:]
2675
2676     def __repr__(self):
2677         return pp_console_row(next(self.pps()))
2678
2679     def pps(self, decode_path=()):
2680         yield _pp(
2681             obj=self,
2682             asn1_type_name=self.asn1_type_name,
2683             obj_name=self.__class__.__name__,
2684             decode_path=decode_path,
2685             value=str(self._value) if self.ready else None,
2686             optional=self.optional,
2687             default=self == self.default,
2688             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2689             expl=None if self._expl is None else tag_decode(self._expl),
2690             offset=self.offset,
2691             tlen=self.tlen,
2692             llen=self.llen,
2693             vlen=self.vlen,
2694             expl_offset=self.expl_offset if self.expled else None,
2695             expl_tlen=self.expl_tlen if self.expled else None,
2696             expl_llen=self.expl_llen if self.expled else None,
2697             expl_vlen=self.expl_vlen if self.expled else None,
2698             expl_lenindef=self.expl_lenindef,
2699             ber_encoded=self.ber_encoded,
2700             bered=self.bered,
2701         )
2702         for pp in self.pps_lenindef(decode_path):
2703             yield pp
2704
2705
2706 IntegerState = namedtuple(
2707     "IntegerState",
2708     BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2709     **NAMEDTUPLE_KWARGS
2710 )
2711
2712
2713 class Integer(Obj):
2714     """``INTEGER`` integer type
2715
2716     >>> b = Integer(-123)
2717     INTEGER -123
2718     >>> b == Integer(-123)
2719     True
2720     >>> int(b)
2721     -123
2722
2723     >>> Integer(2, bounds=(1, 3))
2724     INTEGER 2
2725     >>> Integer(5, bounds=(1, 3))
2726     Traceback (most recent call last):
2727     pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2728
2729     ::
2730
2731         class Version(Integer):
2732             schema = (
2733                 ("v1", 0),
2734                 ("v2", 1),
2735                 ("v3", 2),
2736             )
2737
2738     >>> v = Version("v1")
2739     Version INTEGER v1
2740     >>> int(v)
2741     0
2742     >>> v.named
2743     'v1'
2744     >>> v.specs
2745     {'v3': 2, 'v1': 0, 'v2': 1}
2746     """
2747     __slots__ = ("specs", "_bound_min", "_bound_max")
2748     tag_default = tag_encode(2)
2749     asn1_type_name = "INTEGER"
2750
2751     def __init__(
2752             self,
2753             value=None,
2754             bounds=None,
2755             impl=None,
2756             expl=None,
2757             default=None,
2758             optional=False,
2759             _specs=None,
2760             _decoded=(0, 0, 0),
2761     ):
2762         """
2763         :param value: set the value. Either integer type, named value
2764                       (if ``schema`` is specified in the class), or
2765                       :py:class:`pyderasn.Integer` object
2766         :param bounds: set ``(MIN, MAX)`` value constraint.
2767                        (-inf, +inf) by default
2768         :param bytes impl: override default tag with ``IMPLICIT`` one
2769         :param bytes expl: override default tag with ``EXPLICIT`` one
2770         :param default: set default value. Type same as in ``value``
2771         :param bool optional: is object ``OPTIONAL`` in sequence
2772         """
2773         super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2774         self._value = value
2775         specs = getattr(self, "schema", {}) if _specs is None else _specs
2776         self.specs = specs if specs.__class__ == dict else dict(specs)
2777         self._bound_min, self._bound_max = getattr(
2778             self,
2779             "bounds",
2780             (float("-inf"), float("+inf")),
2781         ) if bounds is None else bounds
2782         if value is not None:
2783             self._value = self._value_sanitize(value)
2784         if default is not None:
2785             default = self._value_sanitize(default)
2786             self.default = self.__class__(
2787                 value=default,
2788                 impl=self.tag,
2789                 expl=self._expl,
2790                 _specs=self.specs,
2791             )
2792             if self._value is None:
2793                 self._value = default
2794
2795     def _value_sanitize(self, value):
2796         if isinstance(value, integer_types):
2797             pass
2798         elif issubclass(value.__class__, Integer):
2799             value = value._value
2800         elif value.__class__ == str:
2801             value = self.specs.get(value)
2802             if value is None:
2803                 raise ObjUnknown("integer value: %s" % value)
2804         else:
2805             raise InvalidValueType((self.__class__, int, str))
2806         if not self._bound_min <= value <= self._bound_max:
2807             raise BoundsError(self._bound_min, value, self._bound_max)
2808         return value
2809
2810     @property
2811     def ready(self):
2812         return self._value is not None
2813
2814     def __getstate__(self):
2815         return IntegerState(
2816             __version__,
2817             self.tag,
2818             self._tag_order,
2819             self._expl,
2820             self.default,
2821             self.optional,
2822             self.offset,
2823             self.llen,
2824             self.vlen,
2825             self.expl_lenindef,
2826             self.lenindef,
2827             self.ber_encoded,
2828             self.specs,
2829             self._value,
2830             self._bound_min,
2831             self._bound_max,
2832         )
2833
2834     def __setstate__(self, state):
2835         super(Integer, self).__setstate__(state)
2836         self.specs = state.specs
2837         self._value = state.value
2838         self._bound_min = state.bound_min
2839         self._bound_max = state.bound_max
2840
2841     def __int__(self):
2842         self._assert_ready()
2843         return int(self._value)
2844
2845     def __hash__(self):
2846         self._assert_ready()
2847         return hash(b"".join((
2848             self.tag,
2849             bytes(self._expl or b""),
2850             str(self._value).encode("ascii"),
2851         )))
2852
2853     def __eq__(self, their):
2854         if isinstance(their, integer_types):
2855             return self._value == their
2856         if not issubclass(their.__class__, Integer):
2857             return False
2858         return (
2859             self._value == their._value and
2860             self.tag == their.tag and
2861             self._expl == their._expl
2862         )
2863
2864     def __lt__(self, their):
2865         return self._value < their._value
2866
2867     @property
2868     def named(self):
2869         """Return named representation (if exists) of the value
2870         """
2871         for name, value in iteritems(self.specs):
2872             if value == self._value:
2873                 return name
2874         return None
2875
2876     def __call__(
2877             self,
2878             value=None,
2879             bounds=None,
2880             impl=None,
2881             expl=None,
2882             default=None,
2883             optional=None,
2884     ):
2885         return self.__class__(
2886             value=value,
2887             bounds=(
2888                 (self._bound_min, self._bound_max)
2889                 if bounds is None else bounds
2890             ),
2891             impl=self.tag if impl is None else impl,
2892             expl=self._expl if expl is None else expl,
2893             default=self.default if default is None else default,
2894             optional=self.optional if optional is None else optional,
2895             _specs=self.specs,
2896         )
2897
2898     def _encode_payload(self):
2899         self._assert_ready()
2900         value = self._value
2901         if PY2:
2902             if value == 0:
2903                 octets = bytearray([0])
2904             elif value < 0:
2905                 value = -value
2906                 value -= 1
2907                 octets = bytearray()
2908                 while value > 0:
2909                     octets.append((value & 0xFF) ^ 0xFF)
2910                     value >>= 8
2911                 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2912                     octets.append(0xFF)
2913             else:
2914                 octets = bytearray()
2915                 while value > 0:
2916                     octets.append(value & 0xFF)
2917                     value >>= 8
2918                 if octets[-1] & 0x80 > 0:
2919                     octets.append(0x00)
2920             octets.reverse()
2921             octets = bytes(octets)
2922         else:
2923             bytes_len = ceil(value.bit_length() / 8) or 1
2924             while True:
2925                 try:
2926                     octets = value.to_bytes(
2927                         bytes_len,
2928                         byteorder="big",
2929                         signed=True,
2930                     )
2931                 except OverflowError:
2932                     bytes_len += 1
2933                 else:
2934                     break
2935         return octets
2936         return b"".join((self.tag, len_encode(len(octets)), octets))
2937
2938     def _encode(self):
2939         octets = self._encode_payload()
2940         return b"".join((self.tag, len_encode(len(octets)), octets))
2941
2942     def _encode1st(self, state):
2943         l = len(self._encode_payload())
2944         return len(self.tag) + len_size(l) + l, state
2945
2946     def _encode2nd(self, writer, state_iter):
2947         write_full(writer, self._encode())
2948
2949     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2950         try:
2951             t, _, lv = tag_strip(tlv)
2952         except DecodeError as err:
2953             raise err.__class__(
2954                 msg=err.msg,
2955                 klass=self.__class__,
2956                 decode_path=decode_path,
2957                 offset=offset,
2958             )
2959         if t != self.tag:
2960             raise TagMismatch(
2961                 klass=self.__class__,
2962                 decode_path=decode_path,
2963                 offset=offset,
2964             )
2965         if tag_only:
2966             yield None
2967             return
2968         try:
2969             l, llen, v = len_decode(lv)
2970         except DecodeError as err:
2971             raise err.__class__(
2972                 msg=err.msg,
2973                 klass=self.__class__,
2974                 decode_path=decode_path,
2975                 offset=offset,
2976             )
2977         if l > len(v):
2978             raise NotEnoughData(
2979                 "encoded length is longer than data",
2980                 klass=self.__class__,
2981                 decode_path=decode_path,
2982                 offset=offset,
2983             )
2984         if l == 0:
2985             raise NotEnoughData(
2986                 "zero length",
2987                 klass=self.__class__,
2988                 decode_path=decode_path,
2989                 offset=offset,
2990             )
2991         v, tail = v[:l], v[l:]
2992         first_octet = byte2int(v)
2993         if l > 1:
2994             second_octet = byte2int(v[1:])
2995             if (
2996                     ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
2997                     ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
2998             ):
2999                 raise DecodeError(
3000                     "non normalized integer",
3001                     klass=self.__class__,
3002                     decode_path=decode_path,
3003                     offset=offset,
3004                 )
3005         if PY2:
3006             value = 0
3007             if first_octet & 0x80 > 0:
3008                 octets = bytearray()
3009                 for octet in bytearray(v):
3010                     octets.append(octet ^ 0xFF)
3011                 for octet in octets:
3012                     value = (value << 8) | octet
3013                 value += 1
3014                 value = -value
3015             else:
3016                 for octet in bytearray(v):
3017                     value = (value << 8) | octet
3018         else:
3019             value = int.from_bytes(v, byteorder="big", signed=True)
3020         try:
3021             obj = self.__class__(
3022                 value=value,
3023                 bounds=(self._bound_min, self._bound_max),
3024                 impl=self.tag,
3025                 expl=self._expl,
3026                 default=self.default,
3027                 optional=self.optional,
3028                 _specs=self.specs,
3029                 _decoded=(offset, llen, l),
3030             )
3031         except BoundsError as err:
3032             raise DecodeError(
3033                 msg=str(err),
3034                 klass=self.__class__,
3035                 decode_path=decode_path,
3036                 offset=offset,
3037             )
3038         yield decode_path, obj, tail
3039
3040     def __repr__(self):
3041         return pp_console_row(next(self.pps()))
3042
3043     def pps(self, decode_path=()):
3044         yield _pp(
3045             obj=self,
3046             asn1_type_name=self.asn1_type_name,
3047             obj_name=self.__class__.__name__,
3048             decode_path=decode_path,
3049             value=(self.named or str(self._value)) if self.ready else None,
3050             optional=self.optional,
3051             default=self == self.default,
3052             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3053             expl=None if self._expl is None else tag_decode(self._expl),
3054             offset=self.offset,
3055             tlen=self.tlen,
3056             llen=self.llen,
3057             vlen=self.vlen,
3058             expl_offset=self.expl_offset if self.expled else None,
3059             expl_tlen=self.expl_tlen if self.expled else None,
3060             expl_llen=self.expl_llen if self.expled else None,
3061             expl_vlen=self.expl_vlen if self.expled else None,
3062             expl_lenindef=self.expl_lenindef,
3063             bered=self.bered,
3064         )
3065         for pp in self.pps_lenindef(decode_path):
3066             yield pp
3067
3068
3069 BitStringState = namedtuple(
3070     "BitStringState",
3071     BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3072     **NAMEDTUPLE_KWARGS
3073 )
3074
3075
3076 class BitString(Obj):
3077     """``BIT STRING`` bit string type
3078
3079     >>> BitString(b"hello world")
3080     BIT STRING 88 bits 68656c6c6f20776f726c64
3081     >>> bytes(b)
3082     b'hello world'
3083     >>> b == b"hello world"
3084     True
3085     >>> b.bit_len
3086     88
3087
3088     >>> BitString("'0A3B5F291CD'H")
3089     BIT STRING 44 bits 0a3b5f291cd0
3090     >>> b = BitString("'010110000000'B")
3091     BIT STRING 12 bits 5800
3092     >>> b.bit_len
3093     12
3094     >>> b[0], b[1], b[2], b[3]
3095     (False, True, False, True)
3096     >>> b[1000]
3097     False
3098     >>> [v for v in b]
3099     [False, True, False, True, True, False, False, False, False, False, False, False]
3100
3101     ::
3102
3103         class KeyUsage(BitString):
3104             schema = (
3105                 ("digitalSignature", 0),
3106                 ("nonRepudiation", 1),
3107                 ("keyEncipherment", 2),
3108             )
3109
3110     >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3111     KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3112     >>> b.named
3113     ['nonRepudiation', 'keyEncipherment']
3114     >>> b.specs
3115     {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3116
3117     .. note::
3118
3119        Pay attention that BIT STRING can be encoded both in primitive
3120        and constructed forms. Decoder always checks constructed form tag
3121        additionally to specified primitive one. If BER decoding is
3122        :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3123        of DER restrictions.
3124     """
3125     __slots__ = ("tag_constructed", "specs", "defined")
3126     tag_default = tag_encode(3)
3127     asn1_type_name = "BIT STRING"
3128
3129     def __init__(
3130             self,
3131             value=None,
3132             impl=None,
3133             expl=None,
3134             default=None,
3135             optional=False,
3136             _specs=None,
3137             _decoded=(0, 0, 0),
3138     ):
3139         """
3140         :param value: set the value. Either binary type, tuple of named
3141                       values (if ``schema`` is specified in the class),
3142                       string in ``'XXX...'B`` form, or
3143                       :py:class:`pyderasn.BitString` object
3144         :param bytes impl: override default tag with ``IMPLICIT`` one
3145         :param bytes expl: override default tag with ``EXPLICIT`` one
3146         :param default: set default value. Type same as in ``value``
3147         :param bool optional: is object ``OPTIONAL`` in sequence
3148         """
3149         super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3150         specs = getattr(self, "schema", {}) if _specs is None else _specs
3151         self.specs = specs if specs.__class__ == dict else dict(specs)
3152         self._value = None if value is None else self._value_sanitize(value)
3153         if default is not None:
3154             default = self._value_sanitize(default)
3155             self.default = self.__class__(
3156                 value=default,
3157                 impl=self.tag,
3158                 expl=self._expl,
3159             )
3160             if value is None:
3161                 self._value = default
3162         self.defined = None
3163         tag_klass, _, tag_num = tag_decode(self.tag)
3164         self.tag_constructed = tag_encode(
3165             klass=tag_klass,
3166             form=TagFormConstructed,
3167             num=tag_num,
3168         )
3169
3170     def _bits2octets(self, bits):
3171         if len(self.specs) > 0:
3172             bits = bits.rstrip("0")
3173         bit_len = len(bits)
3174         bits += "0" * ((8 - (bit_len % 8)) % 8)
3175         octets = bytearray(len(bits) // 8)
3176         for i in six_xrange(len(octets)):
3177             octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3178         return bit_len, bytes(octets)
3179
3180     def _value_sanitize(self, value):
3181         if isinstance(value, (string_types, binary_type)):
3182             if (
3183                     isinstance(value, string_types) and
3184                     value.startswith("'")
3185             ):
3186                 if value.endswith("'B"):
3187                     value = value[1:-2]
3188                     if not frozenset(value) <= SET01:
3189                         raise ValueError("B's coding contains unacceptable chars")
3190                     return self._bits2octets(value)
3191                 if value.endswith("'H"):
3192                     value = value[1:-2]
3193                     return (
3194                         len(value) * 4,
3195                         hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3196                     )
3197             if value.__class__ == binary_type:
3198                 return (len(value) * 8, value)
3199             raise InvalidValueType((self.__class__, string_types, binary_type))
3200         if value.__class__ == tuple:
3201             if (
3202                     len(value) == 2 and
3203                     isinstance(value[0], integer_types) and
3204                     value[1].__class__ == binary_type
3205             ):
3206                 return value
3207             bits = []
3208             for name in value:
3209                 bit = self.specs.get(name)
3210                 if bit is None:
3211                     raise ObjUnknown("BitString value: %s" % name)
3212                 bits.append(bit)
3213             if len(bits) == 0:
3214                 return self._bits2octets("")
3215             bits = frozenset(bits)
3216             return self._bits2octets("".join(
3217                 ("1" if bit in bits else "0")
3218                 for bit in six_xrange(max(bits) + 1)
3219             ))
3220         if issubclass(value.__class__, BitString):
3221             return value._value
3222         raise InvalidValueType((self.__class__, binary_type, string_types))
3223
3224     @property
3225     def ready(self):
3226         return self._value is not None
3227
3228     def __getstate__(self):
3229         return BitStringState(
3230             __version__,
3231             self.tag,
3232             self._tag_order,
3233             self._expl,
3234             self.default,
3235             self.optional,
3236             self.offset,
3237             self.llen,
3238             self.vlen,
3239             self.expl_lenindef,
3240             self.lenindef,
3241             self.ber_encoded,
3242             self.specs,
3243             self._value,
3244             self.tag_constructed,
3245             self.defined,
3246         )
3247
3248     def __setstate__(self, state):
3249         super(BitString, self).__setstate__(state)
3250         self.specs = state.specs
3251         self._value = state.value
3252         self.tag_constructed = state.tag_constructed
3253         self.defined = state.defined
3254
3255     def __iter__(self):
3256         self._assert_ready()
3257         for i in six_xrange(self._value[0]):
3258             yield self[i]
3259
3260     @property
3261     def bit_len(self):
3262         """Returns number of bits in the string
3263         """
3264         self._assert_ready()
3265         return self._value[0]
3266
3267     def __bytes__(self):
3268         self._assert_ready()
3269         return self._value[1]
3270
3271     def __eq__(self, their):
3272         if their.__class__ == bytes:
3273             return self._value[1] == their
3274         if not issubclass(their.__class__, BitString):
3275             return False
3276         return (
3277             self._value == their._value and
3278             self.tag == their.tag and
3279             self._expl == their._expl
3280         )
3281
3282     @property
3283     def named(self):
3284         """Named representation (if exists) of the bits
3285
3286         :returns: [str(name), ...]
3287         """
3288         return [name for name, bit in iteritems(self.specs) if self[bit]]
3289
3290     def __call__(
3291             self,
3292             value=None,
3293             impl=None,
3294             expl=None,
3295             default=None,
3296             optional=None,
3297     ):
3298         return self.__class__(
3299             value=value,
3300             impl=self.tag if impl is None else impl,
3301             expl=self._expl if expl is None else expl,
3302             default=self.default if default is None else default,
3303             optional=self.optional if optional is None else optional,
3304             _specs=self.specs,
3305         )
3306
3307     def __getitem__(self, key):
3308         if key.__class__ == int:
3309             bit_len, octets = self._value
3310             if key >= bit_len:
3311                 return False
3312             return (
3313                 byte2int(memoryview(octets)[key // 8:]) >>
3314                 (7 - (key % 8))
3315             ) & 1 == 1
3316         if isinstance(key, string_types):
3317             value = self.specs.get(key)
3318             if value is None:
3319                 raise ObjUnknown("BitString value: %s" % key)
3320             return self[value]
3321         raise InvalidValueType((int, str))
3322
3323     def _encode(self):
3324         self._assert_ready()
3325         bit_len, octets = self._value
3326         return b"".join((
3327             self.tag,
3328             len_encode(len(octets) + 1),
3329             int2byte((8 - bit_len % 8) % 8),
3330             octets,
3331         ))
3332
3333     def _encode1st(self, state):
3334         self._assert_ready()
3335         _, octets = self._value
3336         l = len(octets) + 1
3337         return len(self.tag) + len_size(l) + l, state
3338
3339     def _encode2nd(self, writer, state_iter):
3340         bit_len, octets = self._value
3341         write_full(writer, b"".join((
3342             self.tag,
3343             len_encode(len(octets) + 1),
3344             int2byte((8 - bit_len % 8) % 8),
3345         )))
3346         write_full(writer, octets)
3347
3348     def _encode_cer(self, writer):
3349         bit_len, octets = self._value
3350         if len(octets) + 1 <= 1000:
3351             write_full(writer, self._encode())
3352             return
3353         write_full(writer, self.tag_constructed)
3354         write_full(writer, LENINDEF)
3355         for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3356             write_full(writer, b"".join((
3357                 BitString.tag_default,
3358                 LEN1K,
3359                 int2byte(0),
3360                 octets[offset:offset + 999],
3361             )))
3362         tail = octets[offset+999:]
3363         if len(tail) > 0:
3364             tail = int2byte((8 - bit_len % 8) % 8) + tail
3365             write_full(writer, b"".join((
3366                 BitString.tag_default,
3367                 len_encode(len(tail)),
3368                 tail,
3369             )))
3370         write_full(writer, EOC)
3371
3372     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3373         try:
3374             t, tlen, lv = tag_strip(tlv)
3375         except DecodeError as err:
3376             raise err.__class__(
3377                 msg=err.msg,
3378                 klass=self.__class__,
3379                 decode_path=decode_path,
3380                 offset=offset,
3381             )
3382         if t == self.tag:
3383             if tag_only:  # pragma: no cover
3384                 yield None
3385                 return
3386             try:
3387                 l, llen, v = len_decode(lv)
3388             except DecodeError as err:
3389                 raise err.__class__(
3390                     msg=err.msg,
3391                     klass=self.__class__,
3392                     decode_path=decode_path,
3393                     offset=offset,
3394                 )
3395             if l > len(v):
3396                 raise NotEnoughData(
3397                     "encoded length is longer than data",
3398                     klass=self.__class__,
3399                     decode_path=decode_path,
3400                     offset=offset,
3401                 )
3402             if l == 0:
3403                 raise NotEnoughData(
3404                     "zero length",
3405                     klass=self.__class__,
3406                     decode_path=decode_path,
3407                     offset=offset,
3408                 )
3409             pad_size = byte2int(v)
3410             if l == 1 and pad_size != 0:
3411                 raise DecodeError(
3412                     "invalid empty value",
3413                     klass=self.__class__,
3414                     decode_path=decode_path,
3415                     offset=offset,
3416                 )
3417             if pad_size > 7:
3418                 raise DecodeError(
3419                     "too big pad",
3420                     klass=self.__class__,
3421                     decode_path=decode_path,
3422                     offset=offset,
3423                 )
3424             if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3425                 raise DecodeError(
3426                     "invalid pad",
3427                     klass=self.__class__,
3428                     decode_path=decode_path,
3429                     offset=offset,
3430                 )
3431             v, tail = v[:l], v[l:]
3432             bit_len = (len(v) - 1) * 8 - pad_size
3433             obj = self.__class__(
3434                 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3435                 impl=self.tag,
3436                 expl=self._expl,
3437                 default=self.default,
3438                 optional=self.optional,
3439                 _specs=self.specs,
3440                 _decoded=(offset, llen, l),
3441             )
3442             if evgen_mode:
3443                 obj._value = (bit_len, None)
3444             yield decode_path, obj, tail
3445             return
3446         if t != self.tag_constructed:
3447             raise TagMismatch(
3448                 klass=self.__class__,
3449                 decode_path=decode_path,
3450                 offset=offset,
3451             )
3452         if not ctx.get("bered", False):
3453             raise DecodeError(
3454                 "unallowed BER constructed encoding",
3455                 klass=self.__class__,
3456                 decode_path=decode_path,
3457                 offset=offset,
3458             )
3459         if tag_only:  # pragma: no cover
3460             yield None
3461             return
3462         lenindef = False
3463         try:
3464             l, llen, v = len_decode(lv)
3465         except LenIndefForm:
3466             llen, l, v = 1, 0, lv[1:]
3467             lenindef = True
3468         except DecodeError as err:
3469             raise err.__class__(
3470                 msg=err.msg,
3471                 klass=self.__class__,
3472                 decode_path=decode_path,
3473                 offset=offset,
3474             )
3475         if l > len(v):
3476             raise NotEnoughData(
3477                 "encoded length is longer than data",
3478                 klass=self.__class__,
3479                 decode_path=decode_path,
3480                 offset=offset,
3481             )
3482         if not lenindef and l == 0:
3483             raise NotEnoughData(
3484                 "zero length",
3485                 klass=self.__class__,
3486                 decode_path=decode_path,
3487                 offset=offset,
3488             )
3489         chunks = []
3490         sub_offset = offset + tlen + llen
3491         vlen = 0
3492         while True:
3493             if lenindef:
3494                 if v[:EOC_LEN].tobytes() == EOC:
3495                     break
3496             else:
3497                 if vlen == l:
3498                     break
3499                 if vlen > l:
3500                     raise DecodeError(
3501                         "chunk out of bounds",
3502                         klass=self.__class__,
3503                         decode_path=decode_path + (str(len(chunks) - 1),),
3504                         offset=chunks[-1].offset,
3505                     )
3506             sub_decode_path = decode_path + (str(len(chunks)),)
3507             try:
3508                 if evgen_mode:
3509                     for _decode_path, chunk, v_tail in BitString().decode_evgen(
3510                             v,
3511                             offset=sub_offset,
3512                             decode_path=sub_decode_path,
3513                             leavemm=True,
3514                             ctx=ctx,
3515                             _ctx_immutable=False,
3516                     ):
3517                         yield _decode_path, chunk, v_tail
3518                 else:
3519                     _, chunk, v_tail = next(BitString().decode_evgen(
3520                         v,
3521                         offset=sub_offset,
3522                         decode_path=sub_decode_path,
3523                         leavemm=True,
3524                         ctx=ctx,
3525                         _ctx_immutable=False,
3526                         _evgen_mode=False,
3527                     ))
3528             except TagMismatch:
3529                 raise DecodeError(
3530                     "expected BitString encoded chunk",
3531                     klass=self.__class__,
3532                     decode_path=sub_decode_path,
3533                     offset=sub_offset,
3534                 )
3535             chunks.append(chunk)
3536             sub_offset += chunk.tlvlen
3537             vlen += chunk.tlvlen
3538             v = v_tail
3539         if len(chunks) == 0:
3540             raise DecodeError(
3541                 "no chunks",
3542                 klass=self.__class__,
3543                 decode_path=decode_path,
3544                 offset=offset,
3545             )
3546         values = []
3547         bit_len = 0
3548         for chunk_i, chunk in enumerate(chunks[:-1]):
3549             if chunk.bit_len % 8 != 0:
3550                 raise DecodeError(
3551                     "BitString chunk is not multiple of 8 bits",
3552                     klass=self.__class__,
3553                     decode_path=decode_path + (str(chunk_i),),
3554                     offset=chunk.offset,
3555                 )
3556             if not evgen_mode:
3557                 values.append(bytes(chunk))
3558             bit_len += chunk.bit_len
3559         chunk_last = chunks[-1]
3560         if not evgen_mode:
3561             values.append(bytes(chunk_last))
3562         bit_len += chunk_last.bit_len
3563         obj = self.__class__(
3564             value=None if evgen_mode else (bit_len, b"".join(values)),
3565             impl=self.tag,
3566             expl=self._expl,
3567             default=self.default,
3568             optional=self.optional,
3569             _specs=self.specs,
3570             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3571         )
3572         if evgen_mode:
3573             obj._value = (bit_len, None)
3574         obj.lenindef = lenindef
3575         obj.ber_encoded = True
3576         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3577
3578     def __repr__(self):
3579         return pp_console_row(next(self.pps()))
3580
3581     def pps(self, decode_path=()):
3582         value = None
3583         blob = None
3584         if self.ready:
3585             bit_len, blob = self._value
3586             value = "%d bits" % bit_len
3587             if len(self.specs) > 0 and blob is not None:
3588                 blob = tuple(self.named)
3589         yield _pp(
3590             obj=self,
3591             asn1_type_name=self.asn1_type_name,
3592             obj_name=self.__class__.__name__,
3593             decode_path=decode_path,
3594             value=value,
3595             blob=blob,
3596             optional=self.optional,
3597             default=self == self.default,
3598             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3599             expl=None if self._expl is None else tag_decode(self._expl),
3600             offset=self.offset,
3601             tlen=self.tlen,
3602             llen=self.llen,
3603             vlen=self.vlen,
3604             expl_offset=self.expl_offset if self.expled else None,
3605             expl_tlen=self.expl_tlen if self.expled else None,
3606             expl_llen=self.expl_llen if self.expled else None,
3607             expl_vlen=self.expl_vlen if self.expled else None,
3608             expl_lenindef=self.expl_lenindef,
3609             lenindef=self.lenindef,
3610             ber_encoded=self.ber_encoded,
3611             bered=self.bered,
3612         )
3613         defined_by, defined = self.defined or (None, None)
3614         if defined_by is not None:
3615             yield defined.pps(
3616                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3617             )
3618         for pp in self.pps_lenindef(decode_path):
3619             yield pp
3620
3621
3622 OctetStringState = namedtuple(
3623     "OctetStringState",
3624     BasicState._fields + (
3625         "value",
3626         "bound_min",
3627         "bound_max",
3628         "tag_constructed",
3629         "defined",
3630     ),
3631     **NAMEDTUPLE_KWARGS
3632 )
3633
3634
3635 class OctetString(Obj):
3636     """``OCTET STRING`` binary string type
3637
3638     >>> s = OctetString(b"hello world")
3639     OCTET STRING 11 bytes 68656c6c6f20776f726c64
3640     >>> s == OctetString(b"hello world")
3641     True
3642     >>> bytes(s)
3643     b'hello world'
3644
3645     >>> OctetString(b"hello", bounds=(4, 4))
3646     Traceback (most recent call last):
3647     pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3648     >>> OctetString(b"hell", bounds=(4, 4))
3649     OCTET STRING 4 bytes 68656c6c
3650
3651     Memoryviews can be used as a values. If memoryview is made on
3652     mmap-ed file, then it does not take storage inside OctetString
3653     itself. In CER encoding mode it will be streamed to the specified
3654     writer, copying 1 KB chunks.
3655     """
3656     __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3657     tag_default = tag_encode(4)
3658     asn1_type_name = "OCTET STRING"
3659     evgen_mode_skip_value = True
3660
3661     def __init__(
3662             self,
3663             value=None,
3664             bounds=None,
3665             impl=None,
3666             expl=None,
3667             default=None,
3668             optional=False,
3669             _decoded=(0, 0, 0),
3670             ctx=None,
3671     ):
3672         """
3673         :param value: set the value. Either binary type, or
3674                       :py:class:`pyderasn.OctetString` object
3675         :param bounds: set ``(MIN, MAX)`` value size constraint.
3676                        (-inf, +inf) by default
3677         :param bytes impl: override default tag with ``IMPLICIT`` one
3678         :param bytes expl: override default tag with ``EXPLICIT`` one
3679         :param default: set default value. Type same as in ``value``
3680         :param bool optional: is object ``OPTIONAL`` in sequence
3681         """
3682         super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3683         self._value = value
3684         self._bound_min, self._bound_max = getattr(
3685             self,
3686             "bounds",
3687             (0, float("+inf")),
3688         ) if bounds is None else bounds
3689         if value is not None:
3690             self._value = self._value_sanitize(value)
3691         if default is not None:
3692             default = self._value_sanitize(default)
3693             self.default = self.__class__(
3694                 value=default,
3695                 impl=self.tag,
3696                 expl=self._expl,
3697             )
3698             if self._value is None:
3699                 self._value = default
3700         self.defined = None
3701         tag_klass, _, tag_num = tag_decode(self.tag)
3702         self.tag_constructed = tag_encode(
3703             klass=tag_klass,
3704             form=TagFormConstructed,
3705             num=tag_num,
3706         )
3707
3708     def _value_sanitize(self, value):
3709         if value.__class__ == binary_type or value.__class__ == memoryview:
3710             pass
3711         elif issubclass(value.__class__, OctetString):
3712             value = value._value
3713         else:
3714             raise InvalidValueType((self.__class__, bytes, memoryview))
3715         if not self._bound_min <= len(value) <= self._bound_max:
3716             raise BoundsError(self._bound_min, len(value), self._bound_max)
3717         return value
3718
3719     @property
3720     def ready(self):
3721         return self._value is not None
3722
3723     def __getstate__(self):
3724         return OctetStringState(
3725             __version__,
3726             self.tag,
3727             self._tag_order,
3728             self._expl,
3729             self.default,
3730             self.optional,
3731             self.offset,
3732             self.llen,
3733             self.vlen,
3734             self.expl_lenindef,
3735             self.lenindef,
3736             self.ber_encoded,
3737             self._value,
3738             self._bound_min,
3739             self._bound_max,
3740             self.tag_constructed,
3741             self.defined,
3742         )
3743
3744     def __setstate__(self, state):
3745         super(OctetString, self).__setstate__(state)
3746         self._value = state.value
3747         self._bound_min = state.bound_min
3748         self._bound_max = state.bound_max
3749         self.tag_constructed = state.tag_constructed
3750         self.defined = state.defined
3751
3752     def __bytes__(self):
3753         self._assert_ready()
3754         return bytes(self._value)
3755
3756     def __eq__(self, their):
3757         if their.__class__ == binary_type:
3758             return self._value == their
3759         if not issubclass(their.__class__, OctetString):
3760             return False
3761         return (
3762             self._value == their._value and
3763             self.tag == their.tag and
3764             self._expl == their._expl
3765         )
3766
3767     def __lt__(self, their):
3768         return self._value < their._value
3769
3770     def __call__(
3771             self,
3772             value=None,
3773             bounds=None,
3774             impl=None,
3775             expl=None,
3776             default=None,
3777             optional=None,
3778     ):
3779         return self.__class__(
3780             value=value,
3781             bounds=(
3782                 (self._bound_min, self._bound_max)
3783                 if bounds is None else bounds
3784             ),
3785             impl=self.tag if impl is None else impl,
3786             expl=self._expl if expl is None else expl,
3787             default=self.default if default is None else default,
3788             optional=self.optional if optional is None else optional,
3789         )
3790
3791     def _encode(self):
3792         self._assert_ready()
3793         return b"".join((
3794             self.tag,
3795             len_encode(len(self._value)),
3796             self._value,
3797         ))
3798
3799     def _encode1st(self, state):
3800         self._assert_ready()
3801         l = len(self._value)
3802         return len(self.tag) + len_size(l) + l, state
3803
3804     def _encode2nd(self, writer, state_iter):
3805         value = self._value
3806         write_full(writer, self.tag + len_encode(len(value)))
3807         write_full(writer, value)
3808
3809     def _encode_cer(self, writer):
3810         octets = self._value
3811         if len(octets) <= 1000:
3812             write_full(writer, self._encode())
3813             return
3814         write_full(writer, self.tag_constructed)
3815         write_full(writer, LENINDEF)
3816         for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3817             write_full(writer, b"".join((
3818                 OctetString.tag_default,
3819                 LEN1K,
3820                 octets[offset:offset + 1000],
3821             )))
3822         tail = octets[offset+1000:]
3823         if len(tail) > 0:
3824             write_full(writer, b"".join((
3825                 OctetString.tag_default,
3826                 len_encode(len(tail)),
3827                 tail,
3828             )))
3829         write_full(writer, EOC)
3830
3831     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3832         try:
3833             t, tlen, lv = tag_strip(tlv)
3834         except DecodeError as err:
3835             raise err.__class__(
3836                 msg=err.msg,
3837                 klass=self.__class__,
3838                 decode_path=decode_path,
3839                 offset=offset,
3840             )
3841         if t == self.tag:
3842             if tag_only:
3843                 yield None
3844                 return
3845             try:
3846                 l, llen, v = len_decode(lv)
3847             except DecodeError as err:
3848                 raise err.__class__(
3849                     msg=err.msg,
3850                     klass=self.__class__,
3851                     decode_path=decode_path,
3852                     offset=offset,
3853                 )
3854             if l > len(v):
3855                 raise NotEnoughData(
3856                     "encoded length is longer than data",
3857                     klass=self.__class__,
3858                     decode_path=decode_path,
3859                     offset=offset,
3860                 )
3861             v, tail = v[:l], v[l:]
3862             if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3863                 raise DecodeError(
3864                     msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3865                     klass=self.__class__,
3866                     decode_path=decode_path,
3867                     offset=offset,
3868                 )
3869             try:
3870                 obj = self.__class__(
3871                     value=(
3872                         None if (evgen_mode and self.evgen_mode_skip_value)
3873                         else v.tobytes()
3874                     ),
3875                     bounds=(self._bound_min, self._bound_max),
3876                     impl=self.tag,
3877                     expl=self._expl,
3878                     default=self.default,
3879                     optional=self.optional,
3880                     _decoded=(offset, llen, l),
3881                     ctx=ctx,
3882                 )
3883             except DecodeError as err:
3884                 raise DecodeError(
3885                     msg=err.msg,
3886                     klass=self.__class__,
3887                     decode_path=decode_path,
3888                     offset=offset,
3889                 )
3890             except BoundsError as err:
3891                 raise DecodeError(
3892                     msg=str(err),
3893                     klass=self.__class__,
3894                     decode_path=decode_path,
3895                     offset=offset,
3896                 )
3897             yield decode_path, obj, tail
3898             return
3899         if t != self.tag_constructed:
3900             raise TagMismatch(
3901                 klass=self.__class__,
3902                 decode_path=decode_path,
3903                 offset=offset,
3904             )
3905         if not ctx.get("bered", False):
3906             raise DecodeError(
3907                 "unallowed BER constructed encoding",
3908                 klass=self.__class__,
3909                 decode_path=decode_path,
3910                 offset=offset,
3911             )
3912         if tag_only:
3913             yield None
3914             return
3915         lenindef = False
3916         try:
3917             l, llen, v = len_decode(lv)
3918         except LenIndefForm:
3919             llen, l, v = 1, 0, lv[1:]
3920             lenindef = True
3921         except DecodeError as err:
3922             raise err.__class__(
3923                 msg=err.msg,
3924                 klass=self.__class__,
3925                 decode_path=decode_path,
3926                 offset=offset,
3927             )
3928         if l > len(v):
3929             raise NotEnoughData(
3930                 "encoded length is longer than data",
3931                 klass=self.__class__,
3932                 decode_path=decode_path,
3933                 offset=offset,
3934             )
3935         chunks = []
3936         chunks_count = 0
3937         sub_offset = offset + tlen + llen
3938         vlen = 0
3939         payload_len = 0
3940         while True:
3941             if lenindef:
3942                 if v[:EOC_LEN].tobytes() == EOC:
3943                     break
3944             else:
3945                 if vlen == l:
3946                     break
3947                 if vlen > l:
3948                     raise DecodeError(
3949                         "chunk out of bounds",
3950                         klass=self.__class__,
3951                         decode_path=decode_path + (str(len(chunks) - 1),),
3952                         offset=chunks[-1].offset,
3953                     )
3954             try:
3955                 if evgen_mode:
3956                     sub_decode_path = decode_path + (str(chunks_count),)
3957                     for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3958                             v,
3959                             offset=sub_offset,
3960                             decode_path=sub_decode_path,
3961                             leavemm=True,
3962                             ctx=ctx,
3963                             _ctx_immutable=False,
3964                     ):
3965                         yield _decode_path, chunk, v_tail
3966                         if not chunk.ber_encoded:
3967                             payload_len += chunk.vlen
3968                     chunks_count += 1
3969                 else:
3970                     sub_decode_path = decode_path + (str(len(chunks)),)
3971                     _, chunk, v_tail = next(OctetString().decode_evgen(
3972                         v,
3973                         offset=sub_offset,
3974                         decode_path=sub_decode_path,
3975                         leavemm=True,
3976                         ctx=ctx,
3977                         _ctx_immutable=False,
3978                         _evgen_mode=False,
3979                     ))
3980                     chunks.append(chunk)
3981             except TagMismatch:
3982                 raise DecodeError(
3983                     "expected OctetString encoded chunk",
3984                     klass=self.__class__,
3985                     decode_path=sub_decode_path,
3986                     offset=sub_offset,
3987                 )
3988             sub_offset += chunk.tlvlen
3989             vlen += chunk.tlvlen
3990             v = v_tail
3991         if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
3992             raise DecodeError(
3993                 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
3994                 klass=self.__class__,
3995                 decode_path=decode_path,
3996                 offset=offset,
3997             )
3998         try:
3999             obj = self.__class__(
4000                 value=(
4001                     None if evgen_mode else
4002                     b"".join(bytes(chunk) for chunk in chunks)
4003                 ),
4004                 bounds=(self._bound_min, self._bound_max),
4005                 impl=self.tag,
4006                 expl=self._expl,
4007                 default=self.default,
4008                 optional=self.optional,
4009                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4010                 ctx=ctx,
4011             )
4012         except DecodeError as err:
4013             raise DecodeError(
4014                 msg=err.msg,
4015                 klass=self.__class__,
4016                 decode_path=decode_path,
4017                 offset=offset,
4018             )
4019         except BoundsError as err:
4020             raise DecodeError(
4021                 msg=str(err),
4022                 klass=self.__class__,
4023                 decode_path=decode_path,
4024                 offset=offset,
4025             )
4026         obj.lenindef = lenindef
4027         obj.ber_encoded = True
4028         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4029
4030     def __repr__(self):
4031         return pp_console_row(next(self.pps()))
4032
4033     def pps(self, decode_path=()):
4034         yield _pp(
4035             obj=self,
4036             asn1_type_name=self.asn1_type_name,
4037             obj_name=self.__class__.__name__,
4038             decode_path=decode_path,
4039             value=("%d bytes" % len(self._value)) if self.ready else None,
4040             blob=self._value if self.ready else None,
4041             optional=self.optional,
4042             default=self == self.default,
4043             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4044             expl=None if self._expl is None else tag_decode(self._expl),
4045             offset=self.offset,
4046             tlen=self.tlen,
4047             llen=self.llen,
4048             vlen=self.vlen,
4049             expl_offset=self.expl_offset if self.expled else None,
4050             expl_tlen=self.expl_tlen if self.expled else None,
4051             expl_llen=self.expl_llen if self.expled else None,
4052             expl_vlen=self.expl_vlen if self.expled else None,
4053             expl_lenindef=self.expl_lenindef,
4054             lenindef=self.lenindef,
4055             ber_encoded=self.ber_encoded,
4056             bered=self.bered,
4057         )
4058         defined_by, defined = self.defined or (None, None)
4059         if defined_by is not None:
4060             yield defined.pps(
4061                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4062             )
4063         for pp in self.pps_lenindef(decode_path):
4064             yield pp
4065
4066
4067 def agg_octet_string(evgens, decode_path, raw, writer):
4068     """Aggregate constructed string (OctetString and its derivatives)
4069
4070     :param evgens: iterator of generated events
4071     :param decode_path: points to the string we want to decode
4072     :param raw: slicebable (memoryview, bytearray, etc) with
4073                 the data evgens are generated on
4074     :param writer: buffer.write where string is going to be saved
4075     :param writer: where string is going to be saved. Must comply
4076                    with ``io.RawIOBase.write`` behaviour
4077     """
4078     decode_path_len = len(decode_path)
4079     for dp, obj, _ in evgens:
4080         if dp[:decode_path_len] != decode_path:
4081             continue
4082         if not obj.ber_encoded:
4083             write_full(writer, raw[
4084                 obj.offset + obj.tlen + obj.llen:
4085                 obj.offset + obj.tlen + obj.llen + obj.vlen -
4086                 (EOC_LEN if obj.expl_lenindef else 0)
4087             ])
4088         if len(dp) == decode_path_len:
4089             break
4090
4091
4092 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4093
4094
4095 class Null(Obj):
4096     """``NULL`` null object
4097
4098     >>> n = Null()
4099     NULL
4100     >>> n.ready
4101     True
4102     """
4103     __slots__ = ()
4104     tag_default = tag_encode(5)
4105     asn1_type_name = "NULL"
4106
4107     def __init__(
4108             self,
4109             value=None,  # unused, but Sequence passes it
4110             impl=None,
4111             expl=None,
4112             optional=False,
4113             _decoded=(0, 0, 0),
4114     ):
4115         """
4116         :param bytes impl: override default tag with ``IMPLICIT`` one
4117         :param bytes expl: override default tag with ``EXPLICIT`` one
4118         :param bool optional: is object ``OPTIONAL`` in sequence
4119         """
4120         super(Null, self).__init__(impl, expl, None, optional, _decoded)
4121         self.default = None
4122
4123     @property
4124     def ready(self):
4125         return True
4126
4127     def __getstate__(self):
4128         return NullState(
4129             __version__,
4130             self.tag,
4131             self._tag_order,
4132             self._expl,
4133             self.default,
4134             self.optional,
4135             self.offset,
4136             self.llen,
4137             self.vlen,
4138             self.expl_lenindef,
4139             self.lenindef,
4140             self.ber_encoded,
4141         )
4142
4143     def __eq__(self, their):
4144         if not issubclass(their.__class__, Null):
4145             return False
4146         return (
4147             self.tag == their.tag and
4148             self._expl == their._expl
4149         )
4150
4151     def __call__(
4152             self,
4153             value=None,
4154             impl=None,
4155             expl=None,
4156             optional=None,
4157     ):
4158         return self.__class__(
4159             impl=self.tag if impl is None else impl,
4160             expl=self._expl if expl is None else expl,
4161             optional=self.optional if optional is None else optional,
4162         )
4163
4164     def _encode(self):
4165         return self.tag + LEN0
4166
4167     def _encode1st(self, state):
4168         return len(self.tag) + 1, state
4169
4170     def _encode2nd(self, writer, state_iter):
4171         write_full(writer, self.tag + LEN0)
4172
4173     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4174         try:
4175             t, _, lv = tag_strip(tlv)
4176         except DecodeError as err:
4177             raise err.__class__(
4178                 msg=err.msg,
4179                 klass=self.__class__,
4180                 decode_path=decode_path,
4181                 offset=offset,
4182             )
4183         if t != self.tag:
4184             raise TagMismatch(
4185                 klass=self.__class__,
4186                 decode_path=decode_path,
4187                 offset=offset,
4188             )
4189         if tag_only:  # pragma: no cover
4190             yield None
4191             return
4192         try:
4193             l, _, v = len_decode(lv)
4194         except DecodeError as err:
4195             raise err.__class__(
4196                 msg=err.msg,
4197                 klass=self.__class__,
4198                 decode_path=decode_path,
4199                 offset=offset,
4200             )
4201         if l != 0:
4202             raise InvalidLength(
4203                 "Null must have zero length",
4204                 klass=self.__class__,
4205                 decode_path=decode_path,
4206                 offset=offset,
4207             )
4208         obj = self.__class__(
4209             impl=self.tag,
4210             expl=self._expl,
4211             optional=self.optional,
4212             _decoded=(offset, 1, 0),
4213         )
4214         yield decode_path, obj, v
4215
4216     def __repr__(self):
4217         return pp_console_row(next(self.pps()))
4218
4219     def pps(self, decode_path=()):
4220         yield _pp(
4221             obj=self,
4222             asn1_type_name=self.asn1_type_name,
4223             obj_name=self.__class__.__name__,
4224             decode_path=decode_path,
4225             optional=self.optional,
4226             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4227             expl=None if self._expl is None else tag_decode(self._expl),
4228             offset=self.offset,
4229             tlen=self.tlen,
4230             llen=self.llen,
4231             vlen=self.vlen,
4232             expl_offset=self.expl_offset if self.expled else None,
4233             expl_tlen=self.expl_tlen if self.expled else None,
4234             expl_llen=self.expl_llen if self.expled else None,
4235             expl_vlen=self.expl_vlen if self.expled else None,
4236             expl_lenindef=self.expl_lenindef,
4237             bered=self.bered,
4238         )
4239         for pp in self.pps_lenindef(decode_path):
4240             yield pp
4241
4242
4243 ObjectIdentifierState = namedtuple(
4244     "ObjectIdentifierState",
4245     BasicState._fields + ("value", "defines"),
4246     **NAMEDTUPLE_KWARGS
4247 )
4248
4249
4250 class ObjectIdentifier(Obj):
4251     """``OBJECT IDENTIFIER`` OID type
4252
4253     >>> oid = ObjectIdentifier((1, 2, 3))
4254     OBJECT IDENTIFIER 1.2.3
4255     >>> oid == ObjectIdentifier("1.2.3")
4256     True
4257     >>> tuple(oid)
4258     (1, 2, 3)
4259     >>> str(oid)
4260     '1.2.3'
4261     >>> oid + (4, 5) + ObjectIdentifier("1.7")
4262     OBJECT IDENTIFIER 1.2.3.4.5.1.7
4263
4264     >>> str(ObjectIdentifier((3, 1)))
4265     Traceback (most recent call last):
4266     pyderasn.InvalidOID: unacceptable first arc value
4267     """
4268     __slots__ = ("defines",)
4269     tag_default = tag_encode(6)
4270     asn1_type_name = "OBJECT IDENTIFIER"
4271
4272     def __init__(
4273             self,
4274             value=None,
4275             defines=(),
4276             impl=None,
4277             expl=None,
4278             default=None,
4279             optional=False,
4280             _decoded=(0, 0, 0),
4281     ):
4282         """
4283         :param value: set the value. Either tuples of integers,
4284                       string of "."-concatenated integers, or
4285                       :py:class:`pyderasn.ObjectIdentifier` object
4286         :param defines: sequence of tuples. Each tuple has two elements.
4287                         First one is relative to current one decode
4288                         path, aiming to the field defined by that OID.
4289                         Read about relative path in
4290                         :py:func:`pyderasn.abs_decode_path`. Second
4291                         tuple element is ``{OID: pyderasn.Obj()}``
4292                         dictionary, mapping between current OID value
4293                         and structure applied to defined field.
4294                         :ref:`Read about DEFINED BY <definedby>`
4295         :param bytes impl: override default tag with ``IMPLICIT`` one
4296         :param bytes expl: override default tag with ``EXPLICIT`` one
4297         :param default: set default value. Type same as in ``value``
4298         :param bool optional: is object ``OPTIONAL`` in sequence
4299         """
4300         super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4301         self._value = value
4302         if value is not None:
4303             self._value = self._value_sanitize(value)
4304         if default is not None:
4305             default = self._value_sanitize(default)
4306             self.default = self.__class__(
4307                 value=default,
4308                 impl=self.tag,
4309                 expl=self._expl,
4310             )
4311             if self._value is None:
4312                 self._value = default
4313         self.defines = defines
4314
4315     def __add__(self, their):
4316         if their.__class__ == tuple:
4317             return self.__class__(self._value + array("L", their))
4318         if isinstance(their, self.__class__):
4319             return self.__class__(self._value + their._value)
4320         raise InvalidValueType((self.__class__, tuple))
4321
4322     def _value_sanitize(self, value):
4323         if issubclass(value.__class__, ObjectIdentifier):
4324             return value._value
4325         if isinstance(value, string_types):
4326             try:
4327                 value = array("L", (pureint(arc) for arc in value.split(".")))
4328             except ValueError:
4329                 raise InvalidOID("unacceptable arcs values")
4330         if value.__class__ == tuple:
4331             try:
4332                 value = array("L", value)
4333             except OverflowError as err:
4334                 raise InvalidOID(repr(err))
4335         if value.__class__ is array:
4336             if len(value) < 2:
4337                 raise InvalidOID("less than 2 arcs")
4338             first_arc = value[0]
4339             if first_arc in (0, 1):
4340                 if not (0 <= value[1] <= 39):
4341                     raise InvalidOID("second arc is too wide")
4342             elif first_arc == 2:
4343                 pass
4344             else:
4345                 raise InvalidOID("unacceptable first arc value")
4346             if not all(arc >= 0 for arc in value):
4347                 raise InvalidOID("negative arc value")
4348             return value
4349         raise InvalidValueType((self.__class__, str, tuple))
4350
4351     @property
4352     def ready(self):
4353         return self._value is not None
4354
4355     def __getstate__(self):
4356         return ObjectIdentifierState(
4357             __version__,
4358             self.tag,
4359             self._tag_order,
4360             self._expl,
4361             self.default,
4362             self.optional,
4363             self.offset,
4364             self.llen,
4365             self.vlen,
4366             self.expl_lenindef,
4367             self.lenindef,
4368             self.ber_encoded,
4369             self._value,
4370             self.defines,
4371         )
4372
4373     def __setstate__(self, state):
4374         super(ObjectIdentifier, self).__setstate__(state)
4375         self._value = state.value
4376         self.defines = state.defines
4377
4378     def __iter__(self):
4379         self._assert_ready()
4380         return iter(self._value)
4381
4382     def __str__(self):
4383         return ".".join(str(arc) for arc in self._value or ())
4384
4385     def __hash__(self):
4386         self._assert_ready()
4387         return hash(b"".join((
4388             self.tag,
4389             bytes(self._expl or b""),
4390             str(self._value).encode("ascii"),
4391         )))
4392
4393     def __eq__(self, their):
4394         if their.__class__ == tuple:
4395             return self._value == array("L", their)
4396         if not issubclass(their.__class__, ObjectIdentifier):
4397             return False
4398         return (
4399             self.tag == their.tag and
4400             self._expl == their._expl and
4401             self._value == their._value
4402         )
4403
4404     def __lt__(self, their):
4405         return self._value < their._value
4406
4407     def __call__(
4408             self,
4409             value=None,
4410             defines=None,
4411             impl=None,
4412             expl=None,
4413             default=None,
4414             optional=None,
4415     ):
4416         return self.__class__(
4417             value=value,
4418             defines=self.defines if defines is None else defines,
4419             impl=self.tag if impl is None else impl,
4420             expl=self._expl if expl is None else expl,
4421             default=self.default if default is None else default,
4422             optional=self.optional if optional is None else optional,
4423         )
4424
4425     def _encode_octets(self):
4426         self._assert_ready()
4427         value = self._value
4428         first_value = value[1]
4429         first_arc = value[0]
4430         if first_arc == 0:
4431             pass
4432         elif first_arc == 1:
4433             first_value += 40
4434         elif first_arc == 2:
4435             first_value += 80
4436         else:  # pragma: no cover
4437             raise RuntimeError("invalid arc is stored")
4438         octets = [zero_ended_encode(first_value)]
4439         for arc in value[2:]:
4440             octets.append(zero_ended_encode(arc))
4441         return b"".join(octets)
4442
4443     def _encode(self):
4444         v = self._encode_octets()
4445         return b"".join((self.tag, len_encode(len(v)), v))
4446
4447     def _encode1st(self, state):
4448         l = len(self._encode_octets())
4449         return len(self.tag) + len_size(l) + l, state
4450
4451     def _encode2nd(self, writer, state_iter):
4452         write_full(writer, self._encode())
4453
4454     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4455         try:
4456             t, _, lv = tag_strip(tlv)
4457         except DecodeError as err:
4458             raise err.__class__(
4459                 msg=err.msg,
4460                 klass=self.__class__,
4461                 decode_path=decode_path,
4462                 offset=offset,
4463             )
4464         if t != self.tag:
4465             raise TagMismatch(
4466                 klass=self.__class__,
4467                 decode_path=decode_path,
4468                 offset=offset,
4469             )
4470         if tag_only:  # pragma: no cover
4471             yield None
4472             return
4473         try:
4474             l, llen, v = len_decode(lv)
4475         except DecodeError as err:
4476             raise err.__class__(
4477                 msg=err.msg,
4478                 klass=self.__class__,
4479                 decode_path=decode_path,
4480                 offset=offset,
4481             )
4482         if l > len(v):
4483             raise NotEnoughData(
4484                 "encoded length is longer than data",
4485                 klass=self.__class__,
4486                 decode_path=decode_path,
4487                 offset=offset,
4488             )
4489         if l == 0:
4490             raise NotEnoughData(
4491                 "zero length",
4492                 klass=self.__class__,
4493                 decode_path=decode_path,
4494                 offset=offset,
4495             )
4496         v, tail = v[:l], v[l:]
4497         arcs = array("L")
4498         ber_encoded = False
4499         while len(v) > 0:
4500             i = 0
4501             arc = 0
4502             while True:
4503                 octet = indexbytes(v, i)
4504                 if i == 0 and octet == 0x80:
4505                     if ctx.get("bered", False):
4506                         ber_encoded = True
4507                     else:
4508                         raise DecodeError(
4509                             "non normalized arc encoding",
4510                             klass=self.__class__,
4511                             decode_path=decode_path,
4512                             offset=offset,
4513                         )
4514                 arc = (arc << 7) | (octet & 0x7F)
4515                 if octet & 0x80 == 0:
4516                     try:
4517                         arcs.append(arc)
4518                     except OverflowError:
4519                         raise DecodeError(
4520                             "too huge value for local unsigned long",
4521                             klass=self.__class__,
4522                             decode_path=decode_path,
4523                             offset=offset,
4524                         )
4525                     v = v[i + 1:]
4526                     break
4527                 i += 1
4528                 if i == len(v):
4529                     raise DecodeError(
4530                         "unfinished OID",
4531                         klass=self.__class__,
4532                         decode_path=decode_path,
4533                         offset=offset,
4534                     )
4535         first_arc = 0
4536         second_arc = arcs[0]
4537         if 0 <= second_arc <= 39:
4538             first_arc = 0
4539         elif 40 <= second_arc <= 79:
4540             first_arc = 1
4541             second_arc -= 40
4542         else:
4543             first_arc = 2
4544             second_arc -= 80
4545         obj = self.__class__(
4546             value=array("L", (first_arc, second_arc)) + arcs[1:],
4547             impl=self.tag,
4548             expl=self._expl,
4549             default=self.default,
4550             optional=self.optional,
4551             _decoded=(offset, llen, l),
4552         )
4553         if ber_encoded:
4554             obj.ber_encoded = True
4555         yield decode_path, obj, tail
4556
4557     def __repr__(self):
4558         return pp_console_row(next(self.pps()))
4559
4560     def pps(self, decode_path=()):
4561         yield _pp(
4562             obj=self,
4563             asn1_type_name=self.asn1_type_name,
4564             obj_name=self.__class__.__name__,
4565             decode_path=decode_path,
4566             value=str(self) if self.ready else None,
4567             optional=self.optional,
4568             default=self == self.default,
4569             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4570             expl=None if self._expl is None else tag_decode(self._expl),
4571             offset=self.offset,
4572             tlen=self.tlen,
4573             llen=self.llen,
4574             vlen=self.vlen,
4575             expl_offset=self.expl_offset if self.expled else None,
4576             expl_tlen=self.expl_tlen if self.expled else None,
4577             expl_llen=self.expl_llen if self.expled else None,
4578             expl_vlen=self.expl_vlen if self.expled else None,
4579             expl_lenindef=self.expl_lenindef,
4580             ber_encoded=self.ber_encoded,
4581             bered=self.bered,
4582         )
4583         for pp in self.pps_lenindef(decode_path):
4584             yield pp
4585
4586
4587 class Enumerated(Integer):
4588     """``ENUMERATED`` integer type
4589
4590     This type is identical to :py:class:`pyderasn.Integer`, but requires
4591     schema to be specified and does not accept values missing from it.
4592     """
4593     __slots__ = ()
4594     tag_default = tag_encode(10)
4595     asn1_type_name = "ENUMERATED"
4596
4597     def __init__(
4598             self,
4599             value=None,
4600             impl=None,
4601             expl=None,
4602             default=None,
4603             optional=False,
4604             _specs=None,
4605             _decoded=(0, 0, 0),
4606             bounds=None,  # dummy argument, workability for Integer.decode
4607     ):
4608         super(Enumerated, self).__init__(
4609             value, bounds, impl, expl, default, optional, _specs, _decoded,
4610         )
4611         if len(self.specs) == 0:
4612             raise ValueError("schema must be specified")
4613
4614     def _value_sanitize(self, value):
4615         if isinstance(value, self.__class__):
4616             value = value._value
4617         elif isinstance(value, integer_types):
4618             for _value in itervalues(self.specs):
4619                 if _value == value:
4620                     break
4621             else:
4622                 raise DecodeError(
4623                     "unknown integer value: %s" % value,
4624                     klass=self.__class__,
4625                 )
4626         elif isinstance(value, string_types):
4627             value = self.specs.get(value)
4628             if value is None:
4629                 raise ObjUnknown("integer value: %s" % value)
4630         else:
4631             raise InvalidValueType((self.__class__, int, str))
4632         return value
4633
4634     def __call__(
4635             self,
4636             value=None,
4637             impl=None,
4638             expl=None,
4639             default=None,
4640             optional=None,
4641             _specs=None,
4642     ):
4643         return self.__class__(
4644             value=value,
4645             impl=self.tag if impl is None else impl,
4646             expl=self._expl if expl is None else expl,
4647             default=self.default if default is None else default,
4648             optional=self.optional if optional is None else optional,
4649             _specs=self.specs,
4650         )
4651
4652
4653 def escape_control_unicode(c):
4654     if unicat(c)[0] == "C":
4655         c = repr(c).lstrip("u").strip("'")
4656     return c
4657
4658
4659 class CommonString(OctetString):
4660     """Common class for all strings
4661
4662     Everything resembles :py:class:`pyderasn.OctetString`, except
4663     ability to deal with unicode text strings.
4664
4665     >>> hexenc("привет Ð¼Ð¸Ñ€".encode("utf-8"))
4666     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4667     >>> UTF8String("привет Ð¼Ð¸Ñ€") == UTF8String(hexdec("d0...80"))
4668     True
4669     >>> s = UTF8String("привет Ð¼Ð¸Ñ€")
4670     UTF8String UTF8String Ð¿Ñ€Ð¸Ð²ÐµÑ‚ Ð¼Ð¸Ñ€
4671     >>> str(s)
4672     'привет Ð¼Ð¸Ñ€'
4673     >>> hexenc(bytes(s))
4674     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4675
4676     >>> PrintableString("привет Ð¼Ð¸Ñ€")
4677     Traceback (most recent call last):
4678     pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4679
4680     >>> BMPString("ада", bounds=(2, 2))
4681     Traceback (most recent call last):
4682     pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4683     >>> s = BMPString("ад", bounds=(2, 2))
4684     >>> s.encoding
4685     'utf-16-be'
4686     >>> hexenc(bytes(s))
4687     '04300434'
4688
4689     .. list-table::
4690        :header-rows: 1
4691
4692        * - Class
4693          - Text Encoding
4694        * - :py:class:`pyderasn.UTF8String`
4695          - utf-8
4696        * - :py:class:`pyderasn.NumericString`
4697          - ascii
4698        * - :py:class:`pyderasn.PrintableString`
4699          - ascii
4700        * - :py:class:`pyderasn.TeletexString`
4701          - ascii
4702        * - :py:class:`pyderasn.T61String`
4703          - ascii
4704        * - :py:class:`pyderasn.VideotexString`
4705          - iso-8859-1
4706        * - :py:class:`pyderasn.IA5String`
4707          - ascii
4708        * - :py:class:`pyderasn.GraphicString`
4709          - iso-8859-1
4710        * - :py:class:`pyderasn.VisibleString`
4711          - ascii
4712        * - :py:class:`pyderasn.ISO646String`
4713          - ascii
4714        * - :py:class:`pyderasn.GeneralString`
4715          - iso-8859-1
4716        * - :py:class:`pyderasn.UniversalString`
4717          - utf-32-be
4718        * - :py:class:`pyderasn.BMPString`
4719          - utf-16-be
4720     """
4721     __slots__ = ()
4722
4723     def _value_sanitize(self, value):
4724         value_raw = None
4725         value_decoded = None
4726         if isinstance(value, self.__class__):
4727             value_raw = value._value
4728         elif value.__class__ == text_type:
4729             value_decoded = value
4730         elif value.__class__ == binary_type:
4731             value_raw = value
4732         else:
4733             raise InvalidValueType((self.__class__, text_type, binary_type))
4734         try:
4735             value_raw = (
4736                 value_decoded.encode(self.encoding)
4737                 if value_raw is None else value_raw
4738             )
4739             value_decoded = (
4740                 value_raw.decode(self.encoding)
4741                 if value_decoded is None else value_decoded
4742             )
4743         except (UnicodeEncodeError, UnicodeDecodeError) as err:
4744             raise DecodeError(str(err))
4745         if not self._bound_min <= len(value_decoded) <= self._bound_max:
4746             raise BoundsError(
4747                 self._bound_min,
4748                 len(value_decoded),
4749                 self._bound_max,
4750             )
4751         return value_raw
4752
4753     def __eq__(self, their):
4754         if their.__class__ == binary_type:
4755             return self._value == their
4756         if their.__class__ == text_type:
4757             return self._value == their.encode(self.encoding)
4758         if not isinstance(their, self.__class__):
4759             return False
4760         return (
4761             self._value == their._value and
4762             self.tag == their.tag and
4763             self._expl == their._expl
4764         )
4765
4766     def __unicode__(self):
4767         if self.ready:
4768             return self._value.decode(self.encoding)
4769         return text_type(self._value)
4770
4771     def __repr__(self):
4772         return pp_console_row(next(self.pps(no_unicode=PY2)))
4773
4774     def pps(self, decode_path=(), no_unicode=False):
4775         value = None
4776         if self.ready:
4777             value = (
4778                 hexenc(bytes(self)) if no_unicode else
4779                 "".join(escape_control_unicode(c) for c in self.__unicode__())
4780             )
4781         yield _pp(
4782             obj=self,
4783             asn1_type_name=self.asn1_type_name,
4784             obj_name=self.__class__.__name__,
4785             decode_path=decode_path,
4786             value=value,
4787             optional=self.optional,
4788             default=self == self.default,
4789             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4790             expl=None if self._expl is None else tag_decode(self._expl),
4791             offset=self.offset,
4792             tlen=self.tlen,
4793             llen=self.llen,
4794             vlen=self.vlen,
4795             expl_offset=self.expl_offset if self.expled else None,
4796             expl_tlen=self.expl_tlen if self.expled else None,
4797             expl_llen=self.expl_llen if self.expled else None,
4798             expl_vlen=self.expl_vlen if self.expled else None,
4799             expl_lenindef=self.expl_lenindef,
4800             ber_encoded=self.ber_encoded,
4801             bered=self.bered,
4802         )
4803         for pp in self.pps_lenindef(decode_path):
4804             yield pp
4805
4806
4807 class UTF8String(CommonString):
4808     __slots__ = ()
4809     tag_default = tag_encode(12)
4810     encoding = "utf-8"
4811     asn1_type_name = "UTF8String"
4812
4813
4814 class AllowableCharsMixin(object):
4815     @property
4816     def allowable_chars(self):
4817         if PY2:
4818             return self._allowable_chars
4819         return frozenset(six_unichr(c) for c in self._allowable_chars)
4820
4821
4822 class NumericString(AllowableCharsMixin, CommonString):
4823     """Numeric string
4824
4825     Its value is properly sanitized: only ASCII digits with spaces can
4826     be stored.
4827
4828     >>> NumericString().allowable_chars
4829     frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4830     """
4831     __slots__ = ()
4832     tag_default = tag_encode(18)
4833     encoding = "ascii"
4834     asn1_type_name = "NumericString"
4835     _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4836
4837     def _value_sanitize(self, value):
4838         value = super(NumericString, self)._value_sanitize(value)
4839         if not frozenset(value) <= self._allowable_chars:
4840             raise DecodeError("non-numeric value")
4841         return value
4842
4843
4844 PrintableStringState = namedtuple(
4845     "PrintableStringState",
4846     OctetStringState._fields + ("allowable_chars",),
4847     **NAMEDTUPLE_KWARGS
4848 )
4849
4850
4851 class PrintableString(AllowableCharsMixin, CommonString):
4852     """Printable string
4853
4854     Its value is properly sanitized: see X.680 41.4 table 10.
4855
4856     >>> PrintableString().allowable_chars
4857     frozenset([' ', "'", ..., 'z'])
4858     >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4859     PrintableString PrintableString foo*bar
4860     >>> obj.allow_asterisk, obj.allow_ampersand
4861     (True, False)
4862     """
4863     __slots__ = ()
4864     tag_default = tag_encode(19)
4865     encoding = "ascii"
4866     asn1_type_name = "PrintableString"
4867     _allowable_chars = frozenset(
4868         (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4869     )
4870     _asterisk = frozenset("*".encode("ascii"))
4871     _ampersand = frozenset("&".encode("ascii"))
4872
4873     def __init__(
4874             self,
4875             value=None,
4876             bounds=None,
4877             impl=None,
4878             expl=None,
4879             default=None,
4880             optional=False,
4881             _decoded=(0, 0, 0),
4882             ctx=None,
4883             allow_asterisk=False,
4884             allow_ampersand=False,
4885     ):
4886         """
4887         :param allow_asterisk: allow asterisk character
4888         :param allow_ampersand: allow ampersand character
4889         """
4890         if allow_asterisk:
4891             self._allowable_chars |= self._asterisk
4892         if allow_ampersand:
4893             self._allowable_chars |= self._ampersand
4894         super(PrintableString, self).__init__(
4895             value, bounds, impl, expl, default, optional, _decoded, ctx,
4896         )
4897
4898     @property
4899     def allow_asterisk(self):
4900         """Is asterisk character allowed?
4901         """
4902         return self._asterisk <= self._allowable_chars
4903
4904     @property
4905     def allow_ampersand(self):
4906         """Is ampersand character allowed?
4907         """
4908         return self._ampersand <= self._allowable_chars
4909
4910     def _value_sanitize(self, value):
4911         value = super(PrintableString, self)._value_sanitize(value)
4912         if not frozenset(value) <= self._allowable_chars:
4913             raise DecodeError("non-printable value")
4914         return value
4915
4916     def __getstate__(self):
4917         return PrintableStringState(
4918             *super(PrintableString, self).__getstate__(),
4919             **{"allowable_chars": self._allowable_chars}
4920         )
4921
4922     def __setstate__(self, state):
4923         super(PrintableString, self).__setstate__(state)
4924         self._allowable_chars = state.allowable_chars
4925
4926     def __call__(
4927             self,
4928             value=None,
4929             bounds=None,
4930             impl=None,
4931             expl=None,
4932             default=None,
4933             optional=None,
4934     ):
4935         return self.__class__(
4936             value=value,
4937             bounds=(
4938                 (self._bound_min, self._bound_max)
4939                 if bounds is None else bounds
4940             ),
4941             impl=self.tag if impl is None else impl,
4942             expl=self._expl if expl is None else expl,
4943             default=self.default if default is None else default,
4944             optional=self.optional if optional is None else optional,
4945             allow_asterisk=self.allow_asterisk,
4946             allow_ampersand=self.allow_ampersand,
4947         )
4948
4949
4950 class TeletexString(CommonString):
4951     __slots__ = ()
4952     tag_default = tag_encode(20)
4953     encoding = "ascii"
4954     asn1_type_name = "TeletexString"
4955
4956
4957 class T61String(TeletexString):
4958     __slots__ = ()
4959     asn1_type_name = "T61String"
4960
4961
4962 class VideotexString(CommonString):
4963     __slots__ = ()
4964     tag_default = tag_encode(21)
4965     encoding = "iso-8859-1"
4966     asn1_type_name = "VideotexString"
4967
4968
4969 class IA5String(CommonString):
4970     __slots__ = ()
4971     tag_default = tag_encode(22)
4972     encoding = "ascii"
4973     asn1_type_name = "IA5"
4974
4975
4976 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
4977 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
4978 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
4979 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
4980 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
4981 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
4982
4983
4984 class VisibleString(CommonString):
4985     __slots__ = ()
4986     tag_default = tag_encode(26)
4987     encoding = "ascii"
4988     asn1_type_name = "VisibleString"
4989
4990
4991 UTCTimeState = namedtuple(
4992     "UTCTimeState",
4993     OctetStringState._fields + ("ber_raw",),
4994     **NAMEDTUPLE_KWARGS
4995 )
4996
4997
4998 def str_to_time_fractions(value):
4999     v = pureint(value)
5000     year, v = (v // 10**10), (v % 10**10)
5001     month, v = (v // 10**8), (v % 10**8)
5002     day, v = (v // 10**6), (v % 10**6)
5003     hour, v = (v // 10**4), (v % 10**4)
5004     minute, second = (v // 100), (v % 100)
5005     return year, month, day, hour, minute, second
5006
5007
5008 class UTCTime(VisibleString):
5009     """``UTCTime`` datetime type
5010
5011     >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5012     UTCTime UTCTime 2017-09-30T22:07:50
5013     >>> str(t)
5014     '170930220750Z'
5015     >>> bytes(t)
5016     b'170930220750Z'
5017     >>> t.todatetime()
5018     datetime.datetime(2017, 9, 30, 22, 7, 50)
5019     >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5020     datetime.datetime(1957, 9, 30, 22, 7, 50)
5021
5022     If BER encoded value was met, then ``ber_raw`` attribute will hold
5023     its raw representation.
5024
5025     .. warning::
5026
5027        Pay attention that UTCTime can not hold full year, so all years
5028        having < 50 years are treated as 20xx, 19xx otherwise, according
5029        to X.509 recommendation.
5030
5031     .. warning::
5032
5033        No strict validation of UTC offsets are made, but very crude:
5034
5035        * minutes are not exceeding 60
5036        * offset value is not exceeding 14 hours
5037     """
5038     __slots__ = ("ber_raw",)
5039     tag_default = tag_encode(23)
5040     encoding = "ascii"
5041     asn1_type_name = "UTCTime"
5042     evgen_mode_skip_value = False
5043
5044     def __init__(
5045             self,
5046             value=None,
5047             impl=None,
5048             expl=None,
5049             default=None,
5050             optional=False,
5051             _decoded=(0, 0, 0),
5052             bounds=None,  # dummy argument, workability for OctetString.decode
5053             ctx=None,
5054     ):
5055         """
5056         :param value: set the value. Either datetime type, or
5057                       :py:class:`pyderasn.UTCTime` object
5058         :param bytes impl: override default tag with ``IMPLICIT`` one
5059         :param bytes expl: override default tag with ``EXPLICIT`` one
5060         :param default: set default value. Type same as in ``value``
5061         :param bool optional: is object ``OPTIONAL`` in sequence
5062         """
5063         super(UTCTime, self).__init__(
5064             None, None, impl, expl, None, optional, _decoded, ctx,
5065         )
5066         self._value = value
5067         self.ber_raw = None
5068         if value is not None:
5069             self._value, self.ber_raw = self._value_sanitize(value, ctx)
5070             self.ber_encoded = self.ber_raw is not None
5071         if default is not None:
5072             default, _ = self._value_sanitize(default)
5073             self.default = self.__class__(
5074                 value=default,
5075                 impl=self.tag,
5076                 expl=self._expl,
5077             )
5078             if self._value is None:
5079                 self._value = default
5080             optional = True
5081         self.optional = optional
5082
5083     def _strptime_bered(self, value):
5084         year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5085         value = value[10:]
5086         if len(value) == 0:
5087             raise ValueError("no timezone")
5088         year += 2000 if year < 50 else 1900
5089         decoded = datetime(year, month, day, hour, minute)
5090         offset = 0
5091         if value[-1] == "Z":
5092             value = value[:-1]
5093         else:
5094             if len(value) < 5:
5095                 raise ValueError("invalid UTC offset")
5096             if value[-5] == "-":
5097                 sign = -1
5098             elif value[-5] == "+":
5099                 sign = 1
5100             else:
5101                 raise ValueError("invalid UTC offset")
5102             v = pureint(value[-4:])
5103             offset, v = (60 * (v % 100)), v // 100
5104             if offset >= 3600:
5105                 raise ValueError("invalid UTC offset minutes")
5106             offset += 3600 * v
5107             if offset > 14 * 3600:
5108                 raise ValueError("too big UTC offset")
5109             offset *= sign
5110             value = value[:-5]
5111         if len(value) == 0:
5112             return offset, decoded
5113         if len(value) != 2:
5114             raise ValueError("invalid UTC offset seconds")
5115         seconds = pureint(value)
5116         if seconds >= 60:
5117             raise ValueError("invalid seconds value")
5118         return offset, decoded + timedelta(seconds=seconds)
5119
5120     def _strptime(self, value):
5121         # datetime.strptime's format: %y%m%d%H%M%SZ
5122         if len(value) != LEN_YYMMDDHHMMSSZ:
5123             raise ValueError("invalid UTCTime length")
5124         if value[-1] != "Z":
5125             raise ValueError("non UTC timezone")
5126         year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
5127         year += 2000 if year < 50 else 1900
5128         return datetime(year, month, day, hour, minute, second)
5129
5130     def _dt_sanitize(self, value):
5131         if value.year < 1950 or value.year > 2049:
5132             raise ValueError("UTCTime can hold only 1950-2049 years")
5133         return value.replace(microsecond=0)
5134
5135     def _value_sanitize(self, value, ctx=None):
5136         if value.__class__ == binary_type:
5137             try:
5138                 value_decoded = value.decode("ascii")
5139             except (UnicodeEncodeError, UnicodeDecodeError) as err:
5140                 raise DecodeError("invalid UTCTime encoding: %r" % err)
5141             err = None
5142             try:
5143                 return self._strptime(value_decoded), None
5144             except (TypeError, ValueError) as _err:
5145                 err = _err
5146                 if (ctx is not None) and ctx.get("bered", False):
5147                     try:
5148                         offset, _value = self._strptime_bered(value_decoded)
5149                         _value = _value - timedelta(seconds=offset)
5150                         return self._dt_sanitize(_value), value
5151                     except (TypeError, ValueError, OverflowError) as _err:
5152                         err = _err
5153             raise DecodeError(
5154                 "invalid %s format: %r" % (self.asn1_type_name, err),
5155                 klass=self.__class__,
5156             )
5157         if isinstance(value, self.__class__):
5158             return value._value, None
5159         if value.__class__ == datetime:
5160             return self._dt_sanitize(value), None
5161         raise InvalidValueType((self.__class__, datetime))
5162
5163     def _pp_value(self):
5164         if self.ready:
5165             value = self._value.isoformat()
5166             if self.ber_encoded:
5167                 value += " (%s)" % self.ber_raw
5168             return value
5169         return None
5170
5171     def __unicode__(self):
5172         if self.ready:
5173             value = self._value.isoformat()
5174             if self.ber_encoded:
5175                 value += " (%s)" % self.ber_raw
5176             return value
5177         return text_type(self._pp_value())
5178
5179     def __getstate__(self):
5180         return UTCTimeState(
5181             *super(UTCTime, self).__getstate__(),
5182             **{"ber_raw": self.ber_raw}
5183         )
5184
5185     def __setstate__(self, state):
5186         super(UTCTime, self).__setstate__(state)
5187         self.ber_raw = state.ber_raw
5188
5189     def __bytes__(self):
5190         self._assert_ready()
5191         return self._encode_time()
5192
5193     def __eq__(self, their):
5194         if their.__class__ == binary_type:
5195             return self._encode_time() == their
5196         if their.__class__ == datetime:
5197             return self.todatetime() == their
5198         if not isinstance(their, self.__class__):
5199             return False
5200         return (
5201             self._value == their._value and
5202             self.tag == their.tag and
5203             self._expl == their._expl
5204         )
5205
5206     def _encode_time(self):
5207         return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5208
5209     def _encode(self):
5210         self._assert_ready()
5211         return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5212
5213     def _encode1st(self, state):
5214         return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
5215
5216     def _encode2nd(self, writer, state_iter):
5217         self._assert_ready()
5218         write_full(writer, self._encode())
5219
5220     def _encode_cer(self, writer):
5221         write_full(writer, self._encode())
5222
5223     def todatetime(self):
5224         return self._value
5225
5226     def __repr__(self):
5227         return pp_console_row(next(self.pps()))
5228
5229     def pps(self, decode_path=()):
5230         yield _pp(
5231             obj=self,
5232             asn1_type_name=self.asn1_type_name,
5233             obj_name=self.__class__.__name__,
5234             decode_path=decode_path,
5235             value=self._pp_value(),
5236             optional=self.optional,
5237             default=self == self.default,
5238             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5239             expl=None if self._expl is None else tag_decode(self._expl),
5240             offset=self.offset,
5241             tlen=self.tlen,
5242             llen=self.llen,
5243             vlen=self.vlen,
5244             expl_offset=self.expl_offset if self.expled else None,
5245             expl_tlen=self.expl_tlen if self.expled else None,
5246             expl_llen=self.expl_llen if self.expled else None,
5247             expl_vlen=self.expl_vlen if self.expled else None,
5248             expl_lenindef=self.expl_lenindef,
5249             ber_encoded=self.ber_encoded,
5250             bered=self.bered,
5251         )
5252         for pp in self.pps_lenindef(decode_path):
5253             yield pp
5254
5255
5256 class GeneralizedTime(UTCTime):
5257     """``GeneralizedTime`` datetime type
5258
5259     This type is similar to :py:class:`pyderasn.UTCTime`.
5260
5261     >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5262     GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5263     >>> str(t)
5264     '20170930220750.000123Z'
5265     >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5266     GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5267
5268     .. warning::
5269
5270        Only microsecond fractions are supported in DER encoding.
5271        :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5272        higher precision values.
5273
5274     .. warning::
5275
5276        BER encoded data can loss information (accuracy) during decoding
5277        because of float transformations.
5278
5279     .. warning::
5280
5281        Local times (without explicit timezone specification) are treated
5282        as UTC one, no transformations are made.
5283
5284     .. warning::
5285
5286        Zero year is unsupported.
5287     """
5288     __slots__ = ()
5289     tag_default = tag_encode(24)
5290     asn1_type_name = "GeneralizedTime"
5291
5292     def _dt_sanitize(self, value):
5293         return value
5294
5295     def _strptime_bered(self, value):
5296         if len(value) < 4 + 3 * 2:
5297             raise ValueError("invalid GeneralizedTime")
5298         year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5299         decoded = datetime(year, month, day, hour)
5300         offset, value = 0, value[10:]
5301         if len(value) == 0:
5302             return offset, decoded
5303         if value[-1] == "Z":
5304             value = value[:-1]
5305         else:
5306             for char, sign in (("-", -1), ("+", 1)):
5307                 idx = value.rfind(char)
5308                 if idx == -1:
5309                     continue
5310                 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5311                 v = pureint(offset_raw)
5312                 if len(offset_raw) == 4:
5313                     offset, v = (60 * (v % 100)), v // 100
5314                     if offset >= 3600:
5315                         raise ValueError("invalid UTC offset minutes")
5316                 elif len(offset_raw) == 2:
5317                     pass
5318                 else:
5319                     raise ValueError("invalid UTC offset")
5320                 offset += 3600 * v
5321                 if offset > 14 * 3600:
5322                     raise ValueError("too big UTC offset")
5323                 offset *= sign
5324                 break
5325         if len(value) == 0:
5326             return offset, decoded
5327         if value[0] in DECIMAL_SIGNS:
5328             return offset, (
5329                 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5330             )
5331         if len(value) < 2:
5332             raise ValueError("stripped minutes")
5333         decoded += timedelta(seconds=60 * pureint(value[:2]))
5334         value = value[2:]
5335         if len(value) == 0:
5336             return offset, decoded
5337         if value[0] in DECIMAL_SIGNS:
5338             return offset, (
5339                 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5340             )
5341         if len(value) < 2:
5342             raise ValueError("stripped seconds")
5343         decoded += timedelta(seconds=pureint(value[:2]))
5344         value = value[2:]
5345         if len(value) == 0:
5346             return offset, decoded
5347         if value[0] not in DECIMAL_SIGNS:
5348             raise ValueError("invalid format after seconds")
5349         return offset, (
5350             decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5351         )
5352
5353     def _strptime(self, value):
5354         l = len(value)
5355         if l == LEN_YYYYMMDDHHMMSSZ:
5356             # datetime.strptime's format: %Y%m%d%H%M%SZ
5357             if value[-1] != "Z":
5358                 raise ValueError("non UTC timezone")
5359             return datetime(*str_to_time_fractions(value[:-1]))
5360         if l >= LEN_YYYYMMDDHHMMSSDMZ:
5361             # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5362             if value[-1] != "Z":
5363                 raise ValueError("non UTC timezone")
5364             if value[14] != ".":
5365                 raise ValueError("no fractions separator")
5366             us = value[15:-1]
5367             if us[-1] == "0":
5368                 raise ValueError("trailing zero")
5369             us_len = len(us)
5370             if us_len > 6:
5371                 raise ValueError("only microsecond fractions are supported")
5372             us = pureint(us + ("0" * (6 - us_len)))
5373             year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5374             return datetime(year, month, day, hour, minute, second, us)
5375         raise ValueError("invalid GeneralizedTime length")
5376
5377     def _encode_time(self):
5378         value = self._value
5379         encoded = value.strftime("%Y%m%d%H%M%S")
5380         if value.microsecond > 0:
5381             encoded += (".%06d" % value.microsecond).rstrip("0")
5382         return (encoded + "Z").encode("ascii")
5383
5384     def _encode(self):
5385         self._assert_ready()
5386         value = self._value
5387         if value.microsecond > 0:
5388             encoded = self._encode_time()
5389             return b"".join((self.tag, len_encode(len(encoded)), encoded))
5390         return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5391
5392     def _encode1st(self, state):
5393         self._assert_ready()
5394         vlen = len(self._encode_time())
5395         return len(self.tag) + len_size(vlen) + vlen, state
5396
5397     def _encode2nd(self, writer, state_iter):
5398         write_full(writer, self._encode())
5399
5400
5401 class GraphicString(CommonString):
5402     __slots__ = ()
5403     tag_default = tag_encode(25)
5404     encoding = "iso-8859-1"
5405     asn1_type_name = "GraphicString"
5406
5407
5408 class ISO646String(VisibleString):
5409     __slots__ = ()
5410     asn1_type_name = "ISO646String"
5411
5412
5413 class GeneralString(CommonString):
5414     __slots__ = ()
5415     tag_default = tag_encode(27)
5416     encoding = "iso-8859-1"
5417     asn1_type_name = "GeneralString"
5418
5419
5420 class UniversalString(CommonString):
5421     __slots__ = ()
5422     tag_default = tag_encode(28)
5423     encoding = "utf-32-be"
5424     asn1_type_name = "UniversalString"
5425
5426
5427 class BMPString(CommonString):
5428     __slots__ = ()
5429     tag_default = tag_encode(30)
5430     encoding = "utf-16-be"
5431     asn1_type_name = "BMPString"
5432
5433
5434 ChoiceState = namedtuple(
5435     "ChoiceState",
5436     BasicState._fields + ("specs", "value",),
5437     **NAMEDTUPLE_KWARGS
5438 )
5439
5440
5441 class Choice(Obj):
5442     """``CHOICE`` special type
5443
5444     ::
5445
5446         class GeneralName(Choice):
5447             schema = (
5448                 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5449                 ("dNSName", IA5String(impl=tag_ctxp(2))),
5450             )
5451
5452     >>> gn = GeneralName()
5453     GeneralName CHOICE
5454     >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5455     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5456     >>> gn["dNSName"] = IA5String("bar.baz")
5457     GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5458     >>> gn["rfc822Name"]
5459     None
5460     >>> gn["dNSName"]
5461     [2] IA5String IA5 bar.baz
5462     >>> gn.choice
5463     'dNSName'
5464     >>> gn.value == gn["dNSName"]
5465     True
5466     >>> gn.specs
5467     OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5468
5469     >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5470     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5471     """
5472     __slots__ = ("specs",)
5473     tag_default = None
5474     asn1_type_name = "CHOICE"
5475
5476     def __init__(
5477             self,
5478             value=None,
5479             schema=None,
5480             impl=None,
5481             expl=None,
5482             default=None,
5483             optional=False,
5484             _decoded=(0, 0, 0),
5485     ):
5486         """
5487         :param value: set the value. Either ``(choice, value)`` tuple, or
5488                       :py:class:`pyderasn.Choice` object
5489         :param bytes impl: can not be set, do **not** use it
5490         :param bytes expl: override default tag with ``EXPLICIT`` one
5491         :param default: set default value. Type same as in ``value``
5492         :param bool optional: is object ``OPTIONAL`` in sequence
5493         """
5494         if impl is not None:
5495             raise ValueError("no implicit tag allowed for CHOICE")
5496         super(Choice, self).__init__(None, expl, default, optional, _decoded)
5497         if schema is None:
5498             schema = getattr(self, "schema", ())
5499         if len(schema) == 0:
5500             raise ValueError("schema must be specified")
5501         self.specs = (
5502             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5503         )
5504         self._value = None
5505         if value is not None:
5506             self._value = self._value_sanitize(value)
5507         if default is not None:
5508             default_value = self._value_sanitize(default)
5509             default_obj = self.__class__(impl=self.tag, expl=self._expl)
5510             default_obj.specs = self.specs
5511             default_obj._value = default_value
5512             self.default = default_obj
5513             if value is None:
5514                 self._value = copy(default_obj._value)
5515         if self._expl is not None:
5516             tag_class, _, tag_num = tag_decode(self._expl)
5517             self._tag_order = (tag_class, tag_num)
5518
5519     def _value_sanitize(self, value):
5520         if (value.__class__ == tuple) and len(value) == 2:
5521             choice, obj = value
5522             spec = self.specs.get(choice)
5523             if spec is None:
5524                 raise ObjUnknown(choice)
5525             if not isinstance(obj, spec.__class__):
5526                 raise InvalidValueType((spec,))
5527             return (choice, spec(obj))
5528         if isinstance(value, self.__class__):
5529             return value._value
5530         raise InvalidValueType((self.__class__, tuple))
5531
5532     @property
5533     def ready(self):
5534         return self._value is not None and self._value[1].ready
5535
5536     @property
5537     def bered(self):
5538         return self.expl_lenindef or (
5539             (self._value is not None) and
5540             self._value[1].bered
5541         )
5542
5543     def __getstate__(self):
5544         return ChoiceState(
5545             __version__,
5546             self.tag,
5547             self._tag_order,
5548             self._expl,
5549             self.default,
5550             self.optional,
5551             self.offset,
5552             self.llen,
5553             self.vlen,
5554             self.expl_lenindef,
5555             self.lenindef,
5556             self.ber_encoded,
5557             self.specs,
5558             copy(self._value),
5559         )
5560
5561     def __setstate__(self, state):
5562         super(Choice, self).__setstate__(state)
5563         self.specs = state.specs
5564         self._value = state.value
5565
5566     def __eq__(self, their):
5567         if (their.__class__ == tuple) and len(their) == 2:
5568             return self._value == their
5569         if not isinstance(their, self.__class__):
5570             return False
5571         return (
5572             self.specs == their.specs and
5573             self._value == their._value
5574         )
5575
5576     def __call__(
5577             self,
5578             value=None,
5579             expl=None,
5580             default=None,
5581             optional=None,
5582     ):
5583         return self.__class__(
5584             value=value,
5585             schema=self.specs,
5586             expl=self._expl if expl is None else expl,
5587             default=self.default if default is None else default,
5588             optional=self.optional if optional is None else optional,
5589         )
5590
5591     @property
5592     def choice(self):
5593         """Name of the choice
5594         """
5595         self._assert_ready()
5596         return self._value[0]
5597
5598     @property
5599     def value(self):
5600         """Value of underlying choice
5601         """
5602         self._assert_ready()
5603         return self._value[1]
5604
5605     @property
5606     def tag_order(self):
5607         self._assert_ready()
5608         return self._value[1].tag_order if self._tag_order is None else self._tag_order
5609
5610     @property
5611     def tag_order_cer(self):
5612         return min(v.tag_order_cer for v in itervalues(self.specs))
5613
5614     def __getitem__(self, key):
5615         if key not in self.specs:
5616             raise ObjUnknown(key)
5617         if self._value is None:
5618             return None
5619         choice, value = self._value
5620         if choice != key:
5621             return None
5622         return value
5623
5624     def __setitem__(self, key, value):
5625         spec = self.specs.get(key)
5626         if spec is None:
5627             raise ObjUnknown(key)
5628         if not isinstance(value, spec.__class__):
5629             raise InvalidValueType((spec.__class__,))
5630         self._value = (key, spec(value))
5631
5632     @property
5633     def tlen(self):
5634         return 0
5635
5636     @property
5637     def decoded(self):
5638         return self._value[1].decoded if self.ready else False
5639
5640     def _encode(self):
5641         self._assert_ready()
5642         return self._value[1].encode()
5643
5644     def _encode1st(self, state):
5645         self._assert_ready()
5646         return self._value[1].encode1st(state)
5647
5648     def _encode2nd(self, writer, state_iter):
5649         self._value[1].encode2nd(writer, state_iter)
5650
5651     def _encode_cer(self, writer):
5652         self._assert_ready()
5653         self._value[1].encode_cer(writer)
5654
5655     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5656         for choice, spec in iteritems(self.specs):
5657             sub_decode_path = decode_path + (choice,)
5658             try:
5659                 spec.decode(
5660                     tlv,
5661                     offset=offset,
5662                     leavemm=True,
5663                     decode_path=sub_decode_path,
5664                     ctx=ctx,
5665                     tag_only=True,
5666                     _ctx_immutable=False,
5667                 )
5668             except TagMismatch:
5669                 continue
5670             break
5671         else:
5672             raise TagMismatch(
5673                 klass=self.__class__,
5674                 decode_path=decode_path,
5675                 offset=offset,
5676             )
5677         if tag_only:  # pragma: no cover
5678             yield None
5679             return
5680         if evgen_mode:
5681             for _decode_path, value, tail in spec.decode_evgen(
5682                     tlv,
5683                     offset=offset,
5684                     leavemm=True,
5685                     decode_path=sub_decode_path,
5686                     ctx=ctx,
5687                     _ctx_immutable=False,
5688             ):
5689                 yield _decode_path, value, tail
5690         else:
5691             _, value, tail = next(spec.decode_evgen(
5692                 tlv,
5693                 offset=offset,
5694                 leavemm=True,
5695                 decode_path=sub_decode_path,
5696                 ctx=ctx,
5697                 _ctx_immutable=False,
5698                 _evgen_mode=False,
5699             ))
5700         obj = self.__class__(
5701             schema=self.specs,
5702             expl=self._expl,
5703             default=self.default,
5704             optional=self.optional,
5705             _decoded=(offset, 0, value.fulllen),
5706         )
5707         obj._value = (choice, value)
5708         yield decode_path, obj, tail
5709
5710     def __repr__(self):
5711         value = pp_console_row(next(self.pps()))
5712         if self.ready:
5713             value = "%s[%r]" % (value, self.value)
5714         return value
5715
5716     def pps(self, decode_path=()):
5717         yield _pp(
5718             obj=self,
5719             asn1_type_name=self.asn1_type_name,
5720             obj_name=self.__class__.__name__,
5721             decode_path=decode_path,
5722             value=self.choice if self.ready else None,
5723             optional=self.optional,
5724             default=self == self.default,
5725             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5726             expl=None if self._expl is None else tag_decode(self._expl),
5727             offset=self.offset,
5728             tlen=self.tlen,
5729             llen=self.llen,
5730             vlen=self.vlen,
5731             expl_lenindef=self.expl_lenindef,
5732             bered=self.bered,
5733         )
5734         if self.ready:
5735             yield self.value.pps(decode_path=decode_path + (self.choice,))
5736         for pp in self.pps_lenindef(decode_path):
5737             yield pp
5738
5739
5740 class PrimitiveTypes(Choice):
5741     """Predefined ``CHOICE`` for all generic primitive types
5742
5743     It could be useful for general decoding of some unspecified values:
5744
5745     >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5746     OCTET STRING 3 bytes 666f6f
5747     >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5748     INTEGER 1193046
5749     """
5750     __slots__ = ()
5751     schema = tuple((klass.__name__, klass()) for klass in (
5752         Boolean,
5753         Integer,
5754         BitString,
5755         OctetString,
5756         Null,
5757         ObjectIdentifier,
5758         UTF8String,
5759         NumericString,
5760         PrintableString,
5761         TeletexString,
5762         VideotexString,
5763         IA5String,
5764         UTCTime,
5765         GeneralizedTime,
5766         GraphicString,
5767         VisibleString,
5768         ISO646String,
5769         GeneralString,
5770         UniversalString,
5771         BMPString,
5772     ))
5773
5774
5775 AnyState = namedtuple(
5776     "AnyState",
5777     BasicState._fields + ("value", "defined"),
5778     **NAMEDTUPLE_KWARGS
5779 )
5780
5781
5782 class Any(Obj):
5783     """``ANY`` special type
5784
5785     >>> Any(Integer(-123))
5786     ANY INTEGER -123 (0X:7B)
5787     >>> a = Any(OctetString(b"hello world").encode())
5788     ANY 040b68656c6c6f20776f726c64
5789     >>> hexenc(bytes(a))
5790     b'0x040x0bhello world'
5791     """
5792     __slots__ = ("defined",)
5793     tag_default = tag_encode(0)
5794     asn1_type_name = "ANY"
5795
5796     def __init__(
5797             self,
5798             value=None,
5799             expl=None,
5800             optional=False,
5801             _decoded=(0, 0, 0),
5802     ):
5803         """
5804         :param value: set the value. Either any kind of pyderasn's
5805                       **ready** object, or bytes. Pay attention that
5806                       **no** validation is performed if raw binary value
5807                       is valid TLV, except just tag decoding
5808         :param bytes expl: override default tag with ``EXPLICIT`` one
5809         :param bool optional: is object ``OPTIONAL`` in sequence
5810         """
5811         super(Any, self).__init__(None, expl, None, optional, _decoded)
5812         if value is None:
5813             self._value = None
5814         else:
5815             value = self._value_sanitize(value)
5816             self._value = value
5817             if self._expl is None:
5818                 if value.__class__ == binary_type:
5819                     tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5820                 else:
5821                     tag_class, tag_num = value.tag_order
5822             else:
5823                 tag_class, _, tag_num = tag_decode(self._expl)
5824             self._tag_order = (tag_class, tag_num)
5825         self.defined = None
5826
5827     def _value_sanitize(self, value):
5828         if value.__class__ == binary_type:
5829             if len(value) == 0:
5830                 raise ValueError("Any value can not be empty")
5831             return value
5832         if isinstance(value, self.__class__):
5833             return value._value
5834         if not isinstance(value, Obj):
5835             raise InvalidValueType((self.__class__, Obj, binary_type))
5836         return value
5837
5838     @property
5839     def ready(self):
5840         return self._value is not None
5841
5842     @property
5843     def tag_order(self):
5844         self._assert_ready()
5845         return self._tag_order
5846
5847     @property
5848     def bered(self):
5849         if self.expl_lenindef or self.lenindef:
5850             return True
5851         if self.defined is None:
5852             return False
5853         return self.defined[1].bered
5854
5855     def __getstate__(self):
5856         return AnyState(
5857             __version__,
5858             self.tag,
5859             self._tag_order,
5860             self._expl,
5861             None,
5862             self.optional,
5863             self.offset,
5864             self.llen,
5865             self.vlen,
5866             self.expl_lenindef,
5867             self.lenindef,
5868             self.ber_encoded,
5869             self._value,
5870             self.defined,
5871         )
5872
5873     def __setstate__(self, state):
5874         super(Any, self).__setstate__(state)
5875         self._value = state.value
5876         self.defined = state.defined
5877
5878     def __eq__(self, their):
5879         if their.__class__ == binary_type:
5880             if self._value.__class__ == binary_type:
5881                 return self._value == their
5882             return self._value.encode() == their
5883         if issubclass(their.__class__, Any):
5884             if self.ready and their.ready:
5885                 return bytes(self) == bytes(their)
5886             return self.ready == their.ready
5887         return False
5888
5889     def __call__(
5890             self,
5891             value=None,
5892             expl=None,
5893             optional=None,
5894     ):
5895         return self.__class__(
5896             value=value,
5897             expl=self._expl if expl is None else expl,
5898             optional=self.optional if optional is None else optional,
5899         )
5900
5901     def __bytes__(self):
5902         self._assert_ready()
5903         value = self._value
5904         if value.__class__ == binary_type:
5905             return value
5906         return self._value.encode()
5907
5908     @property
5909     def tlen(self):
5910         return 0
5911
5912     def _encode(self):
5913         self._assert_ready()
5914         value = self._value
5915         if value.__class__ == binary_type:
5916             return value
5917         return value.encode()
5918
5919     def _encode1st(self, state):
5920         self._assert_ready()
5921         value = self._value
5922         if value.__class__ == binary_type:
5923             return len(value), state
5924         return value.encode1st(state)
5925
5926     def _encode2nd(self, writer, state_iter):
5927         value = self._value
5928         if value.__class__ == binary_type:
5929             write_full(writer, value)
5930         else:
5931             value.encode2nd(writer, state_iter)
5932
5933     def _encode_cer(self, writer):
5934         self._assert_ready()
5935         value = self._value
5936         if value.__class__ == binary_type:
5937             write_full(writer, value)
5938         else:
5939             value.encode_cer(writer)
5940
5941     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5942         try:
5943             t, tlen, lv = tag_strip(tlv)
5944         except DecodeError as err:
5945             raise err.__class__(
5946                 msg=err.msg,
5947                 klass=self.__class__,
5948                 decode_path=decode_path,
5949                 offset=offset,
5950             )
5951         try:
5952             l, llen, v = len_decode(lv)
5953         except LenIndefForm as err:
5954             if not ctx.get("bered", False):
5955                 raise err.__class__(
5956                     msg=err.msg,
5957                     klass=self.__class__,
5958                     decode_path=decode_path,
5959                     offset=offset,
5960                 )
5961             llen, vlen, v = 1, 0, lv[1:]
5962             sub_offset = offset + tlen + llen
5963             chunk_i = 0
5964             while v[:EOC_LEN].tobytes() != EOC:
5965                 chunk, v = Any().decode(
5966                     v,
5967                     offset=sub_offset,
5968                     decode_path=decode_path + (str(chunk_i),),
5969                     leavemm=True,
5970                     ctx=ctx,
5971                     _ctx_immutable=False,
5972                 )
5973                 vlen += chunk.tlvlen
5974                 sub_offset += chunk.tlvlen
5975                 chunk_i += 1
5976             tlvlen = tlen + llen + vlen + EOC_LEN
5977             obj = self.__class__(
5978                 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
5979                 expl=self._expl,
5980                 optional=self.optional,
5981                 _decoded=(offset, 0, tlvlen),
5982             )
5983             obj.lenindef = True
5984             obj.tag = t.tobytes()
5985             yield decode_path, obj, v[EOC_LEN:]
5986             return
5987         except DecodeError as err:
5988             raise err.__class__(
5989                 msg=err.msg,
5990                 klass=self.__class__,
5991                 decode_path=decode_path,
5992                 offset=offset,
5993             )
5994         if l > len(v):
5995             raise NotEnoughData(
5996                 "encoded length is longer than data",
5997                 klass=self.__class__,
5998                 decode_path=decode_path,
5999                 offset=offset,
6000             )
6001         tlvlen = tlen + llen + l
6002         v, tail = tlv[:tlvlen], v[l:]
6003         obj = self.__class__(
6004             value=None if evgen_mode else v.tobytes(),
6005             expl=self._expl,
6006             optional=self.optional,
6007             _decoded=(offset, 0, tlvlen),
6008         )
6009         obj.tag = t.tobytes()
6010         yield decode_path, obj, tail
6011
6012     def __repr__(self):
6013         return pp_console_row(next(self.pps()))
6014
6015     def pps(self, decode_path=()):
6016         value = self._value
6017         if value is None:
6018             pass
6019         elif value.__class__ == binary_type:
6020             value = None
6021         else:
6022             value = repr(value)
6023         yield _pp(
6024             obj=self,
6025             asn1_type_name=self.asn1_type_name,
6026             obj_name=self.__class__.__name__,
6027             decode_path=decode_path,
6028             value=value,
6029             blob=self._value if self._value.__class__ == binary_type else None,
6030             optional=self.optional,
6031             default=self == self.default,
6032             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6033             expl=None if self._expl is None else tag_decode(self._expl),
6034             offset=self.offset,
6035             tlen=self.tlen,
6036             llen=self.llen,
6037             vlen=self.vlen,
6038             expl_offset=self.expl_offset if self.expled else None,
6039             expl_tlen=self.expl_tlen if self.expled else None,
6040             expl_llen=self.expl_llen if self.expled else None,
6041             expl_vlen=self.expl_vlen if self.expled else None,
6042             expl_lenindef=self.expl_lenindef,
6043             lenindef=self.lenindef,
6044             bered=self.bered,
6045         )
6046         defined_by, defined = self.defined or (None, None)
6047         if defined_by is not None:
6048             yield defined.pps(
6049                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6050             )
6051         for pp in self.pps_lenindef(decode_path):
6052             yield pp
6053
6054
6055 ########################################################################
6056 # ASN.1 constructed types
6057 ########################################################################
6058
6059 def abs_decode_path(decode_path, rel_path):
6060     """Create an absolute decode path from current and relative ones
6061
6062     :param decode_path: current decode path, starting point. Tuple of strings
6063     :param rel_path: relative path to ``decode_path``. Tuple of strings.
6064                      If first tuple's element is "/", then treat it as
6065                      an absolute path, ignoring ``decode_path`` as
6066                      starting point. Also this tuple can contain ".."
6067                      elements, stripping the leading element from
6068                      ``decode_path``
6069
6070     >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
6071     ("foo", "bar", "baz", "whatever")
6072     >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
6073     ("foo", "whatever")
6074     >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
6075     ("baz", "whatever")
6076     """
6077     if rel_path[0] == "/":
6078         return rel_path[1:]
6079     if rel_path[0] == "..":
6080         return abs_decode_path(decode_path[:-1], rel_path[1:])
6081     return decode_path + rel_path
6082
6083
6084 SequenceState = namedtuple(
6085     "SequenceState",
6086     BasicState._fields + ("specs", "value",),
6087     **NAMEDTUPLE_KWARGS
6088 )
6089
6090
6091 class SequenceEncode1stMixing(object):
6092     def _encode1st(self, state):
6093         state.append(0)
6094         idx = len(state) - 1
6095         vlen = 0
6096         for v in self._values_for_encoding():
6097             l, _ = v.encode1st(state)
6098             vlen += l
6099         state[idx] = vlen
6100         return len(self.tag) + len_size(vlen) + vlen, state
6101
6102
6103 class Sequence(SequenceEncode1stMixing, Obj):
6104     """``SEQUENCE`` structure type
6105
6106     You have to make specification of sequence::
6107
6108         class Extension(Sequence):
6109             schema = (
6110                 ("extnID", ObjectIdentifier()),
6111                 ("critical", Boolean(default=False)),
6112                 ("extnValue", OctetString()),
6113             )
6114
6115     Then, you can work with it as with dictionary.
6116
6117     >>> ext = Extension()
6118     >>> Extension().specs
6119     OrderedDict([
6120         ('extnID', OBJECT IDENTIFIER),
6121         ('critical', BOOLEAN False OPTIONAL DEFAULT),
6122         ('extnValue', OCTET STRING),
6123     ])
6124     >>> ext["extnID"] = "1.2.3"
6125     Traceback (most recent call last):
6126     pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6127     >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6128
6129     You can determine if sequence is ready to be encoded:
6130
6131     >>> ext.ready
6132     False
6133     >>> ext.encode()
6134     Traceback (most recent call last):
6135     pyderasn.ObjNotReady: object is not ready: extnValue
6136     >>> ext["extnValue"] = OctetString(b"foobar")
6137     >>> ext.ready
6138     True
6139
6140     Value you want to assign, must have the same **type** as in
6141     corresponding specification, but it can have different tags,
6142     optional/default attributes -- they will be taken from specification
6143     automatically::
6144
6145         class TBSCertificate(Sequence):
6146             schema = (
6147                 ("version", Version(expl=tag_ctxc(0), default="v1")),
6148             [...]
6149
6150     >>> tbs = TBSCertificate()
6151     >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6152
6153     Assign ``None`` to remove value from sequence.
6154
6155     You can set values in Sequence during its initialization:
6156
6157     >>> AlgorithmIdentifier((
6158         ("algorithm", ObjectIdentifier("1.2.3")),
6159         ("parameters", Any(Null()))
6160     ))
6161     AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6162
6163     You can determine if value exists/set in the sequence and take its value:
6164
6165     >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6166     (True, True, False)
6167     >>> ext["extnID"]
6168     OBJECT IDENTIFIER 1.2.3
6169
6170     But pay attention that if value has default, then it won't be (not
6171     in) in the sequence (because ``DEFAULT`` must not be encoded in
6172     DER), but you can read its value:
6173
6174     >>> "critical" in ext, ext["critical"]
6175     (False, BOOLEAN False)
6176     >>> ext["critical"] = Boolean(True)
6177     >>> "critical" in ext, ext["critical"]
6178     (True, BOOLEAN True)
6179
6180     All defaulted values are always optional.
6181
6182     .. _allow_default_values_ctx:
6183
6184     DER prohibits default value encoding and will raise an error if
6185     default value is unexpectedly met during decode.
6186     If :ref:`bered <bered_ctx>` context option is set, then no error
6187     will be raised, but ``bered`` attribute set. You can disable strict
6188     defaulted values existence validation by setting
6189     ``"allow_default_values": True`` :ref:`context <ctx>` option.
6190
6191     .. warning::
6192
6193        Check for default value existence is not performed in
6194        ``evgen_mode``, because previously decoded values are not stored
6195        in memory, to be able to compare them.
6196
6197     Two sequences are equal if they have equal specification (schema),
6198     implicit/explicit tagging and the same values.
6199     """
6200     __slots__ = ("specs",)
6201     tag_default = tag_encode(form=TagFormConstructed, num=16)
6202     asn1_type_name = "SEQUENCE"
6203
6204     def __init__(
6205             self,
6206             value=None,
6207             schema=None,
6208             impl=None,
6209             expl=None,
6210             default=None,
6211             optional=False,
6212             _decoded=(0, 0, 0),
6213     ):
6214         super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6215         if schema is None:
6216             schema = getattr(self, "schema", ())
6217         self.specs = (
6218             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6219         )
6220         self._value = {}
6221         if value is not None:
6222             if issubclass(value.__class__, Sequence):
6223                 self._value = value._value
6224             elif hasattr(value, "__iter__"):
6225                 for seq_key, seq_value in value:
6226                     self[seq_key] = seq_value
6227             else:
6228                 raise InvalidValueType((Sequence,))
6229         if default is not None:
6230             if not issubclass(default.__class__, Sequence):
6231                 raise InvalidValueType((Sequence,))
6232             default_value = default._value
6233             default_obj = self.__class__(impl=self.tag, expl=self._expl)
6234             default_obj.specs = self.specs
6235             default_obj._value = default_value
6236             self.default = default_obj
6237             if value is None:
6238                 self._value = copy(default_obj._value)
6239
6240     @property
6241     def ready(self):
6242         for name, spec in iteritems(self.specs):
6243             value = self._value.get(name)
6244             if value is None:
6245                 if spec.optional:
6246                     continue
6247                 return False
6248             if not value.ready:
6249                 return False
6250         return True
6251
6252     @property
6253     def bered(self):
6254         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6255             return True
6256         return any(value.bered for value in itervalues(self._value))
6257
6258     def __getstate__(self):
6259         return SequenceState(
6260             __version__,
6261             self.tag,
6262             self._tag_order,
6263             self._expl,
6264             self.default,
6265             self.optional,
6266             self.offset,
6267             self.llen,
6268             self.vlen,
6269             self.expl_lenindef,
6270             self.lenindef,
6271             self.ber_encoded,
6272             self.specs,
6273             {k: copy(v) for k, v in iteritems(self._value)},
6274         )
6275
6276     def __setstate__(self, state):
6277         super(Sequence, self).__setstate__(state)
6278         self.specs = state.specs
6279         self._value = state.value
6280
6281     def __eq__(self, their):
6282         if not isinstance(their, self.__class__):
6283             return False
6284         return (
6285             self.specs == their.specs and
6286             self.tag == their.tag and
6287             self._expl == their._expl and
6288             self._value == their._value
6289         )
6290
6291     def __call__(
6292             self,
6293             value=None,
6294             impl=None,
6295             expl=None,
6296             default=None,
6297             optional=None,
6298     ):
6299         return self.__class__(
6300             value=value,
6301             schema=self.specs,
6302             impl=self.tag if impl is None else impl,
6303             expl=self._expl if expl is None else expl,
6304             default=self.default if default is None else default,
6305             optional=self.optional if optional is None else optional,
6306         )
6307
6308     def __contains__(self, key):
6309         return key in self._value
6310
6311     def __setitem__(self, key, value):
6312         spec = self.specs.get(key)
6313         if spec is None:
6314             raise ObjUnknown(key)
6315         if value is None:
6316             self._value.pop(key, None)
6317             return
6318         if not isinstance(value, spec.__class__):
6319             raise InvalidValueType((spec.__class__,))
6320         value = spec(value=value)
6321         if spec.default is not None and value == spec.default:
6322             self._value.pop(key, None)
6323             return
6324         self._value[key] = value
6325
6326     def __getitem__(self, key):
6327         value = self._value.get(key)
6328         if value is not None:
6329             return value
6330         spec = self.specs.get(key)
6331         if spec is None:
6332             raise ObjUnknown(key)
6333         if spec.default is not None:
6334             return spec.default
6335         return None
6336
6337     def _values_for_encoding(self):
6338         for name, spec in iteritems(self.specs):
6339             value = self._value.get(name)
6340             if value is None:
6341                 if spec.optional:
6342                     continue
6343                 raise ObjNotReady(name)
6344             yield value
6345
6346     def _encode(self):
6347         v = b"".join(v.encode() for v in self._values_for_encoding())
6348         return b"".join((self.tag, len_encode(len(v)), v))
6349
6350     def _encode2nd(self, writer, state_iter):
6351         write_full(writer, self.tag + len_encode(next(state_iter)))
6352         for v in self._values_for_encoding():
6353             v.encode2nd(writer, state_iter)
6354
6355     def _encode_cer(self, writer):
6356         write_full(writer, self.tag + LENINDEF)
6357         for v in self._values_for_encoding():
6358             v.encode_cer(writer)
6359         write_full(writer, EOC)
6360
6361     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6362         try:
6363             t, tlen, lv = tag_strip(tlv)
6364         except DecodeError as err:
6365             raise err.__class__(
6366                 msg=err.msg,
6367                 klass=self.__class__,
6368                 decode_path=decode_path,
6369                 offset=offset,
6370             )
6371         if t != self.tag:
6372             raise TagMismatch(
6373                 klass=self.__class__,
6374                 decode_path=decode_path,
6375                 offset=offset,
6376             )
6377         if tag_only:  # pragma: no cover
6378             yield None
6379             return
6380         lenindef = False
6381         ctx_bered = ctx.get("bered", False)
6382         try:
6383             l, llen, v = len_decode(lv)
6384         except LenIndefForm as err:
6385             if not ctx_bered:
6386                 raise err.__class__(
6387                     msg=err.msg,
6388                     klass=self.__class__,
6389                     decode_path=decode_path,
6390                     offset=offset,
6391                 )
6392             l, llen, v = 0, 1, lv[1:]
6393             lenindef = True
6394         except DecodeError as err:
6395             raise err.__class__(
6396                 msg=err.msg,
6397                 klass=self.__class__,
6398                 decode_path=decode_path,
6399                 offset=offset,
6400             )
6401         if l > len(v):
6402             raise NotEnoughData(
6403                 "encoded length is longer than data",
6404                 klass=self.__class__,
6405                 decode_path=decode_path,
6406                 offset=offset,
6407             )
6408         if not lenindef:
6409             v, tail = v[:l], v[l:]
6410         vlen = 0
6411         sub_offset = offset + tlen + llen
6412         values = {}
6413         ber_encoded = False
6414         ctx_allow_default_values = ctx.get("allow_default_values", False)
6415         for name, spec in iteritems(self.specs):
6416             if spec.optional and (
6417                     (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6418                     len(v) == 0
6419             ):
6420                 continue
6421             sub_decode_path = decode_path + (name,)
6422             try:
6423                 if evgen_mode:
6424                     for _decode_path, value, v_tail in spec.decode_evgen(
6425                             v,
6426                             sub_offset,
6427                             leavemm=True,
6428                             decode_path=sub_decode_path,
6429                             ctx=ctx,
6430                             _ctx_immutable=False,
6431                     ):
6432                         yield _decode_path, value, v_tail
6433                 else:
6434                     _, value, v_tail = next(spec.decode_evgen(
6435                         v,
6436                         sub_offset,
6437                         leavemm=True,
6438                         decode_path=sub_decode_path,
6439                         ctx=ctx,
6440                         _ctx_immutable=False,
6441                         _evgen_mode=False,
6442                     ))
6443             except TagMismatch as err:
6444                 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6445                     continue
6446                 raise
6447
6448             defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6449             if not evgen_mode and defined is not None:
6450                 defined_by, defined_spec = defined
6451                 if issubclass(value.__class__, SequenceOf):
6452                     for i, _value in enumerate(value):
6453                         sub_sub_decode_path = sub_decode_path + (
6454                             str(i),
6455                             DecodePathDefBy(defined_by),
6456                         )
6457                         defined_value, defined_tail = defined_spec.decode(
6458                             memoryview(bytes(_value)),
6459                             sub_offset + (
6460                                 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6461                                 if value.expled else (value.tlen + value.llen)
6462                             ),
6463                             leavemm=True,
6464                             decode_path=sub_sub_decode_path,
6465                             ctx=ctx,
6466                             _ctx_immutable=False,
6467                         )
6468                         if len(defined_tail) > 0:
6469                             raise DecodeError(
6470                                 "remaining data",
6471                                 klass=self.__class__,
6472                                 decode_path=sub_sub_decode_path,
6473                                 offset=offset,
6474                             )
6475                         _value.defined = (defined_by, defined_value)
6476                 else:
6477                     defined_value, defined_tail = defined_spec.decode(
6478                         memoryview(bytes(value)),
6479                         sub_offset + (
6480                             (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6481                             if value.expled else (value.tlen + value.llen)
6482                         ),
6483                         leavemm=True,
6484                         decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6485                         ctx=ctx,
6486                         _ctx_immutable=False,
6487                     )
6488                     if len(defined_tail) > 0:
6489                         raise DecodeError(
6490                             "remaining data",
6491                             klass=self.__class__,
6492                             decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6493                             offset=offset,
6494                         )
6495                     value.defined = (defined_by, defined_value)
6496
6497             value_len = value.fulllen
6498             vlen += value_len
6499             sub_offset += value_len
6500             v = v_tail
6501             if not evgen_mode:
6502                 if spec.default is not None and value == spec.default:
6503                     # This will not work in evgen_mode
6504                     if ctx_bered or ctx_allow_default_values:
6505                         ber_encoded = True
6506                     else:
6507                         raise DecodeError(
6508                             "DEFAULT value met",
6509                             klass=self.__class__,
6510                             decode_path=sub_decode_path,
6511                             offset=sub_offset,
6512                         )
6513                 values[name] = value
6514                 spec_defines = getattr(spec, "defines", ())
6515                 if len(spec_defines) == 0:
6516                     defines_by_path = ctx.get("defines_by_path", ())
6517                     if len(defines_by_path) > 0:
6518                         spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6519                 if spec_defines is not None and len(spec_defines) > 0:
6520                     for rel_path, schema in spec_defines:
6521                         defined = schema.get(value, None)
6522                         if defined is not None:
6523                             ctx.setdefault("_defines", []).append((
6524                                 abs_decode_path(sub_decode_path[:-1], rel_path),
6525                                 (value, defined),
6526                             ))
6527         if lenindef:
6528             if v[:EOC_LEN].tobytes() != EOC:
6529                 raise DecodeError(
6530                     "no EOC",
6531                     klass=self.__class__,
6532                     decode_path=decode_path,
6533                     offset=offset,
6534                 )
6535             tail = v[EOC_LEN:]
6536             vlen += EOC_LEN
6537         elif len(v) > 0:
6538             raise DecodeError(
6539                 "remaining data",
6540                 klass=self.__class__,
6541                 decode_path=decode_path,
6542                 offset=offset,
6543             )
6544         obj = self.__class__(
6545             schema=self.specs,
6546             impl=self.tag,
6547             expl=self._expl,
6548             default=self.default,
6549             optional=self.optional,
6550             _decoded=(offset, llen, vlen),
6551         )
6552         obj._value = values
6553         obj.lenindef = lenindef
6554         obj.ber_encoded = ber_encoded
6555         yield decode_path, obj, tail
6556
6557     def __repr__(self):
6558         value = pp_console_row(next(self.pps()))
6559         cols = []
6560         for name in self.specs:
6561             _value = self._value.get(name)
6562             if _value is None:
6563                 continue
6564             cols.append("%s: %s" % (name, repr(_value)))
6565         return "%s[%s]" % (value, "; ".join(cols))
6566
6567     def pps(self, decode_path=()):
6568         yield _pp(
6569             obj=self,
6570             asn1_type_name=self.asn1_type_name,
6571             obj_name=self.__class__.__name__,
6572             decode_path=decode_path,
6573             optional=self.optional,
6574             default=self == self.default,
6575             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6576             expl=None if self._expl is None else tag_decode(self._expl),
6577             offset=self.offset,
6578             tlen=self.tlen,
6579             llen=self.llen,
6580             vlen=self.vlen,
6581             expl_offset=self.expl_offset if self.expled else None,
6582             expl_tlen=self.expl_tlen if self.expled else None,
6583             expl_llen=self.expl_llen if self.expled else None,
6584             expl_vlen=self.expl_vlen if self.expled else None,
6585             expl_lenindef=self.expl_lenindef,
6586             lenindef=self.lenindef,
6587             ber_encoded=self.ber_encoded,
6588             bered=self.bered,
6589         )
6590         for name in self.specs:
6591             value = self._value.get(name)
6592             if value is None:
6593                 continue
6594             yield value.pps(decode_path=decode_path + (name,))
6595         for pp in self.pps_lenindef(decode_path):
6596             yield pp
6597
6598
6599 class Set(Sequence, SequenceEncode1stMixing):
6600     """``SET`` structure type
6601
6602     Its usage is identical to :py:class:`pyderasn.Sequence`.
6603
6604     .. _allow_unordered_set_ctx:
6605
6606     DER prohibits unordered values encoding and will raise an error
6607     during decode. If :ref:`bered <bered_ctx>` context option is set,
6608     then no error will occur. Also you can disable strict values
6609     ordering check by setting ``"allow_unordered_set": True``
6610     :ref:`context <ctx>` option.
6611     """
6612     __slots__ = ()
6613     tag_default = tag_encode(form=TagFormConstructed, num=17)
6614     asn1_type_name = "SET"
6615
6616     def _values_for_encoding(self):
6617         return sorted(
6618             super(Set, self)._values_for_encoding(),
6619             key=attrgetter("tag_order"),
6620         )
6621
6622     def _encode_cer(self, writer):
6623         write_full(writer, self.tag + LENINDEF)
6624         for v in sorted(
6625                 super(Set, self)._values_for_encoding(),
6626                 key=attrgetter("tag_order_cer"),
6627         ):
6628             v.encode_cer(writer)
6629         write_full(writer, EOC)
6630
6631     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6632         try:
6633             t, tlen, lv = tag_strip(tlv)
6634         except DecodeError as err:
6635             raise err.__class__(
6636                 msg=err.msg,
6637                 klass=self.__class__,
6638                 decode_path=decode_path,
6639                 offset=offset,
6640             )
6641         if t != self.tag:
6642             raise TagMismatch(
6643                 klass=self.__class__,
6644                 decode_path=decode_path,
6645                 offset=offset,
6646             )
6647         if tag_only:
6648             yield None
6649             return
6650         lenindef = False
6651         ctx_bered = ctx.get("bered", False)
6652         try:
6653             l, llen, v = len_decode(lv)
6654         except LenIndefForm as err:
6655             if not ctx_bered:
6656                 raise err.__class__(
6657                     msg=err.msg,
6658                     klass=self.__class__,
6659                     decode_path=decode_path,
6660                     offset=offset,
6661                 )
6662             l, llen, v = 0, 1, lv[1:]
6663             lenindef = True
6664         except DecodeError as err:
6665             raise err.__class__(
6666                 msg=err.msg,
6667                 klass=self.__class__,
6668                 decode_path=decode_path,
6669                 offset=offset,
6670             )
6671         if l > len(v):
6672             raise NotEnoughData(
6673                 "encoded length is longer than data",
6674                 klass=self.__class__,
6675                 offset=offset,
6676             )
6677         if not lenindef:
6678             v, tail = v[:l], v[l:]
6679         vlen = 0
6680         sub_offset = offset + tlen + llen
6681         values = {}
6682         ber_encoded = False
6683         ctx_allow_default_values = ctx.get("allow_default_values", False)
6684         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6685         tag_order_prev = (0, 0)
6686         _specs_items = copy(self.specs)
6687
6688         while len(v) > 0:
6689             if lenindef and v[:EOC_LEN].tobytes() == EOC:
6690                 break
6691             for name, spec in iteritems(_specs_items):
6692                 sub_decode_path = decode_path + (name,)
6693                 try:
6694                     spec.decode(
6695                         v,
6696                         sub_offset,
6697                         leavemm=True,
6698                         decode_path=sub_decode_path,
6699                         ctx=ctx,
6700                         tag_only=True,
6701                         _ctx_immutable=False,
6702                     )
6703                 except TagMismatch:
6704                     continue
6705                 break
6706             else:
6707                 raise TagMismatch(
6708                     klass=self.__class__,
6709                     decode_path=decode_path,
6710                     offset=offset,
6711                 )
6712             if evgen_mode:
6713                 for _decode_path, value, v_tail in spec.decode_evgen(
6714                         v,
6715                         sub_offset,
6716                         leavemm=True,
6717                         decode_path=sub_decode_path,
6718                         ctx=ctx,
6719                         _ctx_immutable=False,
6720                 ):
6721                     yield _decode_path, value, v_tail
6722             else:
6723                 _, value, v_tail = next(spec.decode_evgen(
6724                     v,
6725                     sub_offset,
6726                     leavemm=True,
6727                     decode_path=sub_decode_path,
6728                     ctx=ctx,
6729                     _ctx_immutable=False,
6730                     _evgen_mode=False,
6731                 ))
6732             value_tag_order = value.tag_order
6733             value_len = value.fulllen
6734             if tag_order_prev >= value_tag_order:
6735                 if ctx_bered or ctx_allow_unordered_set:
6736                     ber_encoded = True
6737                 else:
6738                     raise DecodeError(
6739                         "unordered " + self.asn1_type_name,
6740                         klass=self.__class__,
6741                         decode_path=sub_decode_path,
6742                         offset=sub_offset,
6743                     )
6744             if spec.default is None or value != spec.default:
6745                 pass
6746             elif ctx_bered or ctx_allow_default_values:
6747                 ber_encoded = True
6748             else:
6749                 raise DecodeError(
6750                     "DEFAULT value met",
6751                     klass=self.__class__,
6752                     decode_path=sub_decode_path,
6753                     offset=sub_offset,
6754                 )
6755             values[name] = value
6756             del _specs_items[name]
6757             tag_order_prev = value_tag_order
6758             sub_offset += value_len
6759             vlen += value_len
6760             v = v_tail
6761
6762         obj = self.__class__(
6763             schema=self.specs,
6764             impl=self.tag,
6765             expl=self._expl,
6766             default=self.default,
6767             optional=self.optional,
6768             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6769         )
6770         if lenindef:
6771             if v[:EOC_LEN].tobytes() != EOC:
6772                 raise DecodeError(
6773                     "no EOC",
6774                     klass=self.__class__,
6775                     decode_path=decode_path,
6776                     offset=offset,
6777                 )
6778             tail = v[EOC_LEN:]
6779             obj.lenindef = True
6780         for name, spec in iteritems(self.specs):
6781             if name not in values and not spec.optional:
6782                 raise DecodeError(
6783                     "%s value is not ready" % name,
6784                     klass=self.__class__,
6785                     decode_path=decode_path,
6786                     offset=offset,
6787                 )
6788         if not evgen_mode:
6789             obj._value = values
6790         obj.ber_encoded = ber_encoded
6791         yield decode_path, obj, tail
6792
6793
6794 SequenceOfState = namedtuple(
6795     "SequenceOfState",
6796     BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6797     **NAMEDTUPLE_KWARGS
6798 )
6799
6800
6801 class SequenceOf(SequenceEncode1stMixing, Obj):
6802     """``SEQUENCE OF`` sequence type
6803
6804     For that kind of type you must specify the object it will carry on
6805     (bounds are for example here, not required)::
6806
6807         class Ints(SequenceOf):
6808             schema = Integer()
6809             bounds = (0, 2)
6810
6811     >>> ints = Ints()
6812     >>> ints.append(Integer(123))
6813     >>> ints.append(Integer(234))
6814     >>> ints
6815     Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6816     >>> [int(i) for i in ints]
6817     [123, 234]
6818     >>> ints.append(Integer(345))
6819     Traceback (most recent call last):
6820     pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6821     >>> ints[1]
6822     INTEGER 234
6823     >>> ints[1] = Integer(345)
6824     >>> ints
6825     Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6826
6827     You can initialize sequence with preinitialized values:
6828
6829     >>> ints = Ints([Integer(123), Integer(234)])
6830
6831     Also you can use iterator as a value:
6832
6833     >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6834
6835     And it won't be iterated until encoding process. Pay attention that
6836     bounds and required schema checks are done only during the encoding
6837     process in that case! After encode was called, then value is zeroed
6838     back to empty list and you have to set it again. That mode is useful
6839     mainly with CER encoding mode, where all objects from the iterable
6840     will be streamed to the buffer, without copying all of them to
6841     memory first.
6842     """
6843     __slots__ = ("spec", "_bound_min", "_bound_max")
6844     tag_default = tag_encode(form=TagFormConstructed, num=16)
6845     asn1_type_name = "SEQUENCE OF"
6846
6847     def __init__(
6848             self,
6849             value=None,
6850             schema=None,
6851             bounds=None,
6852             impl=None,
6853             expl=None,
6854             default=None,
6855             optional=False,
6856             _decoded=(0, 0, 0),
6857     ):
6858         super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6859         if schema is None:
6860             schema = getattr(self, "schema", None)
6861         if schema is None:
6862             raise ValueError("schema must be specified")
6863         self.spec = schema
6864         self._bound_min, self._bound_max = getattr(
6865             self,
6866             "bounds",
6867             (0, float("+inf")),
6868         ) if bounds is None else bounds
6869         self._value = []
6870         if value is not None:
6871             self._value = self._value_sanitize(value)
6872         if default is not None:
6873             default_value = self._value_sanitize(default)
6874             default_obj = self.__class__(
6875                 schema=schema,
6876                 impl=self.tag,
6877                 expl=self._expl,
6878             )
6879             default_obj._value = default_value
6880             self.default = default_obj
6881             if value is None:
6882                 self._value = copy(default_obj._value)
6883
6884     def _value_sanitize(self, value):
6885         iterator = False
6886         if issubclass(value.__class__, SequenceOf):
6887             value = value._value
6888         elif hasattr(value, NEXT_ATTR_NAME):
6889             iterator = True
6890         elif hasattr(value, "__iter__"):
6891             value = list(value)
6892         else:
6893             raise InvalidValueType((self.__class__, iter, "iterator"))
6894         if not iterator:
6895             if not self._bound_min <= len(value) <= self._bound_max:
6896                 raise BoundsError(self._bound_min, len(value), self._bound_max)
6897             class_expected = self.spec.__class__
6898             for v in value:
6899                 if not isinstance(v, class_expected):
6900                     raise InvalidValueType((class_expected,))
6901         return value
6902
6903     @property
6904     def ready(self):
6905         if hasattr(self._value, NEXT_ATTR_NAME):
6906             return True
6907         if self._bound_min > 0 and len(self._value) == 0:
6908             return False
6909         return all(v.ready for v in self._value)
6910
6911     @property
6912     def bered(self):
6913         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6914             return True
6915         return any(v.bered for v in self._value)
6916
6917     def __getstate__(self):
6918         if hasattr(self._value, NEXT_ATTR_NAME):
6919             raise ValueError("can not pickle SequenceOf with iterator")
6920         return SequenceOfState(
6921             __version__,
6922             self.tag,
6923             self._tag_order,
6924             self._expl,
6925             self.default,
6926             self.optional,
6927             self.offset,
6928             self.llen,
6929             self.vlen,
6930             self.expl_lenindef,
6931             self.lenindef,
6932             self.ber_encoded,
6933             self.spec,
6934             [copy(v) for v in self._value],
6935             self._bound_min,
6936             self._bound_max,
6937         )
6938
6939     def __setstate__(self, state):
6940         super(SequenceOf, self).__setstate__(state)
6941         self.spec = state.spec
6942         self._value = state.value
6943         self._bound_min = state.bound_min
6944         self._bound_max = state.bound_max
6945
6946     def __eq__(self, their):
6947         if isinstance(their, self.__class__):
6948             return (
6949                 self.spec == their.spec and
6950                 self.tag == their.tag and
6951                 self._expl == their._expl and
6952                 self._value == their._value
6953             )
6954         if hasattr(their, "__iter__"):
6955             return self._value == list(their)
6956         return False
6957
6958     def __call__(
6959             self,
6960             value=None,
6961             bounds=None,
6962             impl=None,
6963             expl=None,
6964             default=None,
6965             optional=None,
6966     ):
6967         return self.__class__(
6968             value=value,
6969             schema=self.spec,
6970             bounds=(
6971                 (self._bound_min, self._bound_max)
6972                 if bounds is None else bounds
6973             ),
6974             impl=self.tag if impl is None else impl,
6975             expl=self._expl if expl is None else expl,
6976             default=self.default if default is None else default,
6977             optional=self.optional if optional is None else optional,
6978         )
6979
6980     def __contains__(self, key):
6981         return key in self._value
6982
6983     def append(self, value):
6984         if not isinstance(value, self.spec.__class__):
6985             raise InvalidValueType((self.spec.__class__,))
6986         if len(self._value) + 1 > self._bound_max:
6987             raise BoundsError(
6988                 self._bound_min,
6989                 len(self._value) + 1,
6990                 self._bound_max,
6991             )
6992         self._value.append(value)
6993
6994     def __iter__(self):
6995         return iter(self._value)
6996
6997     def __len__(self):
6998         return len(self._value)
6999
7000     def __setitem__(self, key, value):
7001         if not isinstance(value, self.spec.__class__):
7002             raise InvalidValueType((self.spec.__class__,))
7003         self._value[key] = self.spec(value=value)
7004
7005     def __getitem__(self, key):
7006         return self._value[key]
7007
7008     def _values_for_encoding(self):
7009         return iter(self._value)
7010
7011     def _encode(self):
7012         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7013         if iterator:
7014             values = []
7015             values_append = values.append
7016             class_expected = self.spec.__class__
7017             values_for_encoding = self._values_for_encoding()
7018             self._value = []
7019             for v in values_for_encoding:
7020                 if not isinstance(v, class_expected):
7021                     raise InvalidValueType((class_expected,))
7022                 values_append(v.encode())
7023             if not self._bound_min <= len(values) <= self._bound_max:
7024                 raise BoundsError(self._bound_min, len(values), self._bound_max)
7025             value = b"".join(values)
7026         else:
7027             value = b"".join(v.encode() for v in self._values_for_encoding())
7028         return b"".join((self.tag, len_encode(len(value)), value))
7029
7030     def _encode1st(self, state):
7031         state = super(SequenceOf, self)._encode1st(state)
7032         if hasattr(self._value, NEXT_ATTR_NAME):
7033             self._value = []
7034         return state
7035
7036     def _encode2nd(self, writer, state_iter):
7037         write_full(writer, self.tag + len_encode(next(state_iter)))
7038         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7039         if iterator:
7040             values_count = 0
7041             class_expected = self.spec.__class__
7042             values_for_encoding = self._values_for_encoding()
7043             self._value = []
7044             for v in values_for_encoding:
7045                 if not isinstance(v, class_expected):
7046                     raise InvalidValueType((class_expected,))
7047                 v.encode2nd(writer, state_iter)
7048                 values_count += 1
7049             if not self._bound_min <= values_count <= self._bound_max:
7050                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7051         else:
7052             for v in self._values_for_encoding():
7053                 v.encode2nd(writer, state_iter)
7054
7055     def _encode_cer(self, writer):
7056         write_full(writer, self.tag + LENINDEF)
7057         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7058         if iterator:
7059             class_expected = self.spec.__class__
7060             values_count = 0
7061             values_for_encoding = self._values_for_encoding()
7062             self._value = []
7063             for v in values_for_encoding:
7064                 if not isinstance(v, class_expected):
7065                     raise InvalidValueType((class_expected,))
7066                 v.encode_cer(writer)
7067                 values_count += 1
7068             if not self._bound_min <= values_count <= self._bound_max:
7069                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7070         else:
7071             for v in self._values_for_encoding():
7072                 v.encode_cer(writer)
7073         write_full(writer, EOC)
7074
7075     def _decode(
7076             self,
7077             tlv,
7078             offset,
7079             decode_path,
7080             ctx,
7081             tag_only,
7082             evgen_mode,
7083             ordering_check=False,
7084     ):
7085         try:
7086             t, tlen, lv = tag_strip(tlv)
7087         except DecodeError as err:
7088             raise err.__class__(
7089                 msg=err.msg,
7090                 klass=self.__class__,
7091                 decode_path=decode_path,
7092                 offset=offset,
7093             )
7094         if t != self.tag:
7095             raise TagMismatch(
7096                 klass=self.__class__,
7097                 decode_path=decode_path,
7098                 offset=offset,
7099             )
7100         if tag_only:
7101             yield None
7102             return
7103         lenindef = False
7104         ctx_bered = ctx.get("bered", False)
7105         try:
7106             l, llen, v = len_decode(lv)
7107         except LenIndefForm as err:
7108             if not ctx_bered:
7109                 raise err.__class__(
7110                     msg=err.msg,
7111                     klass=self.__class__,
7112                     decode_path=decode_path,
7113                     offset=offset,
7114                 )
7115             l, llen, v = 0, 1, lv[1:]
7116             lenindef = True
7117         except DecodeError as err:
7118             raise err.__class__(
7119                 msg=err.msg,
7120                 klass=self.__class__,
7121                 decode_path=decode_path,
7122                 offset=offset,
7123             )
7124         if l > len(v):
7125             raise NotEnoughData(
7126                 "encoded length is longer than data",
7127                 klass=self.__class__,
7128                 decode_path=decode_path,
7129                 offset=offset,
7130             )
7131         if not lenindef:
7132             v, tail = v[:l], v[l:]
7133         vlen = 0
7134         sub_offset = offset + tlen + llen
7135         _value = []
7136         _value_count = 0
7137         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7138         value_prev = memoryview(v[:0])
7139         ber_encoded = False
7140         spec = self.spec
7141         while len(v) > 0:
7142             if lenindef and v[:EOC_LEN].tobytes() == EOC:
7143                 break
7144             sub_decode_path = decode_path + (str(_value_count),)
7145             if evgen_mode:
7146                 for _decode_path, value, v_tail in spec.decode_evgen(
7147                         v,
7148                         sub_offset,
7149                         leavemm=True,
7150                         decode_path=sub_decode_path,
7151                         ctx=ctx,
7152                         _ctx_immutable=False,
7153                 ):
7154                     yield _decode_path, value, v_tail
7155             else:
7156                 _, value, v_tail = next(spec.decode_evgen(
7157                     v,
7158                     sub_offset,
7159                     leavemm=True,
7160                     decode_path=sub_decode_path,
7161                     ctx=ctx,
7162                     _ctx_immutable=False,
7163                     _evgen_mode=False,
7164                 ))
7165             value_len = value.fulllen
7166             if ordering_check:
7167                 if value_prev.tobytes() > v[:value_len].tobytes():
7168                     if ctx_bered or ctx_allow_unordered_set:
7169                         ber_encoded = True
7170                     else:
7171                         raise DecodeError(
7172                             "unordered " + self.asn1_type_name,
7173                             klass=self.__class__,
7174                             decode_path=sub_decode_path,
7175                             offset=sub_offset,
7176                         )
7177                 value_prev = v[:value_len]
7178             _value_count += 1
7179             if not evgen_mode:
7180                 _value.append(value)
7181             sub_offset += value_len
7182             vlen += value_len
7183             v = v_tail
7184         if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7185             raise DecodeError(
7186                 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7187                 klass=self.__class__,
7188                 decode_path=decode_path,
7189                 offset=offset,
7190             )
7191         try:
7192             obj = self.__class__(
7193                 value=None if evgen_mode else _value,
7194                 schema=spec,
7195                 bounds=(self._bound_min, self._bound_max),
7196                 impl=self.tag,
7197                 expl=self._expl,
7198                 default=self.default,
7199                 optional=self.optional,
7200                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7201             )
7202         except BoundsError as err:
7203             raise DecodeError(
7204                 msg=str(err),
7205                 klass=self.__class__,
7206                 decode_path=decode_path,
7207                 offset=offset,
7208             )
7209         if lenindef:
7210             if v[:EOC_LEN].tobytes() != EOC:
7211                 raise DecodeError(
7212                     "no EOC",
7213                     klass=self.__class__,
7214                     decode_path=decode_path,
7215                     offset=offset,
7216                 )
7217             obj.lenindef = True
7218             tail = v[EOC_LEN:]
7219         obj.ber_encoded = ber_encoded
7220         yield decode_path, obj, tail
7221
7222     def __repr__(self):
7223         return "%s[%s]" % (
7224             pp_console_row(next(self.pps())),
7225             ", ".join(repr(v) for v in self._value),
7226         )
7227
7228     def pps(self, decode_path=()):
7229         yield _pp(
7230             obj=self,
7231             asn1_type_name=self.asn1_type_name,
7232             obj_name=self.__class__.__name__,
7233             decode_path=decode_path,
7234             optional=self.optional,
7235             default=self == self.default,
7236             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7237             expl=None if self._expl is None else tag_decode(self._expl),
7238             offset=self.offset,
7239             tlen=self.tlen,
7240             llen=self.llen,
7241             vlen=self.vlen,
7242             expl_offset=self.expl_offset if self.expled else None,
7243             expl_tlen=self.expl_tlen if self.expled else None,
7244             expl_llen=self.expl_llen if self.expled else None,
7245             expl_vlen=self.expl_vlen if self.expled else None,
7246             expl_lenindef=self.expl_lenindef,
7247             lenindef=self.lenindef,
7248             ber_encoded=self.ber_encoded,
7249             bered=self.bered,
7250         )
7251         for i, value in enumerate(self._value):
7252             yield value.pps(decode_path=decode_path + (str(i),))
7253         for pp in self.pps_lenindef(decode_path):
7254             yield pp
7255
7256
7257 class SetOf(SequenceOf):
7258     """``SET OF`` sequence type
7259
7260     Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7261     """
7262     __slots__ = ()
7263     tag_default = tag_encode(form=TagFormConstructed, num=17)
7264     asn1_type_name = "SET OF"
7265
7266     def _value_sanitize(self, value):
7267         value = super(SetOf, self)._value_sanitize(value)
7268         if hasattr(value, NEXT_ATTR_NAME):
7269             raise ValueError(
7270                 "SetOf does not support iterator values, as no sense in them"
7271             )
7272         return value
7273
7274     def _encode(self):
7275         v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7276         return b"".join((self.tag, len_encode(len(v)), v))
7277
7278     def _encode2nd(self, writer, state_iter):
7279         write_full(writer, self.tag + len_encode(next(state_iter)))
7280         values = []
7281         for v in self._values_for_encoding():
7282             buf = BytesIO()
7283             v.encode2nd(buf.write, state_iter)
7284             values.append(buf.getvalue())
7285         values.sort()
7286         for v in values:
7287             write_full(writer, v)
7288
7289     def _encode_cer(self, writer):
7290         write_full(writer, self.tag + LENINDEF)
7291         for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7292             write_full(writer, v)
7293         write_full(writer, EOC)
7294
7295     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7296         return super(SetOf, self)._decode(
7297             tlv,
7298             offset,
7299             decode_path,
7300             ctx,
7301             tag_only,
7302             evgen_mode,
7303             ordering_check=True,
7304         )
7305
7306
7307 def obj_by_path(pypath):  # pragma: no cover
7308     """Import object specified as string Python path
7309
7310     Modules must be separated from classes/functions with ``:``.
7311
7312     >>> obj_by_path("foo.bar:Baz")
7313     <class 'foo.bar.Baz'>
7314     >>> obj_by_path("foo.bar:Baz.boo")
7315     <classmethod 'foo.bar.Baz.boo'>
7316     """
7317     mod, objs = pypath.rsplit(":", 1)
7318     from importlib import import_module
7319     obj = import_module(mod)
7320     for obj_name in objs.split("."):
7321         obj = getattr(obj, obj_name)
7322     return obj
7323
7324
7325 def generic_decoder():  # pragma: no cover
7326     # All of this below is a big hack with self references
7327     choice = PrimitiveTypes()
7328     choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7329     choice.specs["SetOf"] = SetOf(schema=choice)
7330     for i in six_xrange(31):
7331         choice.specs["SequenceOf%d" % i] = SequenceOf(
7332             schema=choice,
7333             expl=tag_ctxc(i),
7334         )
7335     choice.specs["Any"] = Any()
7336
7337     # Class name equals to type name, to omit it from output
7338     class SEQUENCEOF(SequenceOf):
7339         __slots__ = ()
7340         schema = choice
7341
7342     def pprint_any(
7343             obj,
7344             oid_maps=(),
7345             with_colours=False,
7346             with_decode_path=False,
7347             decode_path_only=(),
7348             decode_path=(),
7349     ):
7350         def _pprint_pps(pps):
7351             for pp in pps:
7352                 if hasattr(pp, "_fields"):
7353                     if (
7354                             decode_path_only != () and
7355                             pp.decode_path[:len(decode_path_only)] != decode_path_only
7356                     ):
7357                         continue
7358                     if pp.asn1_type_name == Choice.asn1_type_name:
7359                         continue
7360                     pp_kwargs = pp._asdict()
7361                     pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7362                     pp = _pp(**pp_kwargs)
7363                     yield pp_console_row(
7364                         pp,
7365                         oid_maps=oid_maps,
7366                         with_offsets=True,
7367                         with_blob=False,
7368                         with_colours=with_colours,
7369                         with_decode_path=with_decode_path,
7370                         decode_path_len_decrease=len(decode_path_only),
7371                     )
7372                     for row in pp_console_blob(
7373                             pp,
7374                             decode_path_len_decrease=len(decode_path_only),
7375                     ):
7376                         yield row
7377                 else:
7378                     for row in _pprint_pps(pp):
7379                         yield row
7380         return "\n".join(_pprint_pps(obj.pps(decode_path)))
7381     return SEQUENCEOF(), pprint_any
7382
7383
7384 def main():  # pragma: no cover
7385     import argparse
7386     parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7387     parser.add_argument(
7388         "--skip",
7389         type=int,
7390         default=0,
7391         help="Skip that number of bytes from the beginning",
7392     )
7393     parser.add_argument(
7394         "--oids",
7395         help="Python paths to dictionary with OIDs, comma separated",
7396     )
7397     parser.add_argument(
7398         "--schema",
7399         help="Python path to schema definition to use",
7400     )
7401     parser.add_argument(
7402         "--defines-by-path",
7403         help="Python path to decoder's defines_by_path",
7404     )
7405     parser.add_argument(
7406         "--nobered",
7407         action="store_true",
7408         help="Disallow BER encoding",
7409     )
7410     parser.add_argument(
7411         "--print-decode-path",
7412         action="store_true",
7413         help="Print decode paths",
7414     )
7415     parser.add_argument(
7416         "--decode-path-only",
7417         help="Print only specified decode path",
7418     )
7419     parser.add_argument(
7420         "--allow-expl-oob",
7421         action="store_true",
7422         help="Allow explicit tag out-of-bound",
7423     )
7424     parser.add_argument(
7425         "--evgen",
7426         action="store_true",
7427         help="Turn on event generation mode",
7428     )
7429     parser.add_argument(
7430         "RAWFile",
7431         type=argparse.FileType("rb"),
7432         help="Path to BER/CER/DER file you want to decode",
7433     )
7434     args = parser.parse_args()
7435     if PY2:
7436         args.RAWFile.seek(args.skip)
7437         raw = memoryview(args.RAWFile.read())
7438         args.RAWFile.close()
7439     else:
7440         raw = file_mmaped(args.RAWFile)[args.skip:]
7441     oid_maps = (
7442         [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7443         if args.oids else ()
7444     )
7445     from functools import partial
7446     if args.schema:
7447         schema = obj_by_path(args.schema)
7448         pprinter = partial(pprint, big_blobs=True)
7449     else:
7450         schema, pprinter = generic_decoder()
7451     ctx = {
7452         "bered": not args.nobered,
7453         "allow_expl_oob": args.allow_expl_oob,
7454     }
7455     if args.defines_by_path is not None:
7456         ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7457     from os import environ
7458     pprinter = partial(
7459         pprinter,
7460         oid_maps=oid_maps,
7461         with_colours=environ.get("NO_COLOR") is None,
7462         with_decode_path=args.print_decode_path,
7463         decode_path_only=(
7464             () if args.decode_path_only is None else
7465             tuple(args.decode_path_only.split(":"))
7466         ),
7467     )
7468     if args.evgen:
7469         for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7470             print(pprinter(obj, decode_path=decode_path))
7471     else:
7472         obj, tail = schema().decode(raw, ctx=ctx)
7473         print(pprinter(obj))
7474     if tail != b"":
7475         print("\nTrailing data: %s" % hexenc(tail))
7476
7477
7478 if __name__ == "__main__":
7479     main()