]> Cypherpunks.ru repositories - pyderasn.git/blob - pyderasn.py
colonize_hex is publicly available function
[pyderasn.git] / pyderasn.py
1 #!/usr/bin/env python
2 # coding: utf-8
3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU Lesser General Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
21
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal them in BER/CER/DER ones.
24
25     >>> i = Integer(123)
26     >>> raw = i.encode()
27     >>> Integer().decod(raw) == i
28     True
29
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
62
63 Common for most types
64 ---------------------
65
66 Tags
67 ____
68
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
75 tags simultaneously.
76
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
80 number.
81
82 .. note::
83
84    EXPLICIT tags always have **constructed** tag. PyDERASN does not
85    explicitly check correctness of schema input here.
86
87 .. note::
88
89    Implicit tags have **primitive** (``tag_ctxp``) encoding for
90    primitive values.
91
92 ::
93
94     >>> Integer(impl=tag_ctxp(1))
95     [1] INTEGER
96     >>> Integer(expl=tag_ctxc(2))
97     [2] EXPLICIT INTEGER
98
99 Implicit tag is not explicitly shown.
100
101 Two objects of the same type, but with different implicit/explicit tags
102 are **not** equal.
103
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
106 function::
107
108     >>> tag_decode(tag_ctxc(123))
109     (128, 32, 123)
110     >>> klass, form, num = tag_decode(tag_ctxc(123))
111     >>> klass == TagClassContext
112     True
113     >>> form == TagFormConstructed
114     True
115
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
118
119 Default/optional
120 ________________
121
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
125
126     >>> Integer(optional=True, default=123)
127     INTEGER 123 OPTIONAL DEFAULT
128
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
132 ``version`` field::
133
134     class Version(Integer):
135         schema = (
136             ("v1", 0),
137             ("v2", 1),
138             ("v3", 2),
139         )
140     class TBSCertificate(Sequence):
141         schema = (
142             ("version", Version(expl=tag_ctxc(0), default="v1")),
143         [...]
144
145 When default argument is used and value is not specified, then it equals
146 to default one.
147
148 .. _bounds:
149
150 Size constraints
151 ________________
152
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
156
157     class X(...):
158         bounds = (MIN, MAX)
159
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
161
162 For simplicity you can also set bounds the following way::
163
164     bounded_x = X(bounds=(MIN, MAX))
165
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
167 raised.
168
169 Common methods
170 ______________
171
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
175
176 All objects are friendly to ``copy.copy()`` and copied objects can be
177 safely mutated.
178
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
181
182 .. _decoding:
183
184 Decoding
185 --------
186
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
193
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
196
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
199
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
205
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
208
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
211 * ``expl_tlen``,
212 * ``expl_llen``
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
215   ``offset`` otherwise
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
217   ``tlvlen`` otherwise
218
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
220
221 .. _ctx:
222
223 Context
224 _______
225
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
229
230 Currently available context options:
231
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
238
239 .. _pprinting:
240
241 Pretty printing
242 ---------------
243
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
248
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
252
253     >>> from pyderasn import pprint
254     >>> encoded = Integer(-12345).encode()
255     >>> obj, tail = Integer().decode(encoded)
256     >>> print(pprint(obj))
257         0   [1,1,   2] INTEGER -12345
258
259 .. _pprint_example:
260
261 Example certificate::
262
263     >>> print(pprint(crt))
264         0   [1,3,1604] Certificate SEQUENCE
265         4   [1,3,1453]  . tbsCertificate: TBSCertificate SEQUENCE
266        10-2 [1,1,   1]  . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267        13   [1,1,   3]  . . serialNumber: CertificateSerialNumber INTEGER 61595
268        18   [1,1,  13]  . . signature: AlgorithmIdentifier SEQUENCE
269        20   [1,1,   9]  . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270        31   [0,0,   2]  . . . parameters: [UNIV 5] ANY OPTIONAL
271                         . . . . 05:00
272        33   [0,0, 278]  . . issuer: Name CHOICE rdnSequence
273        33   [1,3, 274]  . . . rdnSequence: RDNSequence SEQUENCE OF
274        37   [1,1,  11]  . . . . 0: RelativeDistinguishedName SET OF
275        39   [1,1,   9]  . . . . . 0: AttributeTypeAndValue SEQUENCE
276        41   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277        46   [0,0,   4]  . . . . . . value: [UNIV 19] AttributeValue ANY
278                         . . . . . . . 13:02:45:53
279     [...]
280      1461   [1,1,  13]  . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281      1463   [1,1,   9]  . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282      1474   [0,0,   2]  . . parameters: [UNIV 5] ANY OPTIONAL
283                         . . . 05:00
284      1476   [1,2, 129]  . signatureValue: BIT STRING 1024 bits
285                         . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286                         . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
287      [...]
288
289     Trailing data: 0a
290
291 Let's parse that output, human::
292
293        10-2 [1,1,   1]    . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294        ^  ^  ^ ^    ^     ^   ^        ^            ^       ^       ^  ^
295        0  1  2 3    4     5   6        7            8       9       10 11
296
297 ::
298
299        20   [1,1,   9]    . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
300        ^     ^ ^    ^     ^     ^          ^                 ^
301        0     2 3    4     5     6          9                 10
302
303 ::
304
305        33   [0,0, 278]    . . issuer: Name CHOICE rdnSequence
306        ^     ^ ^    ^     ^   ^       ^    ^      ^
307        0     2 3    4     5   6       8    9      10
308
309 ::
310
311        52-2∞ B [1,1,1054]∞  . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
312              ^           ^                                 ^   ^            ^
313             12          13                                14   9            10
314
315 :0:
316  Offset of the object, where its DER/BER encoding begins.
317  Pay attention that it does **not** include explicit tag.
318 :1:
319  If explicit tag exists, then this is its length (tag + encoded length).
320 :2:
321  Length of object's tag. For example CHOICE does not have its own tag,
322  so it is zero.
323 :3:
324  Length of encoded length.
325 :4:
326  Length of encoded value.
327 :5:
328  Visual indentation to show the depth of object in the hierarchy.
329 :6:
330  Object's name inside SEQUENCE/CHOICE.
331 :7:
332  If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333  here. "IMPLICIT" is omitted.
334 :8:
335  Object's class name, if set. Omitted if it is just an ordinary simple
336  value (like with ``algorithm`` in example above).
337 :9:
338  Object's ASN.1 type.
339 :10:
340  Object's value, if set. Can consist of multiple words (like OCTET/BIT
341  STRINGs above). We see ``v3`` value in Version, because it is named.
342  ``rdnSequence`` is the choice of CHOICE type.
343 :11:
344  Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345  default one, specified in the schema.
346 :12:
347  Shows does object contains any kind of BER encoded data (possibly
348  Sequence holding BER-encoded underlying value).
349 :13:
350  Only applicable to BER encoded data. Indefinite length encoding mark.
351 :14:
352  Only applicable to BER encoded data. If object has BER-specific
353  encoding, then ``BER`` will be shown. It does not depend on indefinite
354  length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355  (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
356  could be BERed.
357
358 .. _definedby:
359
360 DEFINED BY
361 ----------
362
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
367
368 .. _defines:
369
370 defines kwarg
371 _____________
372
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
376 container::
377
378     class ContentInfo(Sequence):
379         schema = (
380             ("contentType", ContentType(defines=((("content",), {
381                 id_digestedData: DigestedData(),
382                 id_signedData: SignedData(),
383             }),))),
384             ("content", Any(expl=tag_ctxc(0))),
385         )
386
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
391 done.
392
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
398
399         (
400             (("parameters",), {
401                 id_ecPublicKey: ECParameters(),
402                 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
403             }),
404             (("..", "subjectPublicKey"), {
405                 id_rsaEncryption: RSAPublicKey(),
406                 id_GostR3410_2001: OctetString(),
407             }),
408         ),
409
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
412 itself.
413
414 Following types can be automatically decoded (DEFINED BY):
415
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420   ``Any``/``BitString``/``OctetString``-s
421
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
426
427 .. _defines_by_path_ctx:
428
429 defines_by_path context option
430 ______________________________
431
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
435
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
438
439     (decode_path, defines)
440
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
445
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
449 of ``PKIResponse``::
450
451     content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
452         (
453             ("contentType",),
454             ((("content",), {id_signedData: SignedData()}),),
455         ),
456         (
457             (
458                 "content",
459                 DecodePathDefBy(id_signedData),
460                 "encapContentInfo",
461                 "eContentType",
462             ),
463             ((("eContent",), {
464                 id_cct_PKIData: PKIData(),
465                 id_cct_PKIResponse: PKIResponse(),
466             })),
467         ),
468         (
469             (
470                 "content",
471                 DecodePathDefBy(id_signedData),
472                 "encapContentInfo",
473                 "eContent",
474                 DecodePathDefBy(id_cct_PKIResponse),
475                 "controlSequence",
476                 any,
477                 "attrType",
478             ),
479             ((("attrValues",), {
480                 id_cmc_recipientNonce: RecipientNonce(),
481                 id_cmc_senderNonce: SenderNonce(),
482                 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483                 id_cmc_transactionId: TransactionId(),
484             })),
485         ),
486     )})
487
488 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 First function is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meet --
491 useful for SEQUENCE/SET OF-s.
492
493 .. _bered_ctx:
494
495 BER encoding
496 ------------
497
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
502
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504   attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505   STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506   ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508   attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509   ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
510   contain it.
511 * If object has an indefinite length encoded explicit tag, then
512   ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514   indefinite length, object's indefinite length, BER-encoding) or any
515   underlying component has that kind of encoding, then ``bered``
516   attribute is set to True. For example SignedData CMS can have
517   ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518   ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
519
520 EOC (end-of-contents) token's length is taken in advance in object's
521 value length.
522
523 .. _allow_expl_oob_ctx:
524
525 Allow explicit tag out-of-bound
526 -------------------------------
527
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
533
534 .. warning::
535
536    This option should be used only for skipping some decode errors, just
537    to see the decoded structure somehow.
538
539 .. _streaming:
540
541 Streaming and dealing with huge structures
542 ------------------------------------------
543
544 .. _evgen_mode:
545
546 evgen mode
547 __________
548
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
553 structure.
554
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
558
559     class TBSCertListFast(Sequence):
560         schema = (
561             [...]
562             ("revokedCertificates", OctetString(
563                 impl=SequenceOf.tag_default,
564                 optional=True,
565             )),
566             [...]
567         )
568
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
571
572 But how can you get all certificate's serial number from it, after you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
576
577     $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578          10     [1,1,   1]   . . version: Version INTEGER v2 (01) OPTIONAL
579          15     [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580          26     [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL
581          13     [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE
582          34     [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583          39     [0,0,   9]   . . . . . . value: [UNIV 19] AttributeValue ANY
584          32     [1,1,  14]   . . . . . 0: AttributeTypeAndValue SEQUENCE
585          30     [1,1,  16]   . . . . 0: RelativeDistinguishedName SET OF
586     [...]
587         188     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589         191     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
590         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591         186     [1,1,  18]   . . . 0: RevokedCertificate SEQUENCE
592         208     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594         211     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
595         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596         206     [1,1,  18]   . . . 1: RevokedCertificate SEQUENCE
597     [...]
598     9144992     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
599     9144992     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600     9144985     [1,1,  20]   . . . 415755: RevokedCertificate SEQUENCE
601       181     [1,4,9144821]   . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602         5     [1,4,9144997]   . tbsCertList: TBSCertList SEQUENCE
603     9145009     [1,1,   9]   . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604     9145020     [0,0,   2]   . . parameters: [UNIV 5] ANY OPTIONAL
605     9145007     [1,1,  13]   . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606     9145022     [1,3, 513]   . signatureValue: BIT STRING 4096 bits
607         0     [1,4,9145534]  CertificateList SEQUENCE
608
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as a successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
618 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
619
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
626
627     for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
628         if (
629             len(decode_path) == 4 and
630             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
631             decode_path[3] == "userCertificate"
632         ):
633             print("serial number:", int(obj))
634
635 Virtually it does not take any memory except at least needed for single
636 object storage. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
639 ``.tlvlen``.
640
641 .. _evgen_mode_upto_ctx:
642
643 evgen_mode_upto
644 _______________
645
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 do not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
655
656     for decode_path, obj, _ in CertificateList().decode_evgen(
657         crl_raw,
658         ctx={"evgen_mode_upto": (
659             (("tbsCertList", "revokedCertificates", any), True),
660         )},
661     ):
662         if (
663             len(decode_path) == 3 and
664             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
665         ):
666             print("serial number:", int(obj["userCertificate"]))
667
668 .. note::
669
670    SEQUENCE/SET values with DEFAULT specified are automatically decoded
671    without evgen mode.
672
673 .. _mmap:
674
675 mmap-ed file
676 ____________
677
678 POSIX compliant systems have ``mmap`` syscall, giving ability to work
679 the memory mapped file. You can deal with the file like it was an
680 ordinary binary string, allowing you not to load it to the memory first.
681 Also you can use them as an input for OCTET STRING, taking no Python
682 memory for their storage.
683
684 There is convenient :py:func:`pyderasn.file_mmaped` function that
685 creates read-only memoryview on the file contents::
686
687     with open("huge", "rb") as fd:
688         raw = file_mmaped(fd)
689         obj = Something.decode(raw)
690
691 .. warning::
692
693    mmap-ed files in Python2.7 does not implement buffer protocol, so
694    memoryview won't work on them.
695
696 .. warning::
697
698    mmap maps the **whole** file. So it plays no role if you seek-ed it
699    before. Take the slice of the resulting memoryview with required
700    offset instead.
701
702 .. note::
703
704    If you use ZFS as underlying storage, then pay attention that
705    currently most platforms does not deal good with ZFS ARC and ordinary
706    page cache used for mmaps. It can take twice the necessary size in
707    the memory: both in page cache and ZFS ARC.
708
709 CER encoding
710 ____________
711
712 We can parse any kind of data now, but how can we produce files
713 streamingly, without storing their encoded representation in memory?
714 SEQUENCE by default encodes in memory all its values, joins them in huge
715 binary string, just to know the exact size of SEQUENCE's value for
716 encoding it in TLV. DER requires you to know all exact sizes of the
717 objects.
718
719 You can use CER encoding mode, that slightly differs from the DER, but
720 does not require exact sizes knowledge, allowing streaming encoding
721 directly to some writer/buffer. Just use
722 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
723 encoded data will flow::
724
725     opener = io.open if PY2 else open
726     with opener("result", "wb") as fd:
727         obj.encode_cer(fd.write)
728
729 ::
730
731     buf = io.BytesIO()
732     obj.encode_cer(buf.write)
733
734 If you do not want to create in-memory buffer every time, then you can
735 use :py:func:`pyderasn.encode_cer` function::
736
737     data = encode_cer(obj)
738
739 Remember that CER is **not valid** DER in most cases, so you **have to**
740 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
741 decoding. Also currently there is **no** validation that provided CER is
742 valid one -- you are sure that it has only valid BER encoding.
743
744 .. warning::
745
746    SET OF values can not be streamingly encoded, because they are
747    required to be sorted byte-by-byte. Big SET OF values still will take
748    much memory. Use neither SET nor SET OF values, as modern ASN.1
749    also recommends too.
750
751 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
752 OCTET STRINGs! They will be streamingly copied from underlying file to
753 the buffer using 1 KB chunks.
754
755 Some structures require that some of the elements have to be forcefully
756 DER encoded. For example ``SignedData`` CMS requires you to encode
757 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
758 encode everything else in BER. You can tell any of the structures to be
759 forcefully encoded in DER during CER encoding, by specifying
760 ``der_forced=True`` attribute::
761
762     class Certificate(Sequence):
763         schema = (...)
764         der_forced = True
765
766     class SignedAttributes(SetOf):
767         schema = Attribute()
768         bounds = (1, 32)
769         der_forced = True
770
771 .. _agg_octet_string:
772
773 agg_octet_string
774 ________________
775
776 In most cases, huge quantity of binary data is stored as OCTET STRING.
777 CER encoding splits it on 1 KB chunks. BER allows splitting on various
778 levels of chunks inclusion::
779
780     SOME STRING[CONSTRUCTED]
781         OCTET STRING[CONSTRUCTED]
782             OCTET STRING[PRIMITIVE]
783                 DATA CHUNK
784             OCTET STRING[PRIMITIVE]
785                 DATA CHUNK
786             OCTET STRING[PRIMITIVE]
787                 DATA CHUNK
788         OCTET STRING[PRIMITIVE]
789             DATA CHUNK
790         OCTET STRING[CONSTRUCTED]
791             OCTET STRING[PRIMITIVE]
792                 DATA CHUNK
793             OCTET STRING[PRIMITIVE]
794                 DATA CHUNK
795         OCTET STRING[CONSTRUCTED]
796             OCTET STRING[CONSTRUCTED]
797                 OCTET STRING[PRIMITIVE]
798                     DATA CHUNK
799
800 You can not just take the offset and some ``.vlen`` of the STRING and
801 treat it as the payload. If you decode it without
802 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
803 and ``bytes()`` will give the whole payload contents.
804
805 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
806 small memory footprint. There is convenient
807 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
808 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
809 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
810 SHA512 digest of its ``eContent``::
811
812     fd = open("data.p7m", "rb")
813     raw = file_mmaped(fd)
814     ctx = {"bered": True}
815     for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
816         if decode_path == ("content",):
817             content = obj
818             break
819     else:
820         raise ValueError("no content found")
821     hasher_state = sha512()
822     def hasher(data):
823         hasher_state.update(data)
824         return len(data)
825     evgens = SignedData().decode_evgen(
826         raw[content.offset:],
827         offset=content.offset,
828         ctx=ctx,
829     )
830     agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
831     fd.close()
832     digest = hasher_state.digest()
833
834 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
835 copy the payload (without BER/CER encoding interleaved overhead) in it.
836 Virtually it won't take memory more than for keeping small structures
837 and 1 KB binary chunks.
838
839 .. _seqof-iterators:
840
841 SEQUENCE OF iterators
842 _____________________
843
844 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
845 classes. The only difference with providing the full list of objects, is
846 that type and bounds checking is done during encoding process. Also
847 sequence's value will be emptied after encoding, forcing you to set its
848 value again.
849
850 This is very useful when you have to create some huge objects, like
851 CRLs, with thousands and millions of entities inside. You can write the
852 generator taking necessary data from the database and giving the
853 ``RevokedCertificate`` objects. Only binary representation of that
854 objects will take memory during DER encoding.
855
856 2-pass DER encoding
857 -------------------
858
859 There is ability to do 2-pass encoding to DER, writing results directly
860 to specified writer (buffer, file, whatever). It could be 1.5+ times
861 slower than ordinary encoding, but it takes little memory for 1st pass
862 state storing. For example, 1st pass state for CACert.org's CRL with
863 ~416K of certificate entries takes nearly 3.5 MB of memory.
864 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
865 nearly 0.5 KB of memory.
866
867 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
868 iterators <seqof-iterators>` and write directly to opened file, then
869 there is very small memory footprint.
870
871 1st pass traverses through all the objects of the structure and returns
872 the size of DER encoded structure, together with 1st pass state object.
873 That state contains precalculated lengths for various objects inside the
874 structure.
875
876 ::
877
878     fulllen, state = obj.encode1st()
879
880 2nd pass takes the writer and 1st pass state. It traverses through all
881 the objects again, but writes their encoded representation to the writer.
882
883 ::
884
885     opener = io.open if PY2 else open
886     with opener("result", "wb") as fd:
887         obj.encode2nd(fd.write, iter(state))
888
889 .. warning::
890
891    You **MUST NOT** use 1st pass state if anything is changed in the
892    objects. It is intended to be used immediately after 1st pass is
893    done!
894
895 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
896 have to reinitialize the values after the 1st pass. And you **have to**
897 be sure that the iterator gives exactly the same values as previously.
898 Yes, you have to run your iterator twice -- because this is two pass
899 encoding mode.
900
901 If you want to encode to the memory, then you can use convenient
902 :py:func:`pyderasn.encode2pass` helper.
903
904 Base Obj
905 --------
906 .. autoclass:: pyderasn.Obj
907    :members:
908
909 Primitive types
910 ---------------
911
912 Boolean
913 _______
914 .. autoclass:: pyderasn.Boolean
915    :members: __init__
916
917 Integer
918 _______
919 .. autoclass:: pyderasn.Integer
920    :members: __init__, named
921
922 BitString
923 _________
924 .. autoclass:: pyderasn.BitString
925    :members: __init__, bit_len, named
926
927 OctetString
928 ___________
929 .. autoclass:: pyderasn.OctetString
930    :members: __init__
931
932 Null
933 ____
934 .. autoclass:: pyderasn.Null
935    :members: __init__
936
937 ObjectIdentifier
938 ________________
939 .. autoclass:: pyderasn.ObjectIdentifier
940    :members: __init__
941
942 Enumerated
943 __________
944 .. autoclass:: pyderasn.Enumerated
945
946 CommonString
947 ____________
948 .. autoclass:: pyderasn.CommonString
949
950 NumericString
951 _____________
952 .. autoclass:: pyderasn.NumericString
953
954 PrintableString
955 _______________
956 .. autoclass:: pyderasn.PrintableString
957    :members: __init__, allow_asterisk, allow_ampersand
958
959 UTCTime
960 _______
961 .. autoclass:: pyderasn.UTCTime
962    :members: __init__, todatetime
963
964 GeneralizedTime
965 _______________
966 .. autoclass:: pyderasn.GeneralizedTime
967    :members: __init__, todatetime
968
969 Special types
970 -------------
971
972 Choice
973 ______
974 .. autoclass:: pyderasn.Choice
975    :members: __init__, choice, value
976
977 PrimitiveTypes
978 ______________
979 .. autoclass:: PrimitiveTypes
980
981 Any
982 ___
983 .. autoclass:: pyderasn.Any
984    :members: __init__
985
986 Constructed types
987 -----------------
988
989 Sequence
990 ________
991 .. autoclass:: pyderasn.Sequence
992    :members: __init__
993
994 Set
995 ___
996 .. autoclass:: pyderasn.Set
997    :members: __init__
998
999 SequenceOf
1000 __________
1001 .. autoclass:: pyderasn.SequenceOf
1002    :members: __init__
1003
1004 SetOf
1005 _____
1006 .. autoclass:: pyderasn.SetOf
1007    :members: __init__
1008
1009 Various
1010 -------
1011
1012 .. autofunction:: pyderasn.abs_decode_path
1013 .. autofunction:: pyderasn.agg_octet_string
1014 .. autofunction:: pyderasn.colonize_hex
1015 .. autofunction:: pyderasn.encode2pass
1016 .. autofunction:: pyderasn.encode_cer
1017 .. autofunction:: pyderasn.file_mmaped
1018 .. autofunction:: pyderasn.hexenc
1019 .. autofunction:: pyderasn.hexdec
1020 .. autofunction:: pyderasn.tag_encode
1021 .. autofunction:: pyderasn.tag_decode
1022 .. autofunction:: pyderasn.tag_ctxp
1023 .. autofunction:: pyderasn.tag_ctxc
1024 .. autoclass:: pyderasn.DecodeError
1025    :members: __init__
1026 .. autoclass:: pyderasn.NotEnoughData
1027 .. autoclass:: pyderasn.ExceedingData
1028 .. autoclass:: pyderasn.LenIndefForm
1029 .. autoclass:: pyderasn.TagMismatch
1030 .. autoclass:: pyderasn.InvalidLength
1031 .. autoclass:: pyderasn.InvalidOID
1032 .. autoclass:: pyderasn.ObjUnknown
1033 .. autoclass:: pyderasn.ObjNotReady
1034 .. autoclass:: pyderasn.InvalidValueType
1035 .. autoclass:: pyderasn.BoundsError
1036
1037 .. _cmdline:
1038
1039 Command-line usage
1040 ------------------
1041
1042 You can decode DER/BER files using command line abilities::
1043
1044     $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1045
1046 If there is no schema for your file, then you can try parsing it without,
1047 but of course IMPLICIT tags will often make it impossible. But result is
1048 good enough for the certificate above::
1049
1050     $ python -m pyderasn path/to/file
1051         0   [1,3,1604]  . >: SEQUENCE OF
1052         4   [1,3,1453]  . . >: SEQUENCE OF
1053         8   [0,0,   5]  . . . . >: [0] ANY
1054                         . . . . . A0:03:02:01:02
1055        13   [1,1,   3]  . . . . >: INTEGER 61595
1056        18   [1,1,  13]  . . . . >: SEQUENCE OF
1057        20   [1,1,   9]  . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1058        31   [1,1,   0]  . . . . . . >: NULL
1059        33   [1,3, 274]  . . . . >: SEQUENCE OF
1060        37   [1,1,  11]  . . . . . . >: SET OF
1061        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1062        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1063        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1064     [...]
1065      1409   [1,1,  50]  . . . . . . >: SEQUENCE OF
1066      1411   [1,1,   8]  . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1067      1421   [1,1,  38]  . . . . . . . . >: OCTET STRING 38 bytes
1068                         . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1069                         . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1070                         . . . . . . . . . 61:2E:63:6F:6D:2F
1071      1461   [1,1,  13]  . . >: SEQUENCE OF
1072      1463   [1,1,   9]  . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1073      1474   [1,1,   0]  . . . . >: NULL
1074      1476   [1,2, 129]  . . >: BIT STRING 1024 bits
1075                         . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1076                         . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1077     [...]
1078
1079 Human readable OIDs
1080 ___________________
1081
1082 If you have got dictionaries with ObjectIdentifiers, like example one
1083 from ``tests/test_crts.py``::
1084
1085     stroid2name = {
1086         "1.2.840.113549.1.1.1": "id-rsaEncryption",
1087         "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1088         [...]
1089         "2.5.4.10": "id-at-organizationName",
1090         "2.5.4.11": "id-at-organizationalUnitName",
1091     }
1092
1093 then you can pass it to pretty printer to see human readable OIDs::
1094
1095     $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1096     [...]
1097        37   [1,1,  11]  . . . . . . >: SET OF
1098        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1099        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1100        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1101        50   [1,1,  18]  . . . . . . >: SET OF
1102        52   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1103        54   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1104        59   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1105        70   [1,1,  18]  . . . . . . >: SET OF
1106        72   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1107        74   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1108        79   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1109     [...]
1110
1111 Decode paths
1112 ____________
1113
1114 Each decoded element has so-called decode path: sequence of structure
1115 names it is passing during the decode process. Each element has its own
1116 unique path inside the whole ASN.1 tree. You can print it out with
1117 ``--print-decode-path`` option::
1118
1119     $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1120        0    [1,3,1604]  Certificate SEQUENCE []
1121        4    [1,3,1453]   . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1122       10-2  [1,1,   1]   . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1123       13    [1,1,   3]   . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1124       18    [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1125       20    [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1126       31    [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1127                          . . . . 05:00
1128       33    [0,0, 278]   . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1129       33    [1,3, 274]   . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1130       37    [1,1,  11]   . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1131       39    [1,1,   9]   . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1132       41    [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1133       46    [0,0,   4]   . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1134                          . . . . . . . 13:02:45:53
1135       46    [1,1,   2]   . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1136     [...]
1137
1138 Now you can print only the specified tree, for example signature algorithm::
1139
1140     $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1141       18    [1,1,  13]  AlgorithmIdentifier SEQUENCE
1142       20    [1,1,   9]   . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1143       31    [0,0,   2]   . parameters: [UNIV 5] ANY OPTIONAL
1144                          . . 05:00
1145 """
1146
1147 from array import array
1148 from codecs import getdecoder
1149 from codecs import getencoder
1150 from collections import namedtuple
1151 from collections import OrderedDict
1152 from copy import copy
1153 from datetime import datetime
1154 from datetime import timedelta
1155 from io import BytesIO
1156 from math import ceil
1157 from mmap import mmap
1158 from mmap import PROT_READ
1159 from operator import attrgetter
1160 from string import ascii_letters
1161 from string import digits
1162 from sys import maxsize as sys_maxsize
1163 from sys import version_info
1164 from unicodedata import category as unicat
1165
1166 from six import add_metaclass
1167 from six import binary_type
1168 from six import byte2int
1169 from six import indexbytes
1170 from six import int2byte
1171 from six import integer_types
1172 from six import iterbytes
1173 from six import iteritems
1174 from six import itervalues
1175 from six import PY2
1176 from six import string_types
1177 from six import text_type
1178 from six import unichr as six_unichr
1179 from six.moves import xrange as six_xrange
1180
1181
1182 try:
1183     from termcolor import colored
1184 except ImportError:  # pragma: no cover
1185     def colored(what, *args, **kwargs):
1186         return what
1187
1188 __version__ = "7.3"
1189
1190 __all__ = (
1191     "agg_octet_string",
1192     "Any",
1193     "BitString",
1194     "BMPString",
1195     "Boolean",
1196     "BoundsError",
1197     "Choice",
1198     "colonize_hex",
1199     "DecodeError",
1200     "DecodePathDefBy",
1201     "encode2pass",
1202     "encode_cer",
1203     "Enumerated",
1204     "ExceedingData",
1205     "file_mmaped",
1206     "GeneralizedTime",
1207     "GeneralString",
1208     "GraphicString",
1209     "hexdec",
1210     "hexenc",
1211     "IA5String",
1212     "Integer",
1213     "InvalidLength",
1214     "InvalidOID",
1215     "InvalidValueType",
1216     "ISO646String",
1217     "LenIndefForm",
1218     "NotEnoughData",
1219     "Null",
1220     "NumericString",
1221     "obj_by_path",
1222     "ObjectIdentifier",
1223     "ObjNotReady",
1224     "ObjUnknown",
1225     "OctetString",
1226     "PrimitiveTypes",
1227     "PrintableString",
1228     "Sequence",
1229     "SequenceOf",
1230     "Set",
1231     "SetOf",
1232     "T61String",
1233     "tag_ctxc",
1234     "tag_ctxp",
1235     "tag_decode",
1236     "TagClassApplication",
1237     "TagClassContext",
1238     "TagClassPrivate",
1239     "TagClassUniversal",
1240     "TagFormConstructed",
1241     "TagFormPrimitive",
1242     "TagMismatch",
1243     "TeletexString",
1244     "UniversalString",
1245     "UTCTime",
1246     "UTF8String",
1247     "VideotexString",
1248     "VisibleString",
1249 )
1250
1251 TagClassUniversal = 0
1252 TagClassApplication = 1 << 6
1253 TagClassContext = 1 << 7
1254 TagClassPrivate = 1 << 6 | 1 << 7
1255 TagFormPrimitive = 0
1256 TagFormConstructed = 1 << 5
1257 TagClassReprs = {
1258     TagClassContext: "",
1259     TagClassApplication: "APPLICATION ",
1260     TagClassPrivate: "PRIVATE ",
1261     TagClassUniversal: "UNIV ",
1262 }
1263 EOC = b"\x00\x00"
1264 EOC_LEN = len(EOC)
1265 LENINDEF = b"\x80"  # length indefinite mark
1266 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1267 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1268 SET01 = frozenset("01")
1269 DECIMALS = frozenset(digits)
1270 DECIMAL_SIGNS = ".,"
1271 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1272
1273
1274 def file_mmaped(fd):
1275     """Make mmap-ed memoryview for reading from file
1276
1277     :param fd: file object
1278     :returns: memoryview over read-only mmap-ing of the whole file
1279     """
1280     return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
1281
1282 def pureint(value):
1283     if not set(value) <= DECIMALS:
1284         raise ValueError("non-pure integer")
1285     return int(value)
1286
1287 def fractions2float(fractions_raw):
1288     pureint(fractions_raw)
1289     return float("0." + fractions_raw)
1290
1291
1292 def get_def_by_path(defines_by_path, sub_decode_path):
1293     """Get define by decode path
1294     """
1295     for path, define in defines_by_path:
1296         if len(path) != len(sub_decode_path):
1297             continue
1298         for p1, p2 in zip(path, sub_decode_path):
1299             if (not p1 is any) and (p1 != p2):
1300                 break
1301         else:
1302             return define
1303
1304
1305 ########################################################################
1306 # Errors
1307 ########################################################################
1308
1309 class ASN1Error(ValueError):
1310     pass
1311
1312
1313 class DecodeError(ASN1Error):
1314     def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1315         """
1316         :param str msg: reason of decode failing
1317         :param klass: optional exact DecodeError inherited class (like
1318                       :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1319                       :py:exc:`InvalidLength`)
1320         :param decode_path: tuple of strings. It contains human
1321                             readable names of the fields through which
1322                             decoding process has passed
1323         :param int offset: binary offset where failure happened
1324         """
1325         super(DecodeError, self).__init__()
1326         self.msg = msg
1327         self.klass = klass
1328         self.decode_path = decode_path
1329         self.offset = offset
1330
1331     def __str__(self):
1332         return " ".join(
1333             c for c in (
1334                 "" if self.klass is None else self.klass.__name__,
1335                 (
1336                     ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1337                     if len(self.decode_path) > 0 else ""
1338                 ),
1339                 ("(at %d)" % self.offset) if self.offset > 0 else "",
1340                 self.msg,
1341             ) if c != ""
1342         )
1343
1344     def __repr__(self):
1345         return "%s(%s)" % (self.__class__.__name__, self)
1346
1347
1348 class NotEnoughData(DecodeError):
1349     pass
1350
1351
1352 class ExceedingData(ASN1Error):
1353     def __init__(self, nbytes):
1354         super(ExceedingData, self).__init__()
1355         self.nbytes = nbytes
1356
1357     def __str__(self):
1358         return "%d trailing bytes" % self.nbytes
1359
1360     def __repr__(self):
1361         return "%s(%s)" % (self.__class__.__name__, self)
1362
1363
1364 class LenIndefForm(DecodeError):
1365     pass
1366
1367
1368 class TagMismatch(DecodeError):
1369     pass
1370
1371
1372 class InvalidLength(DecodeError):
1373     pass
1374
1375
1376 class InvalidOID(DecodeError):
1377     pass
1378
1379
1380 class ObjUnknown(ASN1Error):
1381     def __init__(self, name):
1382         super(ObjUnknown, self).__init__()
1383         self.name = name
1384
1385     def __str__(self):
1386         return "object is unknown: %s" % self.name
1387
1388     def __repr__(self):
1389         return "%s(%s)" % (self.__class__.__name__, self)
1390
1391
1392 class ObjNotReady(ASN1Error):
1393     def __init__(self, name):
1394         super(ObjNotReady, self).__init__()
1395         self.name = name
1396
1397     def __str__(self):
1398         return "object is not ready: %s" % self.name
1399
1400     def __repr__(self):
1401         return "%s(%s)" % (self.__class__.__name__, self)
1402
1403
1404 class InvalidValueType(ASN1Error):
1405     def __init__(self, expected_types):
1406         super(InvalidValueType, self).__init__()
1407         self.expected_types = expected_types
1408
1409     def __str__(self):
1410         return "invalid value type, expected: %s" % ", ".join(
1411             [repr(t) for t in self.expected_types]
1412         )
1413
1414     def __repr__(self):
1415         return "%s(%s)" % (self.__class__.__name__, self)
1416
1417
1418 class BoundsError(ASN1Error):
1419     def __init__(self, bound_min, value, bound_max):
1420         super(BoundsError, self).__init__()
1421         self.bound_min = bound_min
1422         self.value = value
1423         self.bound_max = bound_max
1424
1425     def __str__(self):
1426         return "unsatisfied bounds: %s <= %s <= %s" % (
1427             self.bound_min,
1428             self.value,
1429             self.bound_max,
1430         )
1431
1432     def __repr__(self):
1433         return "%s(%s)" % (self.__class__.__name__, self)
1434
1435
1436 ########################################################################
1437 # Basic coders
1438 ########################################################################
1439
1440 _hexdecoder = getdecoder("hex")
1441 _hexencoder = getencoder("hex")
1442
1443
1444 def hexdec(data):
1445     """Binary data to hexadecimal string convert
1446     """
1447     return _hexdecoder(data)[0]
1448
1449
1450 def hexenc(data):
1451     """Hexadecimal string to binary data convert
1452     """
1453     return _hexencoder(data)[0].decode("ascii")
1454
1455
1456 def int_bytes_len(num, byte_len=8):
1457     if num == 0:
1458         return 1
1459     return int(ceil(float(num.bit_length()) / byte_len))
1460
1461
1462 def zero_ended_encode(num):
1463     octets = bytearray(int_bytes_len(num, 7))
1464     i = len(octets) - 1
1465     octets[i] = num & 0x7F
1466     num >>= 7
1467     i -= 1
1468     while num > 0:
1469         octets[i] = 0x80 | (num & 0x7F)
1470         num >>= 7
1471         i -= 1
1472     return bytes(octets)
1473
1474
1475 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1476     """Encode tag to binary form
1477
1478     :param int num: tag's number
1479     :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1480                       :py:data:`pyderasn.TagClassContext`,
1481                       :py:data:`pyderasn.TagClassApplication`,
1482                       :py:data:`pyderasn.TagClassPrivate`)
1483     :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1484                      :py:data:`pyderasn.TagFormConstructed`)
1485     """
1486     if num < 31:
1487         # [XX|X|.....]
1488         return int2byte(klass | form | num)
1489     # [XX|X|11111][1.......][1.......] ... [0.......]
1490     return int2byte(klass | form | 31) + zero_ended_encode(num)
1491
1492
1493 def tag_decode(tag):
1494     """Decode tag from binary form
1495
1496     .. warning::
1497
1498        No validation is performed, assuming that it has already passed.
1499
1500     It returns tuple with three integers, as
1501     :py:func:`pyderasn.tag_encode` accepts.
1502     """
1503     first_octet = byte2int(tag)
1504     klass = first_octet & 0xC0
1505     form = first_octet & 0x20
1506     if first_octet & 0x1F < 0x1F:
1507         return (klass, form, first_octet & 0x1F)
1508     num = 0
1509     for octet in iterbytes(tag[1:]):
1510         num <<= 7
1511         num |= octet & 0x7F
1512     return (klass, form, num)
1513
1514
1515 def tag_ctxp(num):
1516     """Create CONTEXT PRIMITIVE tag
1517     """
1518     return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1519
1520
1521 def tag_ctxc(num):
1522     """Create CONTEXT CONSTRUCTED tag
1523     """
1524     return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1525
1526
1527 def tag_strip(data):
1528     """Take off tag from the data
1529
1530     :returns: (encoded tag, tag length, remaining data)
1531     """
1532     if len(data) == 0:
1533         raise NotEnoughData("no data at all")
1534     if byte2int(data) & 0x1F < 31:
1535         return data[:1], 1, data[1:]
1536     i = 0
1537     while True:
1538         i += 1
1539         if i == len(data):
1540             raise DecodeError("unfinished tag")
1541         if indexbytes(data, i) & 0x80 == 0:
1542             break
1543     i += 1
1544     return data[:i], i, data[i:]
1545
1546
1547 def len_encode(l):
1548     if l < 0x80:
1549         return int2byte(l)
1550     octets = bytearray(int_bytes_len(l) + 1)
1551     octets[0] = 0x80 | (len(octets) - 1)
1552     for i in six_xrange(len(octets) - 1, 0, -1):
1553         octets[i] = l & 0xFF
1554         l >>= 8
1555     return bytes(octets)
1556
1557
1558 def len_decode(data):
1559     """Decode length
1560
1561     :returns: (decoded length, length's length, remaining data)
1562     :raises LenIndefForm: if indefinite form encoding is met
1563     """
1564     if len(data) == 0:
1565         raise NotEnoughData("no data at all")
1566     first_octet = byte2int(data)
1567     if first_octet & 0x80 == 0:
1568         return first_octet, 1, data[1:]
1569     octets_num = first_octet & 0x7F
1570     if octets_num + 1 > len(data):
1571         raise NotEnoughData("encoded length is longer than data")
1572     if octets_num == 0:
1573         raise LenIndefForm()
1574     if byte2int(data[1:]) == 0:
1575         raise DecodeError("leading zeros")
1576     l = 0
1577     for v in iterbytes(data[1:1 + octets_num]):
1578         l = (l << 8) | v
1579     if l <= 127:
1580         raise DecodeError("long form instead of short one")
1581     return l, 1 + octets_num, data[1 + octets_num:]
1582
1583
1584 LEN0 = len_encode(0)
1585 LEN1 = len_encode(1)
1586 LEN1K = len_encode(1000)
1587
1588
1589 def len_size(l):
1590     """How many bytes length field will take
1591     """
1592     if l < 128:
1593         return 1
1594     if l < 256:  # 1 << 8
1595         return 2
1596     if l < 65536:  # 1 << 16
1597         return 3
1598     if l < 16777216:  # 1 << 24
1599         return 4
1600     if l < 4294967296:  # 1 << 32
1601         return 5
1602     if l < 1099511627776:  # 1 << 40
1603         return 6
1604     if l < 281474976710656:  # 1 << 48
1605         return 7
1606     if l < 72057594037927936:  # 1 << 56
1607         return 8
1608     raise OverflowError("too big length")
1609
1610
1611 def write_full(writer, data):
1612     """Fully write provided data
1613
1614     :param writer: must comply with ``io.RawIOBase.write`` behaviour
1615
1616     BytesIO does not guarantee that the whole data will be written at
1617     once. That function write everything provided, raising an error if
1618     ``writer`` returns None.
1619     """
1620     data = memoryview(data)
1621     written = 0
1622     while written != len(data):
1623         n = writer(data[written:])
1624         if n is None:
1625             raise ValueError("can not write to buf")
1626         written += n
1627
1628
1629 # If it is 64-bit system, then use compact 64-bit array of unsigned
1630 # longs. Use an ordinary list with universal integers otherwise, that
1631 # is slower.
1632 if sys_maxsize > 2 ** 32:
1633     def state_2pass_new():
1634         return array("L")
1635 else:
1636     def state_2pass_new():
1637         return []
1638
1639
1640 ########################################################################
1641 # Base class
1642 ########################################################################
1643
1644 class AutoAddSlots(type):
1645     def __new__(cls, name, bases, _dict):
1646         _dict["__slots__"] = _dict.get("__slots__", ())
1647         return type.__new__(cls, name, bases, _dict)
1648
1649
1650 BasicState = namedtuple("BasicState", (
1651     "version",
1652     "tag",
1653     "tag_order",
1654     "expl",
1655     "default",
1656     "optional",
1657     "offset",
1658     "llen",
1659     "vlen",
1660     "expl_lenindef",
1661     "lenindef",
1662     "ber_encoded",
1663 ), **NAMEDTUPLE_KWARGS)
1664
1665
1666 @add_metaclass(AutoAddSlots)
1667 class Obj(object):
1668     """Common ASN.1 object class
1669
1670     All ASN.1 types are inherited from it. It has metaclass that
1671     automatically adds ``__slots__`` to all inherited classes.
1672     """
1673     __slots__ = (
1674         "tag",
1675         "_tag_order",
1676         "_value",
1677         "_expl",
1678         "default",
1679         "optional",
1680         "offset",
1681         "llen",
1682         "vlen",
1683         "expl_lenindef",
1684         "lenindef",
1685         "ber_encoded",
1686     )
1687
1688     def __init__(
1689             self,
1690             impl=None,
1691             expl=None,
1692             default=None,
1693             optional=False,
1694             _decoded=(0, 0, 0),
1695     ):
1696         self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1697         self._expl = getattr(self, "expl", None) if expl is None else expl
1698         if self.tag != self.tag_default and self._expl is not None:
1699             raise ValueError("implicit and explicit tags can not be set simultaneously")
1700         if self.tag is None:
1701             self._tag_order = None
1702         else:
1703             tag_class, _, tag_num = tag_decode(
1704                 self.tag if self._expl is None else self._expl
1705             )
1706             self._tag_order = (tag_class, tag_num)
1707         if default is not None:
1708             optional = True
1709         self.optional = optional
1710         self.offset, self.llen, self.vlen = _decoded
1711         self.default = None
1712         self.expl_lenindef = False
1713         self.lenindef = False
1714         self.ber_encoded = False
1715
1716     @property
1717     def ready(self):  # pragma: no cover
1718         """Is object ready to be encoded?
1719         """
1720         raise NotImplementedError()
1721
1722     def _assert_ready(self):
1723         if not self.ready:
1724             raise ObjNotReady(self.__class__.__name__)
1725
1726     @property
1727     def bered(self):
1728         """Is either object or any elements inside is BER encoded?
1729         """
1730         return self.expl_lenindef or self.lenindef or self.ber_encoded
1731
1732     @property
1733     def decoded(self):
1734         """Is object decoded?
1735         """
1736         return (self.llen + self.vlen) > 0
1737
1738     def __getstate__(self):  # pragma: no cover
1739         """Used for making safe to be mutable pickleable copies
1740         """
1741         raise NotImplementedError()
1742
1743     def __setstate__(self, state):
1744         if state.version != __version__:
1745             raise ValueError("data is pickled by different PyDERASN version")
1746         self.tag = state.tag
1747         self._tag_order = state.tag_order
1748         self._expl = state.expl
1749         self.default = state.default
1750         self.optional = state.optional
1751         self.offset = state.offset
1752         self.llen = state.llen
1753         self.vlen = state.vlen
1754         self.expl_lenindef = state.expl_lenindef
1755         self.lenindef = state.lenindef
1756         self.ber_encoded = state.ber_encoded
1757
1758     @property
1759     def tag_order(self):
1760         """Tag's (class, number) used for DER/CER sorting
1761         """
1762         return self._tag_order
1763
1764     @property
1765     def tag_order_cer(self):
1766         return self.tag_order
1767
1768     @property
1769     def tlen(self):
1770         """.. seealso:: :ref:`decoding`
1771         """
1772         return len(self.tag)
1773
1774     @property
1775     def tlvlen(self):
1776         """.. seealso:: :ref:`decoding`
1777         """
1778         return self.tlen + self.llen + self.vlen
1779
1780     def __str__(self):  # pragma: no cover
1781         return self.__bytes__() if PY2 else self.__unicode__()
1782
1783     def __ne__(self, their):
1784         return not(self == their)
1785
1786     def __gt__(self, their):  # pragma: no cover
1787         return not(self < their)
1788
1789     def __le__(self, their):  # pragma: no cover
1790         return (self == their) or (self < their)
1791
1792     def __ge__(self, their):  # pragma: no cover
1793         return (self == their) or (self > their)
1794
1795     def _encode(self):  # pragma: no cover
1796         raise NotImplementedError()
1797
1798     def _encode_cer(self, writer):
1799         write_full(writer, self._encode())
1800
1801     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):  # pragma: no cover
1802         yield NotImplemented
1803
1804     def _encode1st(self, state):
1805         raise NotImplementedError()
1806
1807     def _encode2nd(self, writer, state_iter):
1808         raise NotImplementedError()
1809
1810     def encode(self):
1811         """DER encode the structure
1812
1813         :returns: DER representation
1814         """
1815         raw = self._encode()
1816         if self._expl is None:
1817             return raw
1818         return b"".join((self._expl, len_encode(len(raw)), raw))
1819
1820     def encode1st(self, state=None):
1821         """Do the 1st pass of 2-pass encoding
1822
1823         :rtype: (int, array("L"))
1824         :returns: full length of encoded data and precalculated various
1825                   objects lengths
1826         """
1827         if state is None:
1828             state = state_2pass_new()
1829         if self._expl is None:
1830             return self._encode1st(state)
1831         state.append(0)
1832         idx = len(state) - 1
1833         vlen, _ = self._encode1st(state)
1834         state[idx] = vlen
1835         fulllen = len(self._expl) + len_size(vlen) + vlen
1836         return fulllen, state
1837
1838     def encode2nd(self, writer, state_iter):
1839         """Do the 2nd pass of 2-pass encoding
1840
1841         :param writer: must comply with ``io.RawIOBase.write`` behaviour
1842         :param state_iter: iterator over the 1st pass state (``iter(state)``)
1843         """
1844         if self._expl is None:
1845             self._encode2nd(writer, state_iter)
1846         else:
1847             write_full(writer, self._expl + len_encode(next(state_iter)))
1848             self._encode2nd(writer, state_iter)
1849
1850     def encode_cer(self, writer):
1851         """CER encode the structure to specified writer
1852
1853         :param writer: must comply with ``io.RawIOBase.write``
1854                        behaviour. It takes slice to be written and
1855                        returns number of bytes processed. If it returns
1856                        None, then exception will be raised
1857         """
1858         if self._expl is not None:
1859             write_full(writer, self._expl + LENINDEF)
1860         if getattr(self, "der_forced", False):
1861             write_full(writer, self._encode())
1862         else:
1863             self._encode_cer(writer)
1864         if self._expl is not None:
1865             write_full(writer, EOC)
1866
1867     def hexencode(self):
1868         """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1869         """
1870         return hexenc(self.encode())
1871
1872     def decode(
1873             self,
1874             data,
1875             offset=0,
1876             leavemm=False,
1877             decode_path=(),
1878             ctx=None,
1879             tag_only=False,
1880             _ctx_immutable=True,
1881     ):
1882         """Decode the data
1883
1884         :param data: either binary or memoryview
1885         :param int offset: initial data's offset
1886         :param bool leavemm: do we need to leave memoryview of remaining
1887                     data as is, or convert it to bytes otherwise
1888         :param decode_path: current decode path (tuples of strings,
1889                             possibly with DecodePathDefBy) with will be
1890                             the root for all underlying objects
1891         :param ctx: optional :ref:`context <ctx>` governing decoding process
1892         :param bool tag_only: decode only the tag, without length and
1893                               contents (used only in Choice and Set
1894                               structures, trying to determine if tag satisfies
1895                               the schema)
1896         :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1897                                     before using it?
1898         :returns: (Obj, remaining data)
1899
1900         .. seealso:: :ref:`decoding`
1901         """
1902         result = next(self.decode_evgen(
1903             data,
1904             offset,
1905             leavemm,
1906             decode_path,
1907             ctx,
1908             tag_only,
1909             _ctx_immutable,
1910             _evgen_mode=False,
1911         ))
1912         if result is None:
1913             return None
1914         _, obj, tail = result
1915         return obj, tail
1916
1917     def decode_evgen(
1918             self,
1919             data,
1920             offset=0,
1921             leavemm=False,
1922             decode_path=(),
1923             ctx=None,
1924             tag_only=False,
1925             _ctx_immutable=True,
1926             _evgen_mode=True,
1927     ):
1928         """Decode with evgen mode on
1929
1930         That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1931         it returns the generator producing ``(decode_path, obj, tail)``
1932         values.
1933         .. seealso:: :ref:`evgen mode <evgen_mode>`.
1934         """
1935         if ctx is None:
1936             ctx = {}
1937         elif _ctx_immutable:
1938             ctx = copy(ctx)
1939         tlv = memoryview(data)
1940         if (
1941                 _evgen_mode and
1942                 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1943         ):
1944             _evgen_mode = False
1945         if self._expl is None:
1946             for result in self._decode(
1947                     tlv,
1948                     offset=offset,
1949                     decode_path=decode_path,
1950                     ctx=ctx,
1951                     tag_only=tag_only,
1952                     evgen_mode=_evgen_mode,
1953             ):
1954                 if tag_only:
1955                     yield None
1956                     return
1957                 _decode_path, obj, tail = result
1958                 if not _decode_path is decode_path:
1959                     yield result
1960         else:
1961             try:
1962                 t, tlen, lv = tag_strip(tlv)
1963             except DecodeError as err:
1964                 raise err.__class__(
1965                     msg=err.msg,
1966                     klass=self.__class__,
1967                     decode_path=decode_path,
1968                     offset=offset,
1969                 )
1970             if t != self._expl:
1971                 raise TagMismatch(
1972                     klass=self.__class__,
1973                     decode_path=decode_path,
1974                     offset=offset,
1975                 )
1976             try:
1977                 l, llen, v = len_decode(lv)
1978             except LenIndefForm as err:
1979                 if not ctx.get("bered", False):
1980                     raise err.__class__(
1981                         msg=err.msg,
1982                         klass=self.__class__,
1983                         decode_path=decode_path,
1984                         offset=offset,
1985                     )
1986                 llen, v = 1, lv[1:]
1987                 offset += tlen + llen
1988                 for result in self._decode(
1989                         v,
1990                         offset=offset,
1991                         decode_path=decode_path,
1992                         ctx=ctx,
1993                         tag_only=tag_only,
1994                         evgen_mode=_evgen_mode,
1995                 ):
1996                     if tag_only:  # pragma: no cover
1997                         yield None
1998                         return
1999                     _decode_path, obj, tail = result
2000                     if not _decode_path is decode_path:
2001                         yield result
2002                 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2003                 if eoc_expected.tobytes() != EOC:
2004                     raise DecodeError(
2005                         "no EOC",
2006                         klass=self.__class__,
2007                         decode_path=decode_path,
2008                         offset=offset,
2009                     )
2010                 obj.vlen += EOC_LEN
2011                 obj.expl_lenindef = True
2012             except DecodeError as err:
2013                 raise err.__class__(
2014                     msg=err.msg,
2015                     klass=self.__class__,
2016                     decode_path=decode_path,
2017                     offset=offset,
2018                 )
2019             else:
2020                 if l > len(v):
2021                     raise NotEnoughData(
2022                         "encoded length is longer than data",
2023                         klass=self.__class__,
2024                         decode_path=decode_path,
2025                         offset=offset,
2026                     )
2027                 for result in self._decode(
2028                         v,
2029                         offset=offset + tlen + llen,
2030                         decode_path=decode_path,
2031                         ctx=ctx,
2032                         tag_only=tag_only,
2033                         evgen_mode=_evgen_mode,
2034                 ):
2035                     if tag_only:  # pragma: no cover
2036                         yield None
2037                         return
2038                     _decode_path, obj, tail = result
2039                     if not _decode_path is decode_path:
2040                         yield result
2041                 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2042                     raise DecodeError(
2043                         "explicit tag out-of-bound, longer than data",
2044                         klass=self.__class__,
2045                         decode_path=decode_path,
2046                         offset=offset,
2047                     )
2048         yield decode_path, obj, (tail if leavemm else tail.tobytes())
2049
2050     def decod(self, data, offset=0, decode_path=(), ctx=None):
2051         """Decode the data, check that tail is empty
2052
2053         :raises ExceedingData: if tail is not empty
2054
2055         This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2056         (decode without tail) that also checks that there is no
2057         trailing data left.
2058         """
2059         obj, tail = self.decode(
2060             data,
2061             offset=offset,
2062             decode_path=decode_path,
2063             ctx=ctx,
2064             leavemm=True,
2065         )
2066         if len(tail) > 0:
2067             raise ExceedingData(len(tail))
2068         return obj
2069
2070     def hexdecode(self, data, *args, **kwargs):
2071         """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2072         """
2073         return self.decode(hexdec(data), *args, **kwargs)
2074
2075     def hexdecod(self, data, *args, **kwargs):
2076         """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2077         """
2078         return self.decod(hexdec(data), *args, **kwargs)
2079
2080     @property
2081     def expled(self):
2082         """.. seealso:: :ref:`decoding`
2083         """
2084         return self._expl is not None
2085
2086     @property
2087     def expl_tag(self):
2088         """.. seealso:: :ref:`decoding`
2089         """
2090         return self._expl
2091
2092     @property
2093     def expl_tlen(self):
2094         """.. seealso:: :ref:`decoding`
2095         """
2096         return len(self._expl)
2097
2098     @property
2099     def expl_llen(self):
2100         """.. seealso:: :ref:`decoding`
2101         """
2102         if self.expl_lenindef:
2103             return 1
2104         return len(len_encode(self.tlvlen))
2105
2106     @property
2107     def expl_offset(self):
2108         """.. seealso:: :ref:`decoding`
2109         """
2110         return self.offset - self.expl_tlen - self.expl_llen
2111
2112     @property
2113     def expl_vlen(self):
2114         """.. seealso:: :ref:`decoding`
2115         """
2116         return self.tlvlen
2117
2118     @property
2119     def expl_tlvlen(self):
2120         """.. seealso:: :ref:`decoding`
2121         """
2122         return self.expl_tlen + self.expl_llen + self.expl_vlen
2123
2124     @property
2125     def fulloffset(self):
2126         """.. seealso:: :ref:`decoding`
2127         """
2128         return self.expl_offset if self.expled else self.offset
2129
2130     @property
2131     def fulllen(self):
2132         """.. seealso:: :ref:`decoding`
2133         """
2134         return self.expl_tlvlen if self.expled else self.tlvlen
2135
2136     def pps_lenindef(self, decode_path):
2137         if self.lenindef and not (
2138                 getattr(self, "defined", None) is not None and
2139                 self.defined[1].lenindef
2140         ):
2141             yield _pp(
2142                 asn1_type_name="EOC",
2143                 obj_name="",
2144                 decode_path=decode_path,
2145                 offset=(
2146                     self.offset + self.tlvlen -
2147                     (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2148                 ),
2149                 tlen=1,
2150                 llen=1,
2151                 vlen=0,
2152                 ber_encoded=True,
2153                 bered=True,
2154             )
2155         if self.expl_lenindef:
2156             yield _pp(
2157                 asn1_type_name="EOC",
2158                 obj_name="EXPLICIT",
2159                 decode_path=decode_path,
2160                 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2161                 tlen=1,
2162                 llen=1,
2163                 vlen=0,
2164                 ber_encoded=True,
2165                 bered=True,
2166             )
2167
2168
2169 def encode_cer(obj):
2170     """Encode to CER in memory buffer
2171
2172     :returns bytes: memory buffer contents
2173     """
2174     buf = BytesIO()
2175     obj.encode_cer(buf.write)
2176     return buf.getvalue()
2177
2178
2179 def encode2pass(obj):
2180     """Encode (2-pass mode) to DER in memory buffer
2181
2182     :returns bytes: memory buffer contents
2183     """
2184     buf = BytesIO()
2185     _, state = obj.encode1st()
2186     obj.encode2nd(buf.write, iter(state))
2187     return buf.getvalue()
2188
2189
2190 class DecodePathDefBy(object):
2191     """DEFINED BY representation inside decode path
2192     """
2193     __slots__ = ("defined_by",)
2194
2195     def __init__(self, defined_by):
2196         self.defined_by = defined_by
2197
2198     def __ne__(self, their):
2199         return not(self == their)
2200
2201     def __eq__(self, their):
2202         if not isinstance(their, self.__class__):
2203             return False
2204         return self.defined_by == their.defined_by
2205
2206     def __str__(self):
2207         return "DEFINED BY " + str(self.defined_by)
2208
2209     def __repr__(self):
2210         return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2211
2212
2213 ########################################################################
2214 # Pretty printing
2215 ########################################################################
2216
2217 PP = namedtuple("PP", (
2218     "obj",
2219     "asn1_type_name",
2220     "obj_name",
2221     "decode_path",
2222     "value",
2223     "blob",
2224     "optional",
2225     "default",
2226     "impl",
2227     "expl",
2228     "offset",
2229     "tlen",
2230     "llen",
2231     "vlen",
2232     "expl_offset",
2233     "expl_tlen",
2234     "expl_llen",
2235     "expl_vlen",
2236     "expl_lenindef",
2237     "lenindef",
2238     "ber_encoded",
2239     "bered",
2240 ), **NAMEDTUPLE_KWARGS)
2241
2242
2243 def _pp(
2244         obj=None,
2245         asn1_type_name="unknown",
2246         obj_name="unknown",
2247         decode_path=(),
2248         value=None,
2249         blob=None,
2250         optional=False,
2251         default=False,
2252         impl=None,
2253         expl=None,
2254         offset=0,
2255         tlen=0,
2256         llen=0,
2257         vlen=0,
2258         expl_offset=None,
2259         expl_tlen=None,
2260         expl_llen=None,
2261         expl_vlen=None,
2262         expl_lenindef=False,
2263         lenindef=False,
2264         ber_encoded=False,
2265         bered=False,
2266 ):
2267     return PP(
2268         obj,
2269         asn1_type_name,
2270         obj_name,
2271         decode_path,
2272         value,
2273         blob,
2274         optional,
2275         default,
2276         impl,
2277         expl,
2278         offset,
2279         tlen,
2280         llen,
2281         vlen,
2282         expl_offset,
2283         expl_tlen,
2284         expl_llen,
2285         expl_vlen,
2286         expl_lenindef,
2287         lenindef,
2288         ber_encoded,
2289         bered,
2290     )
2291
2292
2293 def _colourize(what, colour, with_colours, attrs=("bold",)):
2294     return colored(what, colour, attrs=attrs) if with_colours else what
2295
2296
2297 def colonize_hex(hexed):
2298     """Separate hexadecimal string with colons
2299     """
2300     return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2301
2302
2303 def pp_console_row(
2304         pp,
2305         oid_maps=(),
2306         with_offsets=False,
2307         with_blob=True,
2308         with_colours=False,
2309         with_decode_path=False,
2310         decode_path_len_decrease=0,
2311 ):
2312     cols = []
2313     if with_offsets:
2314         col = "%5d%s%s" % (
2315             pp.offset,
2316             (
2317                 "  " if pp.expl_offset is None else
2318                 ("-%d" % (pp.offset - pp.expl_offset))
2319             ),
2320             LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2321         )
2322         col = _colourize(col, "red", with_colours, ())
2323         col += _colourize("B", "red", with_colours) if pp.bered else " "
2324         cols.append(col)
2325         col = "[%d,%d,%4d]%s" % (
2326             pp.tlen,
2327             pp.llen,
2328             pp.vlen,
2329             LENINDEF_PP_CHAR if pp.lenindef else " "
2330         )
2331         col = _colourize(col, "green", with_colours, ())
2332         cols.append(col)
2333     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2334     if decode_path_len > 0:
2335         cols.append(" ." * decode_path_len)
2336         ent = pp.decode_path[-1]
2337         if isinstance(ent, DecodePathDefBy):
2338             cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2339             value = str(ent.defined_by)
2340             oid_name = None
2341             if (
2342                     len(oid_maps) > 0 and
2343                     ent.defined_by.asn1_type_name ==
2344                     ObjectIdentifier.asn1_type_name
2345             ):
2346                 for oid_map in oid_maps:
2347                     oid_name = oid_map.get(value)
2348                     if oid_name is not None:
2349                         cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2350                         break
2351             if oid_name is None:
2352                 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2353         else:
2354             cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2355     if pp.expl is not None:
2356         klass, _, num = pp.expl
2357         col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2358         cols.append(_colourize(col, "blue", with_colours))
2359     if pp.impl is not None:
2360         klass, _, num = pp.impl
2361         col = "[%s%d]" % (TagClassReprs[klass], num)
2362         cols.append(_colourize(col, "blue", with_colours))
2363     if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2364         cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2365     if pp.ber_encoded:
2366         cols.append(_colourize("BER", "red", with_colours))
2367     cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2368     if pp.value is not None:
2369         value = pp.value
2370         cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2371         if (
2372                 len(oid_maps) > 0 and
2373                 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
2374         ):
2375             for oid_map in oid_maps:
2376                 oid_name = oid_map.get(value)
2377                 if oid_name is not None:
2378                     cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2379                     break
2380         if pp.asn1_type_name == Integer.asn1_type_name:
2381             hex_repr = hex(int(pp.obj._value))[2:].upper()
2382             if len(hex_repr) % 2 != 0:
2383                 hex_repr = "0" + hex_repr
2384             cols.append(_colourize(
2385                 "(%s)" % colonize_hex(hex_repr),
2386                 "green",
2387                 with_colours,
2388             ))
2389     if with_blob:
2390         if pp.blob.__class__ == binary_type:
2391             cols.append(hexenc(pp.blob))
2392         elif pp.blob.__class__ == tuple:
2393             cols.append(", ".join(pp.blob))
2394     if pp.optional:
2395         cols.append(_colourize("OPTIONAL", "red", with_colours))
2396     if pp.default:
2397         cols.append(_colourize("DEFAULT", "red", with_colours))
2398     if with_decode_path:
2399         cols.append(_colourize(
2400             "[%s]" % ":".join(str(p) for p in pp.decode_path),
2401             "grey",
2402             with_colours,
2403         ))
2404     return " ".join(cols)
2405
2406
2407 def pp_console_blob(pp, decode_path_len_decrease=0):
2408     cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2409     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2410     if decode_path_len > 0:
2411         cols.append(" ." * (decode_path_len + 1))
2412     if pp.blob.__class__ == binary_type:
2413         blob = hexenc(pp.blob).upper()
2414         for i in six_xrange(0, len(blob), 32):
2415             chunk = blob[i:i + 32]
2416             yield " ".join(cols + [colonize_hex(chunk)])
2417     elif pp.blob.__class__ == tuple:
2418         yield " ".join(cols + [", ".join(pp.blob)])
2419
2420
2421 def pprint(
2422         obj,
2423         oid_maps=(),
2424         big_blobs=False,
2425         with_colours=False,
2426         with_decode_path=False,
2427         decode_path_only=(),
2428         decode_path=(),
2429 ):
2430     """Pretty print object
2431
2432     :param Obj obj: object you want to pretty print
2433     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionary.
2434                      Its human readable form is printed when OID is met
2435     :param big_blobs: if large binary objects are met (like OctetString
2436                       values), do we need to print them too, on separate
2437                       lines
2438     :param with_colours: colourize output, if ``termcolor`` library
2439                          is available
2440     :param with_decode_path: print decode path
2441     :param decode_path_only: print only that specified decode path
2442     """
2443     def _pprint_pps(pps):
2444         for pp in pps:
2445             if hasattr(pp, "_fields"):
2446                 if (
2447                         decode_path_only != () and
2448                         tuple(
2449                             str(p) for p in pp.decode_path[:len(decode_path_only)]
2450                         ) != decode_path_only
2451                 ):
2452                     continue
2453                 if big_blobs:
2454                     yield pp_console_row(
2455                         pp,
2456                         oid_maps=oid_maps,
2457                         with_offsets=True,
2458                         with_blob=False,
2459                         with_colours=with_colours,
2460                         with_decode_path=with_decode_path,
2461                         decode_path_len_decrease=len(decode_path_only),
2462                     )
2463                     for row in pp_console_blob(
2464                             pp,
2465                             decode_path_len_decrease=len(decode_path_only),
2466                     ):
2467                         yield row
2468                 else:
2469                     yield pp_console_row(
2470                         pp,
2471                         oid_maps=oid_maps,
2472                         with_offsets=True,
2473                         with_blob=True,
2474                         with_colours=with_colours,
2475                         with_decode_path=with_decode_path,
2476                         decode_path_len_decrease=len(decode_path_only),
2477                     )
2478             else:
2479                 for row in _pprint_pps(pp):
2480                     yield row
2481     return "\n".join(_pprint_pps(obj.pps(decode_path)))
2482
2483
2484 ########################################################################
2485 # ASN.1 primitive types
2486 ########################################################################
2487
2488 BooleanState = namedtuple(
2489     "BooleanState",
2490     BasicState._fields + ("value",),
2491     **NAMEDTUPLE_KWARGS
2492 )
2493
2494
2495 class Boolean(Obj):
2496     """``BOOLEAN`` boolean type
2497
2498     >>> b = Boolean(True)
2499     BOOLEAN True
2500     >>> b == Boolean(True)
2501     True
2502     >>> bool(b)
2503     True
2504     """
2505     __slots__ = ()
2506     tag_default = tag_encode(1)
2507     asn1_type_name = "BOOLEAN"
2508
2509     def __init__(
2510             self,
2511             value=None,
2512             impl=None,
2513             expl=None,
2514             default=None,
2515             optional=False,
2516             _decoded=(0, 0, 0),
2517     ):
2518         """
2519         :param value: set the value. Either boolean type, or
2520                       :py:class:`pyderasn.Boolean` object
2521         :param bytes impl: override default tag with ``IMPLICIT`` one
2522         :param bytes expl: override default tag with ``EXPLICIT`` one
2523         :param default: set default value. Type same as in ``value``
2524         :param bool optional: is object ``OPTIONAL`` in sequence
2525         """
2526         super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2527         self._value = None if value is None else self._value_sanitize(value)
2528         if default is not None:
2529             default = self._value_sanitize(default)
2530             self.default = self.__class__(
2531                 value=default,
2532                 impl=self.tag,
2533                 expl=self._expl,
2534             )
2535             if value is None:
2536                 self._value = default
2537
2538     def _value_sanitize(self, value):
2539         if value.__class__ == bool:
2540             return value
2541         if issubclass(value.__class__, Boolean):
2542             return value._value
2543         raise InvalidValueType((self.__class__, bool))
2544
2545     @property
2546     def ready(self):
2547         return self._value is not None
2548
2549     def __getstate__(self):
2550         return BooleanState(
2551             __version__,
2552             self.tag,
2553             self._tag_order,
2554             self._expl,
2555             self.default,
2556             self.optional,
2557             self.offset,
2558             self.llen,
2559             self.vlen,
2560             self.expl_lenindef,
2561             self.lenindef,
2562             self.ber_encoded,
2563             self._value,
2564         )
2565
2566     def __setstate__(self, state):
2567         super(Boolean, self).__setstate__(state)
2568         self._value = state.value
2569
2570     def __nonzero__(self):
2571         self._assert_ready()
2572         return self._value
2573
2574     def __bool__(self):
2575         self._assert_ready()
2576         return self._value
2577
2578     def __eq__(self, their):
2579         if their.__class__ == bool:
2580             return self._value == their
2581         if not issubclass(their.__class__, Boolean):
2582             return False
2583         return (
2584             self._value == their._value and
2585             self.tag == their.tag and
2586             self._expl == their._expl
2587         )
2588
2589     def __call__(
2590             self,
2591             value=None,
2592             impl=None,
2593             expl=None,
2594             default=None,
2595             optional=None,
2596     ):
2597         return self.__class__(
2598             value=value,
2599             impl=self.tag if impl is None else impl,
2600             expl=self._expl if expl is None else expl,
2601             default=self.default if default is None else default,
2602             optional=self.optional if optional is None else optional,
2603         )
2604
2605     def _encode(self):
2606         self._assert_ready()
2607         return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2608
2609     def _encode1st(self, state):
2610         return len(self.tag) + 2, state
2611
2612     def _encode2nd(self, writer, state_iter):
2613         self._assert_ready()
2614         write_full(writer, self._encode())
2615
2616     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2617         try:
2618             t, _, lv = tag_strip(tlv)
2619         except DecodeError as err:
2620             raise err.__class__(
2621                 msg=err.msg,
2622                 klass=self.__class__,
2623                 decode_path=decode_path,
2624                 offset=offset,
2625             )
2626         if t != self.tag:
2627             raise TagMismatch(
2628                 klass=self.__class__,
2629                 decode_path=decode_path,
2630                 offset=offset,
2631             )
2632         if tag_only:
2633             yield None
2634             return
2635         try:
2636             l, _, v = len_decode(lv)
2637         except DecodeError as err:
2638             raise err.__class__(
2639                 msg=err.msg,
2640                 klass=self.__class__,
2641                 decode_path=decode_path,
2642                 offset=offset,
2643             )
2644         if l != 1:
2645             raise InvalidLength(
2646                 "Boolean's length must be equal to 1",
2647                 klass=self.__class__,
2648                 decode_path=decode_path,
2649                 offset=offset,
2650             )
2651         if l > len(v):
2652             raise NotEnoughData(
2653                 "encoded length is longer than data",
2654                 klass=self.__class__,
2655                 decode_path=decode_path,
2656                 offset=offset,
2657             )
2658         first_octet = byte2int(v)
2659         ber_encoded = False
2660         if first_octet == 0:
2661             value = False
2662         elif first_octet == 0xFF:
2663             value = True
2664         elif ctx.get("bered", False):
2665             value = True
2666             ber_encoded = True
2667         else:
2668             raise DecodeError(
2669                 "unacceptable Boolean value",
2670                 klass=self.__class__,
2671                 decode_path=decode_path,
2672                 offset=offset,
2673             )
2674         obj = self.__class__(
2675             value=value,
2676             impl=self.tag,
2677             expl=self._expl,
2678             default=self.default,
2679             optional=self.optional,
2680             _decoded=(offset, 1, 1),
2681         )
2682         obj.ber_encoded = ber_encoded
2683         yield decode_path, obj, v[1:]
2684
2685     def __repr__(self):
2686         return pp_console_row(next(self.pps()))
2687
2688     def pps(self, decode_path=()):
2689         yield _pp(
2690             obj=self,
2691             asn1_type_name=self.asn1_type_name,
2692             obj_name=self.__class__.__name__,
2693             decode_path=decode_path,
2694             value=str(self._value) if self.ready else None,
2695             optional=self.optional,
2696             default=self == self.default,
2697             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2698             expl=None if self._expl is None else tag_decode(self._expl),
2699             offset=self.offset,
2700             tlen=self.tlen,
2701             llen=self.llen,
2702             vlen=self.vlen,
2703             expl_offset=self.expl_offset if self.expled else None,
2704             expl_tlen=self.expl_tlen if self.expled else None,
2705             expl_llen=self.expl_llen if self.expled else None,
2706             expl_vlen=self.expl_vlen if self.expled else None,
2707             expl_lenindef=self.expl_lenindef,
2708             ber_encoded=self.ber_encoded,
2709             bered=self.bered,
2710         )
2711         for pp in self.pps_lenindef(decode_path):
2712             yield pp
2713
2714
2715 IntegerState = namedtuple(
2716     "IntegerState",
2717     BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2718     **NAMEDTUPLE_KWARGS
2719 )
2720
2721
2722 class Integer(Obj):
2723     """``INTEGER`` integer type
2724
2725     >>> b = Integer(-123)
2726     INTEGER -123
2727     >>> b == Integer(-123)
2728     True
2729     >>> int(b)
2730     -123
2731
2732     >>> Integer(2, bounds=(1, 3))
2733     INTEGER 2
2734     >>> Integer(5, bounds=(1, 3))
2735     Traceback (most recent call last):
2736     pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2737
2738     ::
2739
2740         class Version(Integer):
2741             schema = (
2742                 ("v1", 0),
2743                 ("v2", 1),
2744                 ("v3", 2),
2745             )
2746
2747     >>> v = Version("v1")
2748     Version INTEGER v1
2749     >>> int(v)
2750     0
2751     >>> v.named
2752     'v1'
2753     >>> v.specs
2754     {'v3': 2, 'v1': 0, 'v2': 1}
2755     """
2756     __slots__ = ("specs", "_bound_min", "_bound_max")
2757     tag_default = tag_encode(2)
2758     asn1_type_name = "INTEGER"
2759
2760     def __init__(
2761             self,
2762             value=None,
2763             bounds=None,
2764             impl=None,
2765             expl=None,
2766             default=None,
2767             optional=False,
2768             _specs=None,
2769             _decoded=(0, 0, 0),
2770     ):
2771         """
2772         :param value: set the value. Either integer type, named value
2773                       (if ``schema`` is specified in the class), or
2774                       :py:class:`pyderasn.Integer` object
2775         :param bounds: set ``(MIN, MAX)`` value constraint.
2776                        (-inf, +inf) by default
2777         :param bytes impl: override default tag with ``IMPLICIT`` one
2778         :param bytes expl: override default tag with ``EXPLICIT`` one
2779         :param default: set default value. Type same as in ``value``
2780         :param bool optional: is object ``OPTIONAL`` in sequence
2781         """
2782         super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2783         self._value = value
2784         specs = getattr(self, "schema", {}) if _specs is None else _specs
2785         self.specs = specs if specs.__class__ == dict else dict(specs)
2786         self._bound_min, self._bound_max = getattr(
2787             self,
2788             "bounds",
2789             (float("-inf"), float("+inf")),
2790         ) if bounds is None else bounds
2791         if value is not None:
2792             self._value = self._value_sanitize(value)
2793         if default is not None:
2794             default = self._value_sanitize(default)
2795             self.default = self.__class__(
2796                 value=default,
2797                 impl=self.tag,
2798                 expl=self._expl,
2799                 _specs=self.specs,
2800             )
2801             if self._value is None:
2802                 self._value = default
2803
2804     def _value_sanitize(self, value):
2805         if isinstance(value, integer_types):
2806             pass
2807         elif issubclass(value.__class__, Integer):
2808             value = value._value
2809         elif value.__class__ == str:
2810             value = self.specs.get(value)
2811             if value is None:
2812                 raise ObjUnknown("integer value: %s" % value)
2813         else:
2814             raise InvalidValueType((self.__class__, int, str))
2815         if not self._bound_min <= value <= self._bound_max:
2816             raise BoundsError(self._bound_min, value, self._bound_max)
2817         return value
2818
2819     @property
2820     def ready(self):
2821         return self._value is not None
2822
2823     def __getstate__(self):
2824         return IntegerState(
2825             __version__,
2826             self.tag,
2827             self._tag_order,
2828             self._expl,
2829             self.default,
2830             self.optional,
2831             self.offset,
2832             self.llen,
2833             self.vlen,
2834             self.expl_lenindef,
2835             self.lenindef,
2836             self.ber_encoded,
2837             self.specs,
2838             self._value,
2839             self._bound_min,
2840             self._bound_max,
2841         )
2842
2843     def __setstate__(self, state):
2844         super(Integer, self).__setstate__(state)
2845         self.specs = state.specs
2846         self._value = state.value
2847         self._bound_min = state.bound_min
2848         self._bound_max = state.bound_max
2849
2850     def __int__(self):
2851         self._assert_ready()
2852         return int(self._value)
2853
2854     def __hash__(self):
2855         self._assert_ready()
2856         return hash(b"".join((
2857             self.tag,
2858             bytes(self._expl or b""),
2859             str(self._value).encode("ascii"),
2860         )))
2861
2862     def __eq__(self, their):
2863         if isinstance(their, integer_types):
2864             return self._value == their
2865         if not issubclass(their.__class__, Integer):
2866             return False
2867         return (
2868             self._value == their._value and
2869             self.tag == their.tag and
2870             self._expl == their._expl
2871         )
2872
2873     def __lt__(self, their):
2874         return self._value < their._value
2875
2876     @property
2877     def named(self):
2878         """Return named representation (if exists) of the value
2879         """
2880         for name, value in iteritems(self.specs):
2881             if value == self._value:
2882                 return name
2883         return None
2884
2885     def __call__(
2886             self,
2887             value=None,
2888             bounds=None,
2889             impl=None,
2890             expl=None,
2891             default=None,
2892             optional=None,
2893     ):
2894         return self.__class__(
2895             value=value,
2896             bounds=(
2897                 (self._bound_min, self._bound_max)
2898                 if bounds is None else bounds
2899             ),
2900             impl=self.tag if impl is None else impl,
2901             expl=self._expl if expl is None else expl,
2902             default=self.default if default is None else default,
2903             optional=self.optional if optional is None else optional,
2904             _specs=self.specs,
2905         )
2906
2907     def _encode_payload(self):
2908         self._assert_ready()
2909         value = self._value
2910         if PY2:
2911             if value == 0:
2912                 octets = bytearray([0])
2913             elif value < 0:
2914                 value = -value
2915                 value -= 1
2916                 octets = bytearray()
2917                 while value > 0:
2918                     octets.append((value & 0xFF) ^ 0xFF)
2919                     value >>= 8
2920                 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2921                     octets.append(0xFF)
2922             else:
2923                 octets = bytearray()
2924                 while value > 0:
2925                     octets.append(value & 0xFF)
2926                     value >>= 8
2927                 if octets[-1] & 0x80 > 0:
2928                     octets.append(0x00)
2929             octets.reverse()
2930             octets = bytes(octets)
2931         else:
2932             bytes_len = ceil(value.bit_length() / 8) or 1
2933             while True:
2934                 try:
2935                     octets = value.to_bytes(
2936                         bytes_len,
2937                         byteorder="big",
2938                         signed=True,
2939                     )
2940                 except OverflowError:
2941                     bytes_len += 1
2942                 else:
2943                     break
2944         return octets
2945         return b"".join((self.tag, len_encode(len(octets)), octets))
2946
2947     def _encode(self):
2948         octets = self._encode_payload()
2949         return b"".join((self.tag, len_encode(len(octets)), octets))
2950
2951     def _encode1st(self, state):
2952         l = len(self._encode_payload())
2953         return len(self.tag) + len_size(l) + l, state
2954
2955     def _encode2nd(self, writer, state_iter):
2956         write_full(writer, self._encode())
2957
2958     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2959         try:
2960             t, _, lv = tag_strip(tlv)
2961         except DecodeError as err:
2962             raise err.__class__(
2963                 msg=err.msg,
2964                 klass=self.__class__,
2965                 decode_path=decode_path,
2966                 offset=offset,
2967             )
2968         if t != self.tag:
2969             raise TagMismatch(
2970                 klass=self.__class__,
2971                 decode_path=decode_path,
2972                 offset=offset,
2973             )
2974         if tag_only:
2975             yield None
2976             return
2977         try:
2978             l, llen, v = len_decode(lv)
2979         except DecodeError as err:
2980             raise err.__class__(
2981                 msg=err.msg,
2982                 klass=self.__class__,
2983                 decode_path=decode_path,
2984                 offset=offset,
2985             )
2986         if l > len(v):
2987             raise NotEnoughData(
2988                 "encoded length is longer than data",
2989                 klass=self.__class__,
2990                 decode_path=decode_path,
2991                 offset=offset,
2992             )
2993         if l == 0:
2994             raise NotEnoughData(
2995                 "zero length",
2996                 klass=self.__class__,
2997                 decode_path=decode_path,
2998                 offset=offset,
2999             )
3000         v, tail = v[:l], v[l:]
3001         first_octet = byte2int(v)
3002         if l > 1:
3003             second_octet = byte2int(v[1:])
3004             if (
3005                     ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3006                     ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3007             ):
3008                 raise DecodeError(
3009                     "non normalized integer",
3010                     klass=self.__class__,
3011                     decode_path=decode_path,
3012                     offset=offset,
3013                 )
3014         if PY2:
3015             value = 0
3016             if first_octet & 0x80 > 0:
3017                 octets = bytearray()
3018                 for octet in bytearray(v):
3019                     octets.append(octet ^ 0xFF)
3020                 for octet in octets:
3021                     value = (value << 8) | octet
3022                 value += 1
3023                 value = -value
3024             else:
3025                 for octet in bytearray(v):
3026                     value = (value << 8) | octet
3027         else:
3028             value = int.from_bytes(v, byteorder="big", signed=True)
3029         try:
3030             obj = self.__class__(
3031                 value=value,
3032                 bounds=(self._bound_min, self._bound_max),
3033                 impl=self.tag,
3034                 expl=self._expl,
3035                 default=self.default,
3036                 optional=self.optional,
3037                 _specs=self.specs,
3038                 _decoded=(offset, llen, l),
3039             )
3040         except BoundsError as err:
3041             raise DecodeError(
3042                 msg=str(err),
3043                 klass=self.__class__,
3044                 decode_path=decode_path,
3045                 offset=offset,
3046             )
3047         yield decode_path, obj, tail
3048
3049     def __repr__(self):
3050         return pp_console_row(next(self.pps()))
3051
3052     def pps(self, decode_path=()):
3053         yield _pp(
3054             obj=self,
3055             asn1_type_name=self.asn1_type_name,
3056             obj_name=self.__class__.__name__,
3057             decode_path=decode_path,
3058             value=(self.named or str(self._value)) if self.ready else None,
3059             optional=self.optional,
3060             default=self == self.default,
3061             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3062             expl=None if self._expl is None else tag_decode(self._expl),
3063             offset=self.offset,
3064             tlen=self.tlen,
3065             llen=self.llen,
3066             vlen=self.vlen,
3067             expl_offset=self.expl_offset if self.expled else None,
3068             expl_tlen=self.expl_tlen if self.expled else None,
3069             expl_llen=self.expl_llen if self.expled else None,
3070             expl_vlen=self.expl_vlen if self.expled else None,
3071             expl_lenindef=self.expl_lenindef,
3072             bered=self.bered,
3073         )
3074         for pp in self.pps_lenindef(decode_path):
3075             yield pp
3076
3077
3078 BitStringState = namedtuple(
3079     "BitStringState",
3080     BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3081     **NAMEDTUPLE_KWARGS
3082 )
3083
3084
3085 class BitString(Obj):
3086     """``BIT STRING`` bit string type
3087
3088     >>> BitString(b"hello world")
3089     BIT STRING 88 bits 68656c6c6f20776f726c64
3090     >>> bytes(b)
3091     b'hello world'
3092     >>> b == b"hello world"
3093     True
3094     >>> b.bit_len
3095     88
3096
3097     >>> BitString("'0A3B5F291CD'H")
3098     BIT STRING 44 bits 0a3b5f291cd0
3099     >>> b = BitString("'010110000000'B")
3100     BIT STRING 12 bits 5800
3101     >>> b.bit_len
3102     12
3103     >>> b[0], b[1], b[2], b[3]
3104     (False, True, False, True)
3105     >>> b[1000]
3106     False
3107     >>> [v for v in b]
3108     [False, True, False, True, True, False, False, False, False, False, False, False]
3109
3110     ::
3111
3112         class KeyUsage(BitString):
3113             schema = (
3114                 ("digitalSignature", 0),
3115                 ("nonRepudiation", 1),
3116                 ("keyEncipherment", 2),
3117             )
3118
3119     >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3120     KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3121     >>> b.named
3122     ['nonRepudiation', 'keyEncipherment']
3123     >>> b.specs
3124     {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3125
3126     .. note::
3127
3128        Pay attention that BIT STRING can be encoded both in primitive
3129        and constructed forms. Decoder always checks constructed form tag
3130        additionally to specified primitive one. If BER decoding is
3131        :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3132        of DER restrictions.
3133     """
3134     __slots__ = ("tag_constructed", "specs", "defined")
3135     tag_default = tag_encode(3)
3136     asn1_type_name = "BIT STRING"
3137
3138     def __init__(
3139             self,
3140             value=None,
3141             impl=None,
3142             expl=None,
3143             default=None,
3144             optional=False,
3145             _specs=None,
3146             _decoded=(0, 0, 0),
3147     ):
3148         """
3149         :param value: set the value. Either binary type, tuple of named
3150                       values (if ``schema`` is specified in the class),
3151                       string in ``'XXX...'B`` form, or
3152                       :py:class:`pyderasn.BitString` object
3153         :param bytes impl: override default tag with ``IMPLICIT`` one
3154         :param bytes expl: override default tag with ``EXPLICIT`` one
3155         :param default: set default value. Type same as in ``value``
3156         :param bool optional: is object ``OPTIONAL`` in sequence
3157         """
3158         super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3159         specs = getattr(self, "schema", {}) if _specs is None else _specs
3160         self.specs = specs if specs.__class__ == dict else dict(specs)
3161         self._value = None if value is None else self._value_sanitize(value)
3162         if default is not None:
3163             default = self._value_sanitize(default)
3164             self.default = self.__class__(
3165                 value=default,
3166                 impl=self.tag,
3167                 expl=self._expl,
3168             )
3169             if value is None:
3170                 self._value = default
3171         self.defined = None
3172         tag_klass, _, tag_num = tag_decode(self.tag)
3173         self.tag_constructed = tag_encode(
3174             klass=tag_klass,
3175             form=TagFormConstructed,
3176             num=tag_num,
3177         )
3178
3179     def _bits2octets(self, bits):
3180         if len(self.specs) > 0:
3181             bits = bits.rstrip("0")
3182         bit_len = len(bits)
3183         bits += "0" * ((8 - (bit_len % 8)) % 8)
3184         octets = bytearray(len(bits) // 8)
3185         for i in six_xrange(len(octets)):
3186             octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3187         return bit_len, bytes(octets)
3188
3189     def _value_sanitize(self, value):
3190         if isinstance(value, (string_types, binary_type)):
3191             if (
3192                     isinstance(value, string_types) and
3193                     value.startswith("'")
3194             ):
3195                 if value.endswith("'B"):
3196                     value = value[1:-2]
3197                     if not frozenset(value) <= SET01:
3198                         raise ValueError("B's coding contains unacceptable chars")
3199                     return self._bits2octets(value)
3200                 if value.endswith("'H"):
3201                     value = value[1:-2]
3202                     return (
3203                         len(value) * 4,
3204                         hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3205                     )
3206             if value.__class__ == binary_type:
3207                 return (len(value) * 8, value)
3208             raise InvalidValueType((self.__class__, string_types, binary_type))
3209         if value.__class__ == tuple:
3210             if (
3211                     len(value) == 2 and
3212                     isinstance(value[0], integer_types) and
3213                     value[1].__class__ == binary_type
3214             ):
3215                 return value
3216             bits = []
3217             for name in value:
3218                 bit = self.specs.get(name)
3219                 if bit is None:
3220                     raise ObjUnknown("BitString value: %s" % name)
3221                 bits.append(bit)
3222             if len(bits) == 0:
3223                 return self._bits2octets("")
3224             bits = frozenset(bits)
3225             return self._bits2octets("".join(
3226                 ("1" if bit in bits else "0")
3227                 for bit in six_xrange(max(bits) + 1)
3228             ))
3229         if issubclass(value.__class__, BitString):
3230             return value._value
3231         raise InvalidValueType((self.__class__, binary_type, string_types))
3232
3233     @property
3234     def ready(self):
3235         return self._value is not None
3236
3237     def __getstate__(self):
3238         return BitStringState(
3239             __version__,
3240             self.tag,
3241             self._tag_order,
3242             self._expl,
3243             self.default,
3244             self.optional,
3245             self.offset,
3246             self.llen,
3247             self.vlen,
3248             self.expl_lenindef,
3249             self.lenindef,
3250             self.ber_encoded,
3251             self.specs,
3252             self._value,
3253             self.tag_constructed,
3254             self.defined,
3255         )
3256
3257     def __setstate__(self, state):
3258         super(BitString, self).__setstate__(state)
3259         self.specs = state.specs
3260         self._value = state.value
3261         self.tag_constructed = state.tag_constructed
3262         self.defined = state.defined
3263
3264     def __iter__(self):
3265         self._assert_ready()
3266         for i in six_xrange(self._value[0]):
3267             yield self[i]
3268
3269     @property
3270     def bit_len(self):
3271         """Returns number of bits in the string
3272         """
3273         self._assert_ready()
3274         return self._value[0]
3275
3276     def __bytes__(self):
3277         self._assert_ready()
3278         return self._value[1]
3279
3280     def __eq__(self, their):
3281         if their.__class__ == bytes:
3282             return self._value[1] == their
3283         if not issubclass(their.__class__, BitString):
3284             return False
3285         return (
3286             self._value == their._value and
3287             self.tag == their.tag and
3288             self._expl == their._expl
3289         )
3290
3291     @property
3292     def named(self):
3293         """Named representation (if exists) of the bits
3294
3295         :returns: [str(name), ...]
3296         """
3297         return [name for name, bit in iteritems(self.specs) if self[bit]]
3298
3299     def __call__(
3300             self,
3301             value=None,
3302             impl=None,
3303             expl=None,
3304             default=None,
3305             optional=None,
3306     ):
3307         return self.__class__(
3308             value=value,
3309             impl=self.tag if impl is None else impl,
3310             expl=self._expl if expl is None else expl,
3311             default=self.default if default is None else default,
3312             optional=self.optional if optional is None else optional,
3313             _specs=self.specs,
3314         )
3315
3316     def __getitem__(self, key):
3317         if key.__class__ == int:
3318             bit_len, octets = self._value
3319             if key >= bit_len:
3320                 return False
3321             return (
3322                 byte2int(memoryview(octets)[key // 8:]) >>
3323                 (7 - (key % 8))
3324             ) & 1 == 1
3325         if isinstance(key, string_types):
3326             value = self.specs.get(key)
3327             if value is None:
3328                 raise ObjUnknown("BitString value: %s" % key)
3329             return self[value]
3330         raise InvalidValueType((int, str))
3331
3332     def _encode(self):
3333         self._assert_ready()
3334         bit_len, octets = self._value
3335         return b"".join((
3336             self.tag,
3337             len_encode(len(octets) + 1),
3338             int2byte((8 - bit_len % 8) % 8),
3339             octets,
3340         ))
3341
3342     def _encode1st(self, state):
3343         self._assert_ready()
3344         _, octets = self._value
3345         l = len(octets) + 1
3346         return len(self.tag) + len_size(l) + l, state
3347
3348     def _encode2nd(self, writer, state_iter):
3349         bit_len, octets = self._value
3350         write_full(writer, b"".join((
3351             self.tag,
3352             len_encode(len(octets) + 1),
3353             int2byte((8 - bit_len % 8) % 8),
3354         )))
3355         write_full(writer, octets)
3356
3357     def _encode_cer(self, writer):
3358         bit_len, octets = self._value
3359         if len(octets) + 1 <= 1000:
3360             write_full(writer, self._encode())
3361             return
3362         write_full(writer, self.tag_constructed)
3363         write_full(writer, LENINDEF)
3364         for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3365             write_full(writer, b"".join((
3366                 BitString.tag_default,
3367                 LEN1K,
3368                 int2byte(0),
3369                 octets[offset:offset + 999],
3370             )))
3371         tail = octets[offset+999:]
3372         if len(tail) > 0:
3373             tail = int2byte((8 - bit_len % 8) % 8) + tail
3374             write_full(writer, b"".join((
3375                 BitString.tag_default,
3376                 len_encode(len(tail)),
3377                 tail,
3378             )))
3379         write_full(writer, EOC)
3380
3381     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3382         try:
3383             t, tlen, lv = tag_strip(tlv)
3384         except DecodeError as err:
3385             raise err.__class__(
3386                 msg=err.msg,
3387                 klass=self.__class__,
3388                 decode_path=decode_path,
3389                 offset=offset,
3390             )
3391         if t == self.tag:
3392             if tag_only:  # pragma: no cover
3393                 yield None
3394                 return
3395             try:
3396                 l, llen, v = len_decode(lv)
3397             except DecodeError as err:
3398                 raise err.__class__(
3399                     msg=err.msg,
3400                     klass=self.__class__,
3401                     decode_path=decode_path,
3402                     offset=offset,
3403                 )
3404             if l > len(v):
3405                 raise NotEnoughData(
3406                     "encoded length is longer than data",
3407                     klass=self.__class__,
3408                     decode_path=decode_path,
3409                     offset=offset,
3410                 )
3411             if l == 0:
3412                 raise NotEnoughData(
3413                     "zero length",
3414                     klass=self.__class__,
3415                     decode_path=decode_path,
3416                     offset=offset,
3417                 )
3418             pad_size = byte2int(v)
3419             if l == 1 and pad_size != 0:
3420                 raise DecodeError(
3421                     "invalid empty value",
3422                     klass=self.__class__,
3423                     decode_path=decode_path,
3424                     offset=offset,
3425                 )
3426             if pad_size > 7:
3427                 raise DecodeError(
3428                     "too big pad",
3429                     klass=self.__class__,
3430                     decode_path=decode_path,
3431                     offset=offset,
3432                 )
3433             if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3434                 raise DecodeError(
3435                     "invalid pad",
3436                     klass=self.__class__,
3437                     decode_path=decode_path,
3438                     offset=offset,
3439                 )
3440             v, tail = v[:l], v[l:]
3441             bit_len = (len(v) - 1) * 8 - pad_size
3442             obj = self.__class__(
3443                 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3444                 impl=self.tag,
3445                 expl=self._expl,
3446                 default=self.default,
3447                 optional=self.optional,
3448                 _specs=self.specs,
3449                 _decoded=(offset, llen, l),
3450             )
3451             if evgen_mode:
3452                 obj._value = (bit_len, None)
3453             yield decode_path, obj, tail
3454             return
3455         if t != self.tag_constructed:
3456             raise TagMismatch(
3457                 klass=self.__class__,
3458                 decode_path=decode_path,
3459                 offset=offset,
3460             )
3461         if not ctx.get("bered", False):
3462             raise DecodeError(
3463                 "unallowed BER constructed encoding",
3464                 klass=self.__class__,
3465                 decode_path=decode_path,
3466                 offset=offset,
3467             )
3468         if tag_only:  # pragma: no cover
3469             yield None
3470             return
3471         lenindef = False
3472         try:
3473             l, llen, v = len_decode(lv)
3474         except LenIndefForm:
3475             llen, l, v = 1, 0, lv[1:]
3476             lenindef = True
3477         except DecodeError as err:
3478             raise err.__class__(
3479                 msg=err.msg,
3480                 klass=self.__class__,
3481                 decode_path=decode_path,
3482                 offset=offset,
3483             )
3484         if l > len(v):
3485             raise NotEnoughData(
3486                 "encoded length is longer than data",
3487                 klass=self.__class__,
3488                 decode_path=decode_path,
3489                 offset=offset,
3490             )
3491         if not lenindef and l == 0:
3492             raise NotEnoughData(
3493                 "zero length",
3494                 klass=self.__class__,
3495                 decode_path=decode_path,
3496                 offset=offset,
3497             )
3498         chunks = []
3499         sub_offset = offset + tlen + llen
3500         vlen = 0
3501         while True:
3502             if lenindef:
3503                 if v[:EOC_LEN].tobytes() == EOC:
3504                     break
3505             else:
3506                 if vlen == l:
3507                     break
3508                 if vlen > l:
3509                     raise DecodeError(
3510                         "chunk out of bounds",
3511                         klass=self.__class__,
3512                         decode_path=decode_path + (str(len(chunks) - 1),),
3513                         offset=chunks[-1].offset,
3514                     )
3515             sub_decode_path = decode_path + (str(len(chunks)),)
3516             try:
3517                 if evgen_mode:
3518                     for _decode_path, chunk, v_tail in BitString().decode_evgen(
3519                             v,
3520                             offset=sub_offset,
3521                             decode_path=sub_decode_path,
3522                             leavemm=True,
3523                             ctx=ctx,
3524                             _ctx_immutable=False,
3525                     ):
3526                         yield _decode_path, chunk, v_tail
3527                 else:
3528                     _, chunk, v_tail = next(BitString().decode_evgen(
3529                         v,
3530                         offset=sub_offset,
3531                         decode_path=sub_decode_path,
3532                         leavemm=True,
3533                         ctx=ctx,
3534                         _ctx_immutable=False,
3535                         _evgen_mode=False,
3536                     ))
3537             except TagMismatch:
3538                 raise DecodeError(
3539                     "expected BitString encoded chunk",
3540                     klass=self.__class__,
3541                     decode_path=sub_decode_path,
3542                     offset=sub_offset,
3543                 )
3544             chunks.append(chunk)
3545             sub_offset += chunk.tlvlen
3546             vlen += chunk.tlvlen
3547             v = v_tail
3548         if len(chunks) == 0:
3549             raise DecodeError(
3550                 "no chunks",
3551                 klass=self.__class__,
3552                 decode_path=decode_path,
3553                 offset=offset,
3554             )
3555         values = []
3556         bit_len = 0
3557         for chunk_i, chunk in enumerate(chunks[:-1]):
3558             if chunk.bit_len % 8 != 0:
3559                 raise DecodeError(
3560                     "BitString chunk is not multiple of 8 bits",
3561                     klass=self.__class__,
3562                     decode_path=decode_path + (str(chunk_i),),
3563                     offset=chunk.offset,
3564                 )
3565             if not evgen_mode:
3566                 values.append(bytes(chunk))
3567             bit_len += chunk.bit_len
3568         chunk_last = chunks[-1]
3569         if not evgen_mode:
3570             values.append(bytes(chunk_last))
3571         bit_len += chunk_last.bit_len
3572         obj = self.__class__(
3573             value=None if evgen_mode else (bit_len, b"".join(values)),
3574             impl=self.tag,
3575             expl=self._expl,
3576             default=self.default,
3577             optional=self.optional,
3578             _specs=self.specs,
3579             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3580         )
3581         if evgen_mode:
3582             obj._value = (bit_len, None)
3583         obj.lenindef = lenindef
3584         obj.ber_encoded = True
3585         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3586
3587     def __repr__(self):
3588         return pp_console_row(next(self.pps()))
3589
3590     def pps(self, decode_path=()):
3591         value = None
3592         blob = None
3593         if self.ready:
3594             bit_len, blob = self._value
3595             value = "%d bits" % bit_len
3596             if len(self.specs) > 0 and blob is not None:
3597                 blob = tuple(self.named)
3598         yield _pp(
3599             obj=self,
3600             asn1_type_name=self.asn1_type_name,
3601             obj_name=self.__class__.__name__,
3602             decode_path=decode_path,
3603             value=value,
3604             blob=blob,
3605             optional=self.optional,
3606             default=self == self.default,
3607             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3608             expl=None if self._expl is None else tag_decode(self._expl),
3609             offset=self.offset,
3610             tlen=self.tlen,
3611             llen=self.llen,
3612             vlen=self.vlen,
3613             expl_offset=self.expl_offset if self.expled else None,
3614             expl_tlen=self.expl_tlen if self.expled else None,
3615             expl_llen=self.expl_llen if self.expled else None,
3616             expl_vlen=self.expl_vlen if self.expled else None,
3617             expl_lenindef=self.expl_lenindef,
3618             lenindef=self.lenindef,
3619             ber_encoded=self.ber_encoded,
3620             bered=self.bered,
3621         )
3622         defined_by, defined = self.defined or (None, None)
3623         if defined_by is not None:
3624             yield defined.pps(
3625                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3626             )
3627         for pp in self.pps_lenindef(decode_path):
3628             yield pp
3629
3630
3631 OctetStringState = namedtuple(
3632     "OctetStringState",
3633     BasicState._fields + (
3634         "value",
3635         "bound_min",
3636         "bound_max",
3637         "tag_constructed",
3638         "defined",
3639     ),
3640     **NAMEDTUPLE_KWARGS
3641 )
3642
3643
3644 class OctetString(Obj):
3645     """``OCTET STRING`` binary string type
3646
3647     >>> s = OctetString(b"hello world")
3648     OCTET STRING 11 bytes 68656c6c6f20776f726c64
3649     >>> s == OctetString(b"hello world")
3650     True
3651     >>> bytes(s)
3652     b'hello world'
3653
3654     >>> OctetString(b"hello", bounds=(4, 4))
3655     Traceback (most recent call last):
3656     pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3657     >>> OctetString(b"hell", bounds=(4, 4))
3658     OCTET STRING 4 bytes 68656c6c
3659
3660     Memoryviews can be used as a values. If memoryview is made on
3661     mmap-ed file, then it does not take storage inside OctetString
3662     itself. In CER encoding mode it will be streamed to the specified
3663     writer, copying 1 KB chunks.
3664     """
3665     __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3666     tag_default = tag_encode(4)
3667     asn1_type_name = "OCTET STRING"
3668     evgen_mode_skip_value = True
3669
3670     def __init__(
3671             self,
3672             value=None,
3673             bounds=None,
3674             impl=None,
3675             expl=None,
3676             default=None,
3677             optional=False,
3678             _decoded=(0, 0, 0),
3679             ctx=None,
3680     ):
3681         """
3682         :param value: set the value. Either binary type, or
3683                       :py:class:`pyderasn.OctetString` object
3684         :param bounds: set ``(MIN, MAX)`` value size constraint.
3685                        (-inf, +inf) by default
3686         :param bytes impl: override default tag with ``IMPLICIT`` one
3687         :param bytes expl: override default tag with ``EXPLICIT`` one
3688         :param default: set default value. Type same as in ``value``
3689         :param bool optional: is object ``OPTIONAL`` in sequence
3690         """
3691         super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3692         self._value = value
3693         self._bound_min, self._bound_max = getattr(
3694             self,
3695             "bounds",
3696             (0, float("+inf")),
3697         ) if bounds is None else bounds
3698         if value is not None:
3699             self._value = self._value_sanitize(value)
3700         if default is not None:
3701             default = self._value_sanitize(default)
3702             self.default = self.__class__(
3703                 value=default,
3704                 impl=self.tag,
3705                 expl=self._expl,
3706             )
3707             if self._value is None:
3708                 self._value = default
3709         self.defined = None
3710         tag_klass, _, tag_num = tag_decode(self.tag)
3711         self.tag_constructed = tag_encode(
3712             klass=tag_klass,
3713             form=TagFormConstructed,
3714             num=tag_num,
3715         )
3716
3717     def _value_sanitize(self, value):
3718         if value.__class__ == binary_type or value.__class__ == memoryview:
3719             pass
3720         elif issubclass(value.__class__, OctetString):
3721             value = value._value
3722         else:
3723             raise InvalidValueType((self.__class__, bytes, memoryview))
3724         if not self._bound_min <= len(value) <= self._bound_max:
3725             raise BoundsError(self._bound_min, len(value), self._bound_max)
3726         return value
3727
3728     @property
3729     def ready(self):
3730         return self._value is not None
3731
3732     def __getstate__(self):
3733         return OctetStringState(
3734             __version__,
3735             self.tag,
3736             self._tag_order,
3737             self._expl,
3738             self.default,
3739             self.optional,
3740             self.offset,
3741             self.llen,
3742             self.vlen,
3743             self.expl_lenindef,
3744             self.lenindef,
3745             self.ber_encoded,
3746             self._value,
3747             self._bound_min,
3748             self._bound_max,
3749             self.tag_constructed,
3750             self.defined,
3751         )
3752
3753     def __setstate__(self, state):
3754         super(OctetString, self).__setstate__(state)
3755         self._value = state.value
3756         self._bound_min = state.bound_min
3757         self._bound_max = state.bound_max
3758         self.tag_constructed = state.tag_constructed
3759         self.defined = state.defined
3760
3761     def __bytes__(self):
3762         self._assert_ready()
3763         return bytes(self._value)
3764
3765     def __eq__(self, their):
3766         if their.__class__ == binary_type:
3767             return self._value == their
3768         if not issubclass(their.__class__, OctetString):
3769             return False
3770         return (
3771             self._value == their._value and
3772             self.tag == their.tag and
3773             self._expl == their._expl
3774         )
3775
3776     def __lt__(self, their):
3777         return self._value < their._value
3778
3779     def __call__(
3780             self,
3781             value=None,
3782             bounds=None,
3783             impl=None,
3784             expl=None,
3785             default=None,
3786             optional=None,
3787     ):
3788         return self.__class__(
3789             value=value,
3790             bounds=(
3791                 (self._bound_min, self._bound_max)
3792                 if bounds is None else bounds
3793             ),
3794             impl=self.tag if impl is None else impl,
3795             expl=self._expl if expl is None else expl,
3796             default=self.default if default is None else default,
3797             optional=self.optional if optional is None else optional,
3798         )
3799
3800     def _encode(self):
3801         self._assert_ready()
3802         return b"".join((
3803             self.tag,
3804             len_encode(len(self._value)),
3805             self._value,
3806         ))
3807
3808     def _encode1st(self, state):
3809         self._assert_ready()
3810         l = len(self._value)
3811         return len(self.tag) + len_size(l) + l, state
3812
3813     def _encode2nd(self, writer, state_iter):
3814         value = self._value
3815         write_full(writer, self.tag + len_encode(len(value)))
3816         write_full(writer, value)
3817
3818     def _encode_cer(self, writer):
3819         octets = self._value
3820         if len(octets) <= 1000:
3821             write_full(writer, self._encode())
3822             return
3823         write_full(writer, self.tag_constructed)
3824         write_full(writer, LENINDEF)
3825         for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3826             write_full(writer, b"".join((
3827                 OctetString.tag_default,
3828                 LEN1K,
3829                 octets[offset:offset + 1000],
3830             )))
3831         tail = octets[offset+1000:]
3832         if len(tail) > 0:
3833             write_full(writer, b"".join((
3834                 OctetString.tag_default,
3835                 len_encode(len(tail)),
3836                 tail,
3837             )))
3838         write_full(writer, EOC)
3839
3840     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3841         try:
3842             t, tlen, lv = tag_strip(tlv)
3843         except DecodeError as err:
3844             raise err.__class__(
3845                 msg=err.msg,
3846                 klass=self.__class__,
3847                 decode_path=decode_path,
3848                 offset=offset,
3849             )
3850         if t == self.tag:
3851             if tag_only:
3852                 yield None
3853                 return
3854             try:
3855                 l, llen, v = len_decode(lv)
3856             except DecodeError as err:
3857                 raise err.__class__(
3858                     msg=err.msg,
3859                     klass=self.__class__,
3860                     decode_path=decode_path,
3861                     offset=offset,
3862                 )
3863             if l > len(v):
3864                 raise NotEnoughData(
3865                     "encoded length is longer than data",
3866                     klass=self.__class__,
3867                     decode_path=decode_path,
3868                     offset=offset,
3869                 )
3870             v, tail = v[:l], v[l:]
3871             if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3872                 raise DecodeError(
3873                     msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3874                     klass=self.__class__,
3875                     decode_path=decode_path,
3876                     offset=offset,
3877                 )
3878             try:
3879                 obj = self.__class__(
3880                     value=(
3881                         None if (evgen_mode and self.evgen_mode_skip_value)
3882                         else v.tobytes()
3883                     ),
3884                     bounds=(self._bound_min, self._bound_max),
3885                     impl=self.tag,
3886                     expl=self._expl,
3887                     default=self.default,
3888                     optional=self.optional,
3889                     _decoded=(offset, llen, l),
3890                     ctx=ctx,
3891                 )
3892             except DecodeError as err:
3893                 raise DecodeError(
3894                     msg=err.msg,
3895                     klass=self.__class__,
3896                     decode_path=decode_path,
3897                     offset=offset,
3898                 )
3899             except BoundsError as err:
3900                 raise DecodeError(
3901                     msg=str(err),
3902                     klass=self.__class__,
3903                     decode_path=decode_path,
3904                     offset=offset,
3905                 )
3906             yield decode_path, obj, tail
3907             return
3908         if t != self.tag_constructed:
3909             raise TagMismatch(
3910                 klass=self.__class__,
3911                 decode_path=decode_path,
3912                 offset=offset,
3913             )
3914         if not ctx.get("bered", False):
3915             raise DecodeError(
3916                 "unallowed BER constructed encoding",
3917                 klass=self.__class__,
3918                 decode_path=decode_path,
3919                 offset=offset,
3920             )
3921         if tag_only:
3922             yield None
3923             return
3924         lenindef = False
3925         try:
3926             l, llen, v = len_decode(lv)
3927         except LenIndefForm:
3928             llen, l, v = 1, 0, lv[1:]
3929             lenindef = True
3930         except DecodeError as err:
3931             raise err.__class__(
3932                 msg=err.msg,
3933                 klass=self.__class__,
3934                 decode_path=decode_path,
3935                 offset=offset,
3936             )
3937         if l > len(v):
3938             raise NotEnoughData(
3939                 "encoded length is longer than data",
3940                 klass=self.__class__,
3941                 decode_path=decode_path,
3942                 offset=offset,
3943             )
3944         chunks = []
3945         chunks_count = 0
3946         sub_offset = offset + tlen + llen
3947         vlen = 0
3948         payload_len = 0
3949         while True:
3950             if lenindef:
3951                 if v[:EOC_LEN].tobytes() == EOC:
3952                     break
3953             else:
3954                 if vlen == l:
3955                     break
3956                 if vlen > l:
3957                     raise DecodeError(
3958                         "chunk out of bounds",
3959                         klass=self.__class__,
3960                         decode_path=decode_path + (str(len(chunks) - 1),),
3961                         offset=chunks[-1].offset,
3962                     )
3963             try:
3964                 if evgen_mode:
3965                     sub_decode_path = decode_path + (str(chunks_count),)
3966                     for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3967                             v,
3968                             offset=sub_offset,
3969                             decode_path=sub_decode_path,
3970                             leavemm=True,
3971                             ctx=ctx,
3972                             _ctx_immutable=False,
3973                     ):
3974                         yield _decode_path, chunk, v_tail
3975                         if not chunk.ber_encoded:
3976                             payload_len += chunk.vlen
3977                     chunks_count += 1
3978                 else:
3979                     sub_decode_path = decode_path + (str(len(chunks)),)
3980                     _, chunk, v_tail = next(OctetString().decode_evgen(
3981                         v,
3982                         offset=sub_offset,
3983                         decode_path=sub_decode_path,
3984                         leavemm=True,
3985                         ctx=ctx,
3986                         _ctx_immutable=False,
3987                         _evgen_mode=False,
3988                     ))
3989                     chunks.append(chunk)
3990             except TagMismatch:
3991                 raise DecodeError(
3992                     "expected OctetString encoded chunk",
3993                     klass=self.__class__,
3994                     decode_path=sub_decode_path,
3995                     offset=sub_offset,
3996                 )
3997             sub_offset += chunk.tlvlen
3998             vlen += chunk.tlvlen
3999             v = v_tail
4000         if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
4001             raise DecodeError(
4002                 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
4003                 klass=self.__class__,
4004                 decode_path=decode_path,
4005                 offset=offset,
4006             )
4007         try:
4008             obj = self.__class__(
4009                 value=(
4010                     None if evgen_mode else
4011                     b"".join(bytes(chunk) for chunk in chunks)
4012                 ),
4013                 bounds=(self._bound_min, self._bound_max),
4014                 impl=self.tag,
4015                 expl=self._expl,
4016                 default=self.default,
4017                 optional=self.optional,
4018                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4019                 ctx=ctx,
4020             )
4021         except DecodeError as err:
4022             raise DecodeError(
4023                 msg=err.msg,
4024                 klass=self.__class__,
4025                 decode_path=decode_path,
4026                 offset=offset,
4027             )
4028         except BoundsError as err:
4029             raise DecodeError(
4030                 msg=str(err),
4031                 klass=self.__class__,
4032                 decode_path=decode_path,
4033                 offset=offset,
4034             )
4035         obj.lenindef = lenindef
4036         obj.ber_encoded = True
4037         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4038
4039     def __repr__(self):
4040         return pp_console_row(next(self.pps()))
4041
4042     def pps(self, decode_path=()):
4043         yield _pp(
4044             obj=self,
4045             asn1_type_name=self.asn1_type_name,
4046             obj_name=self.__class__.__name__,
4047             decode_path=decode_path,
4048             value=("%d bytes" % len(self._value)) if self.ready else None,
4049             blob=self._value if self.ready else None,
4050             optional=self.optional,
4051             default=self == self.default,
4052             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4053             expl=None if self._expl is None else tag_decode(self._expl),
4054             offset=self.offset,
4055             tlen=self.tlen,
4056             llen=self.llen,
4057             vlen=self.vlen,
4058             expl_offset=self.expl_offset if self.expled else None,
4059             expl_tlen=self.expl_tlen if self.expled else None,
4060             expl_llen=self.expl_llen if self.expled else None,
4061             expl_vlen=self.expl_vlen if self.expled else None,
4062             expl_lenindef=self.expl_lenindef,
4063             lenindef=self.lenindef,
4064             ber_encoded=self.ber_encoded,
4065             bered=self.bered,
4066         )
4067         defined_by, defined = self.defined or (None, None)
4068         if defined_by is not None:
4069             yield defined.pps(
4070                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4071             )
4072         for pp in self.pps_lenindef(decode_path):
4073             yield pp
4074
4075
4076 def agg_octet_string(evgens, decode_path, raw, writer):
4077     """Aggregate constructed string (OctetString and its derivatives)
4078
4079     :param evgens: iterator of generated events
4080     :param decode_path: points to the string we want to decode
4081     :param raw: slicebable (memoryview, bytearray, etc) with
4082                 the data evgens are generated on
4083     :param writer: buffer.write where string is going to be saved
4084     :param writer: where string is going to be saved. Must comply
4085                    with ``io.RawIOBase.write`` behaviour
4086
4087     .. seealso:: :ref:`agg_octet_string`
4088     """
4089     decode_path_len = len(decode_path)
4090     for dp, obj, _ in evgens:
4091         if dp[:decode_path_len] != decode_path:
4092             continue
4093         if not obj.ber_encoded:
4094             write_full(writer, raw[
4095                 obj.offset + obj.tlen + obj.llen:
4096                 obj.offset + obj.tlen + obj.llen + obj.vlen -
4097                 (EOC_LEN if obj.expl_lenindef else 0)
4098             ])
4099         if len(dp) == decode_path_len:
4100             break
4101
4102
4103 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4104
4105
4106 class Null(Obj):
4107     """``NULL`` null object
4108
4109     >>> n = Null()
4110     NULL
4111     >>> n.ready
4112     True
4113     """
4114     __slots__ = ()
4115     tag_default = tag_encode(5)
4116     asn1_type_name = "NULL"
4117
4118     def __init__(
4119             self,
4120             value=None,  # unused, but Sequence passes it
4121             impl=None,
4122             expl=None,
4123             optional=False,
4124             _decoded=(0, 0, 0),
4125     ):
4126         """
4127         :param bytes impl: override default tag with ``IMPLICIT`` one
4128         :param bytes expl: override default tag with ``EXPLICIT`` one
4129         :param bool optional: is object ``OPTIONAL`` in sequence
4130         """
4131         super(Null, self).__init__(impl, expl, None, optional, _decoded)
4132         self.default = None
4133
4134     @property
4135     def ready(self):
4136         return True
4137
4138     def __getstate__(self):
4139         return NullState(
4140             __version__,
4141             self.tag,
4142             self._tag_order,
4143             self._expl,
4144             self.default,
4145             self.optional,
4146             self.offset,
4147             self.llen,
4148             self.vlen,
4149             self.expl_lenindef,
4150             self.lenindef,
4151             self.ber_encoded,
4152         )
4153
4154     def __eq__(self, their):
4155         if not issubclass(their.__class__, Null):
4156             return False
4157         return (
4158             self.tag == their.tag and
4159             self._expl == their._expl
4160         )
4161
4162     def __call__(
4163             self,
4164             value=None,
4165             impl=None,
4166             expl=None,
4167             optional=None,
4168     ):
4169         return self.__class__(
4170             impl=self.tag if impl is None else impl,
4171             expl=self._expl if expl is None else expl,
4172             optional=self.optional if optional is None else optional,
4173         )
4174
4175     def _encode(self):
4176         return self.tag + LEN0
4177
4178     def _encode1st(self, state):
4179         return len(self.tag) + 1, state
4180
4181     def _encode2nd(self, writer, state_iter):
4182         write_full(writer, self.tag + LEN0)
4183
4184     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4185         try:
4186             t, _, lv = tag_strip(tlv)
4187         except DecodeError as err:
4188             raise err.__class__(
4189                 msg=err.msg,
4190                 klass=self.__class__,
4191                 decode_path=decode_path,
4192                 offset=offset,
4193             )
4194         if t != self.tag:
4195             raise TagMismatch(
4196                 klass=self.__class__,
4197                 decode_path=decode_path,
4198                 offset=offset,
4199             )
4200         if tag_only:  # pragma: no cover
4201             yield None
4202             return
4203         try:
4204             l, _, v = len_decode(lv)
4205         except DecodeError as err:
4206             raise err.__class__(
4207                 msg=err.msg,
4208                 klass=self.__class__,
4209                 decode_path=decode_path,
4210                 offset=offset,
4211             )
4212         if l != 0:
4213             raise InvalidLength(
4214                 "Null must have zero length",
4215                 klass=self.__class__,
4216                 decode_path=decode_path,
4217                 offset=offset,
4218             )
4219         obj = self.__class__(
4220             impl=self.tag,
4221             expl=self._expl,
4222             optional=self.optional,
4223             _decoded=(offset, 1, 0),
4224         )
4225         yield decode_path, obj, v
4226
4227     def __repr__(self):
4228         return pp_console_row(next(self.pps()))
4229
4230     def pps(self, decode_path=()):
4231         yield _pp(
4232             obj=self,
4233             asn1_type_name=self.asn1_type_name,
4234             obj_name=self.__class__.__name__,
4235             decode_path=decode_path,
4236             optional=self.optional,
4237             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4238             expl=None if self._expl is None else tag_decode(self._expl),
4239             offset=self.offset,
4240             tlen=self.tlen,
4241             llen=self.llen,
4242             vlen=self.vlen,
4243             expl_offset=self.expl_offset if self.expled else None,
4244             expl_tlen=self.expl_tlen if self.expled else None,
4245             expl_llen=self.expl_llen if self.expled else None,
4246             expl_vlen=self.expl_vlen if self.expled else None,
4247             expl_lenindef=self.expl_lenindef,
4248             bered=self.bered,
4249         )
4250         for pp in self.pps_lenindef(decode_path):
4251             yield pp
4252
4253
4254 ObjectIdentifierState = namedtuple(
4255     "ObjectIdentifierState",
4256     BasicState._fields + ("value", "defines"),
4257     **NAMEDTUPLE_KWARGS
4258 )
4259
4260
4261 class ObjectIdentifier(Obj):
4262     """``OBJECT IDENTIFIER`` OID type
4263
4264     >>> oid = ObjectIdentifier((1, 2, 3))
4265     OBJECT IDENTIFIER 1.2.3
4266     >>> oid == ObjectIdentifier("1.2.3")
4267     True
4268     >>> tuple(oid)
4269     (1, 2, 3)
4270     >>> str(oid)
4271     '1.2.3'
4272     >>> oid + (4, 5) + ObjectIdentifier("1.7")
4273     OBJECT IDENTIFIER 1.2.3.4.5.1.7
4274
4275     >>> str(ObjectIdentifier((3, 1)))
4276     Traceback (most recent call last):
4277     pyderasn.InvalidOID: unacceptable first arc value
4278     """
4279     __slots__ = ("defines",)
4280     tag_default = tag_encode(6)
4281     asn1_type_name = "OBJECT IDENTIFIER"
4282
4283     def __init__(
4284             self,
4285             value=None,
4286             defines=(),
4287             impl=None,
4288             expl=None,
4289             default=None,
4290             optional=False,
4291             _decoded=(0, 0, 0),
4292     ):
4293         """
4294         :param value: set the value. Either tuples of integers,
4295                       string of "."-concatenated integers, or
4296                       :py:class:`pyderasn.ObjectIdentifier` object
4297         :param defines: sequence of tuples. Each tuple has two elements.
4298                         First one is relative to current one decode
4299                         path, aiming to the field defined by that OID.
4300                         Read about relative path in
4301                         :py:func:`pyderasn.abs_decode_path`. Second
4302                         tuple element is ``{OID: pyderasn.Obj()}``
4303                         dictionary, mapping between current OID value
4304                         and structure applied to defined field.
4305
4306                         .. seealso:: :ref:`definedby`
4307
4308         :param bytes impl: override default tag with ``IMPLICIT`` one
4309         :param bytes expl: override default tag with ``EXPLICIT`` one
4310         :param default: set default value. Type same as in ``value``
4311         :param bool optional: is object ``OPTIONAL`` in sequence
4312         """
4313         super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4314         self._value = value
4315         if value is not None:
4316             self._value = self._value_sanitize(value)
4317         if default is not None:
4318             default = self._value_sanitize(default)
4319             self.default = self.__class__(
4320                 value=default,
4321                 impl=self.tag,
4322                 expl=self._expl,
4323             )
4324             if self._value is None:
4325                 self._value = default
4326         self.defines = defines
4327
4328     def __add__(self, their):
4329         if their.__class__ == tuple:
4330             return self.__class__(self._value + array("L", their))
4331         if isinstance(their, self.__class__):
4332             return self.__class__(self._value + their._value)
4333         raise InvalidValueType((self.__class__, tuple))
4334
4335     def _value_sanitize(self, value):
4336         if issubclass(value.__class__, ObjectIdentifier):
4337             return value._value
4338         if isinstance(value, string_types):
4339             try:
4340                 value = array("L", (pureint(arc) for arc in value.split(".")))
4341             except ValueError:
4342                 raise InvalidOID("unacceptable arcs values")
4343         if value.__class__ == tuple:
4344             try:
4345                 value = array("L", value)
4346             except OverflowError as err:
4347                 raise InvalidOID(repr(err))
4348         if value.__class__ is array:
4349             if len(value) < 2:
4350                 raise InvalidOID("less than 2 arcs")
4351             first_arc = value[0]
4352             if first_arc in (0, 1):
4353                 if not (0 <= value[1] <= 39):
4354                     raise InvalidOID("second arc is too wide")
4355             elif first_arc == 2:
4356                 pass
4357             else:
4358                 raise InvalidOID("unacceptable first arc value")
4359             if not all(arc >= 0 for arc in value):
4360                 raise InvalidOID("negative arc value")
4361             return value
4362         raise InvalidValueType((self.__class__, str, tuple))
4363
4364     @property
4365     def ready(self):
4366         return self._value is not None
4367
4368     def __getstate__(self):
4369         return ObjectIdentifierState(
4370             __version__,
4371             self.tag,
4372             self._tag_order,
4373             self._expl,
4374             self.default,
4375             self.optional,
4376             self.offset,
4377             self.llen,
4378             self.vlen,
4379             self.expl_lenindef,
4380             self.lenindef,
4381             self.ber_encoded,
4382             self._value,
4383             self.defines,
4384         )
4385
4386     def __setstate__(self, state):
4387         super(ObjectIdentifier, self).__setstate__(state)
4388         self._value = state.value
4389         self.defines = state.defines
4390
4391     def __iter__(self):
4392         self._assert_ready()
4393         return iter(self._value)
4394
4395     def __str__(self):
4396         return ".".join(str(arc) for arc in self._value or ())
4397
4398     def __hash__(self):
4399         self._assert_ready()
4400         return hash(b"".join((
4401             self.tag,
4402             bytes(self._expl or b""),
4403             str(self._value).encode("ascii"),
4404         )))
4405
4406     def __eq__(self, their):
4407         if their.__class__ == tuple:
4408             return self._value == array("L", their)
4409         if not issubclass(their.__class__, ObjectIdentifier):
4410             return False
4411         return (
4412             self.tag == their.tag and
4413             self._expl == their._expl and
4414             self._value == their._value
4415         )
4416
4417     def __lt__(self, their):
4418         return self._value < their._value
4419
4420     def __call__(
4421             self,
4422             value=None,
4423             defines=None,
4424             impl=None,
4425             expl=None,
4426             default=None,
4427             optional=None,
4428     ):
4429         return self.__class__(
4430             value=value,
4431             defines=self.defines if defines is None else defines,
4432             impl=self.tag if impl is None else impl,
4433             expl=self._expl if expl is None else expl,
4434             default=self.default if default is None else default,
4435             optional=self.optional if optional is None else optional,
4436         )
4437
4438     def _encode_octets(self):
4439         self._assert_ready()
4440         value = self._value
4441         first_value = value[1]
4442         first_arc = value[0]
4443         if first_arc == 0:
4444             pass
4445         elif first_arc == 1:
4446             first_value += 40
4447         elif first_arc == 2:
4448             first_value += 80
4449         else:  # pragma: no cover
4450             raise RuntimeError("invalid arc is stored")
4451         octets = [zero_ended_encode(first_value)]
4452         for arc in value[2:]:
4453             octets.append(zero_ended_encode(arc))
4454         return b"".join(octets)
4455
4456     def _encode(self):
4457         v = self._encode_octets()
4458         return b"".join((self.tag, len_encode(len(v)), v))
4459
4460     def _encode1st(self, state):
4461         l = len(self._encode_octets())
4462         return len(self.tag) + len_size(l) + l, state
4463
4464     def _encode2nd(self, writer, state_iter):
4465         write_full(writer, self._encode())
4466
4467     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4468         try:
4469             t, _, lv = tag_strip(tlv)
4470         except DecodeError as err:
4471             raise err.__class__(
4472                 msg=err.msg,
4473                 klass=self.__class__,
4474                 decode_path=decode_path,
4475                 offset=offset,
4476             )
4477         if t != self.tag:
4478             raise TagMismatch(
4479                 klass=self.__class__,
4480                 decode_path=decode_path,
4481                 offset=offset,
4482             )
4483         if tag_only:  # pragma: no cover
4484             yield None
4485             return
4486         try:
4487             l, llen, v = len_decode(lv)
4488         except DecodeError as err:
4489             raise err.__class__(
4490                 msg=err.msg,
4491                 klass=self.__class__,
4492                 decode_path=decode_path,
4493                 offset=offset,
4494             )
4495         if l > len(v):
4496             raise NotEnoughData(
4497                 "encoded length is longer than data",
4498                 klass=self.__class__,
4499                 decode_path=decode_path,
4500                 offset=offset,
4501             )
4502         if l == 0:
4503             raise NotEnoughData(
4504                 "zero length",
4505                 klass=self.__class__,
4506                 decode_path=decode_path,
4507                 offset=offset,
4508             )
4509         v, tail = v[:l], v[l:]
4510         arcs = array("L")
4511         ber_encoded = False
4512         while len(v) > 0:
4513             i = 0
4514             arc = 0
4515             while True:
4516                 octet = indexbytes(v, i)
4517                 if i == 0 and octet == 0x80:
4518                     if ctx.get("bered", False):
4519                         ber_encoded = True
4520                     else:
4521                         raise DecodeError(
4522                             "non normalized arc encoding",
4523                             klass=self.__class__,
4524                             decode_path=decode_path,
4525                             offset=offset,
4526                         )
4527                 arc = (arc << 7) | (octet & 0x7F)
4528                 if octet & 0x80 == 0:
4529                     try:
4530                         arcs.append(arc)
4531                     except OverflowError:
4532                         raise DecodeError(
4533                             "too huge value for local unsigned long",
4534                             klass=self.__class__,
4535                             decode_path=decode_path,
4536                             offset=offset,
4537                         )
4538                     v = v[i + 1:]
4539                     break
4540                 i += 1
4541                 if i == len(v):
4542                     raise DecodeError(
4543                         "unfinished OID",
4544                         klass=self.__class__,
4545                         decode_path=decode_path,
4546                         offset=offset,
4547                     )
4548         first_arc = 0
4549         second_arc = arcs[0]
4550         if 0 <= second_arc <= 39:
4551             first_arc = 0
4552         elif 40 <= second_arc <= 79:
4553             first_arc = 1
4554             second_arc -= 40
4555         else:
4556             first_arc = 2
4557             second_arc -= 80
4558         obj = self.__class__(
4559             value=array("L", (first_arc, second_arc)) + arcs[1:],
4560             impl=self.tag,
4561             expl=self._expl,
4562             default=self.default,
4563             optional=self.optional,
4564             _decoded=(offset, llen, l),
4565         )
4566         if ber_encoded:
4567             obj.ber_encoded = True
4568         yield decode_path, obj, tail
4569
4570     def __repr__(self):
4571         return pp_console_row(next(self.pps()))
4572
4573     def pps(self, decode_path=()):
4574         yield _pp(
4575             obj=self,
4576             asn1_type_name=self.asn1_type_name,
4577             obj_name=self.__class__.__name__,
4578             decode_path=decode_path,
4579             value=str(self) if self.ready else None,
4580             optional=self.optional,
4581             default=self == self.default,
4582             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4583             expl=None if self._expl is None else tag_decode(self._expl),
4584             offset=self.offset,
4585             tlen=self.tlen,
4586             llen=self.llen,
4587             vlen=self.vlen,
4588             expl_offset=self.expl_offset if self.expled else None,
4589             expl_tlen=self.expl_tlen if self.expled else None,
4590             expl_llen=self.expl_llen if self.expled else None,
4591             expl_vlen=self.expl_vlen if self.expled else None,
4592             expl_lenindef=self.expl_lenindef,
4593             ber_encoded=self.ber_encoded,
4594             bered=self.bered,
4595         )
4596         for pp in self.pps_lenindef(decode_path):
4597             yield pp
4598
4599
4600 class Enumerated(Integer):
4601     """``ENUMERATED`` integer type
4602
4603     This type is identical to :py:class:`pyderasn.Integer`, but requires
4604     schema to be specified and does not accept values missing from it.
4605     """
4606     __slots__ = ()
4607     tag_default = tag_encode(10)
4608     asn1_type_name = "ENUMERATED"
4609
4610     def __init__(
4611             self,
4612             value=None,
4613             impl=None,
4614             expl=None,
4615             default=None,
4616             optional=False,
4617             _specs=None,
4618             _decoded=(0, 0, 0),
4619             bounds=None,  # dummy argument, workability for Integer.decode
4620     ):
4621         super(Enumerated, self).__init__(
4622             value, bounds, impl, expl, default, optional, _specs, _decoded,
4623         )
4624         if len(self.specs) == 0:
4625             raise ValueError("schema must be specified")
4626
4627     def _value_sanitize(self, value):
4628         if isinstance(value, self.__class__):
4629             value = value._value
4630         elif isinstance(value, integer_types):
4631             for _value in itervalues(self.specs):
4632                 if _value == value:
4633                     break
4634             else:
4635                 raise DecodeError(
4636                     "unknown integer value: %s" % value,
4637                     klass=self.__class__,
4638                 )
4639         elif isinstance(value, string_types):
4640             value = self.specs.get(value)
4641             if value is None:
4642                 raise ObjUnknown("integer value: %s" % value)
4643         else:
4644             raise InvalidValueType((self.__class__, int, str))
4645         return value
4646
4647     def __call__(
4648             self,
4649             value=None,
4650             impl=None,
4651             expl=None,
4652             default=None,
4653             optional=None,
4654             _specs=None,
4655     ):
4656         return self.__class__(
4657             value=value,
4658             impl=self.tag if impl is None else impl,
4659             expl=self._expl if expl is None else expl,
4660             default=self.default if default is None else default,
4661             optional=self.optional if optional is None else optional,
4662             _specs=self.specs,
4663         )
4664
4665
4666 def escape_control_unicode(c):
4667     if unicat(c)[0] == "C":
4668         c = repr(c).lstrip("u").strip("'")
4669     return c
4670
4671
4672 class CommonString(OctetString):
4673     """Common class for all strings
4674
4675     Everything resembles :py:class:`pyderasn.OctetString`, except
4676     ability to deal with unicode text strings.
4677
4678     >>> hexenc("привет Ð¼Ð¸Ñ€".encode("utf-8"))
4679     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4680     >>> UTF8String("привет Ð¼Ð¸Ñ€") == UTF8String(hexdec("d0...80"))
4681     True
4682     >>> s = UTF8String("привет Ð¼Ð¸Ñ€")
4683     UTF8String UTF8String Ð¿Ñ€Ð¸Ð²ÐµÑ‚ Ð¼Ð¸Ñ€
4684     >>> str(s)
4685     'привет Ð¼Ð¸Ñ€'
4686     >>> hexenc(bytes(s))
4687     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4688
4689     >>> PrintableString("привет Ð¼Ð¸Ñ€")
4690     Traceback (most recent call last):
4691     pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4692
4693     >>> BMPString("ада", bounds=(2, 2))
4694     Traceback (most recent call last):
4695     pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4696     >>> s = BMPString("ад", bounds=(2, 2))
4697     >>> s.encoding
4698     'utf-16-be'
4699     >>> hexenc(bytes(s))
4700     '04300434'
4701
4702     .. list-table::
4703        :header-rows: 1
4704
4705        * - Class
4706          - Text Encoding
4707        * - :py:class:`pyderasn.UTF8String`
4708          - utf-8
4709        * - :py:class:`pyderasn.NumericString`
4710          - ascii
4711        * - :py:class:`pyderasn.PrintableString`
4712          - ascii
4713        * - :py:class:`pyderasn.TeletexString`
4714          - ascii
4715        * - :py:class:`pyderasn.T61String`
4716          - ascii
4717        * - :py:class:`pyderasn.VideotexString`
4718          - iso-8859-1
4719        * - :py:class:`pyderasn.IA5String`
4720          - ascii
4721        * - :py:class:`pyderasn.GraphicString`
4722          - iso-8859-1
4723        * - :py:class:`pyderasn.VisibleString`
4724          - ascii
4725        * - :py:class:`pyderasn.ISO646String`
4726          - ascii
4727        * - :py:class:`pyderasn.GeneralString`
4728          - iso-8859-1
4729        * - :py:class:`pyderasn.UniversalString`
4730          - utf-32-be
4731        * - :py:class:`pyderasn.BMPString`
4732          - utf-16-be
4733     """
4734     __slots__ = ()
4735
4736     def _value_sanitize(self, value):
4737         value_raw = None
4738         value_decoded = None
4739         if isinstance(value, self.__class__):
4740             value_raw = value._value
4741         elif value.__class__ == text_type:
4742             value_decoded = value
4743         elif value.__class__ == binary_type:
4744             value_raw = value
4745         else:
4746             raise InvalidValueType((self.__class__, text_type, binary_type))
4747         try:
4748             value_raw = (
4749                 value_decoded.encode(self.encoding)
4750                 if value_raw is None else value_raw
4751             )
4752             value_decoded = (
4753                 value_raw.decode(self.encoding)
4754                 if value_decoded is None else value_decoded
4755             )
4756         except (UnicodeEncodeError, UnicodeDecodeError) as err:
4757             raise DecodeError(str(err))
4758         if not self._bound_min <= len(value_decoded) <= self._bound_max:
4759             raise BoundsError(
4760                 self._bound_min,
4761                 len(value_decoded),
4762                 self._bound_max,
4763             )
4764         return value_raw
4765
4766     def __eq__(self, their):
4767         if their.__class__ == binary_type:
4768             return self._value == their
4769         if their.__class__ == text_type:
4770             return self._value == their.encode(self.encoding)
4771         if not isinstance(their, self.__class__):
4772             return False
4773         return (
4774             self._value == their._value and
4775             self.tag == their.tag and
4776             self._expl == their._expl
4777         )
4778
4779     def __unicode__(self):
4780         if self.ready:
4781             return self._value.decode(self.encoding)
4782         return text_type(self._value)
4783
4784     def __repr__(self):
4785         return pp_console_row(next(self.pps(no_unicode=PY2)))
4786
4787     def pps(self, decode_path=(), no_unicode=False):
4788         value = None
4789         if self.ready:
4790             value = (
4791                 hexenc(bytes(self)) if no_unicode else
4792                 "".join(escape_control_unicode(c) for c in self.__unicode__())
4793             )
4794         yield _pp(
4795             obj=self,
4796             asn1_type_name=self.asn1_type_name,
4797             obj_name=self.__class__.__name__,
4798             decode_path=decode_path,
4799             value=value,
4800             optional=self.optional,
4801             default=self == self.default,
4802             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4803             expl=None if self._expl is None else tag_decode(self._expl),
4804             offset=self.offset,
4805             tlen=self.tlen,
4806             llen=self.llen,
4807             vlen=self.vlen,
4808             expl_offset=self.expl_offset if self.expled else None,
4809             expl_tlen=self.expl_tlen if self.expled else None,
4810             expl_llen=self.expl_llen if self.expled else None,
4811             expl_vlen=self.expl_vlen if self.expled else None,
4812             expl_lenindef=self.expl_lenindef,
4813             ber_encoded=self.ber_encoded,
4814             bered=self.bered,
4815         )
4816         for pp in self.pps_lenindef(decode_path):
4817             yield pp
4818
4819
4820 class UTF8String(CommonString):
4821     __slots__ = ()
4822     tag_default = tag_encode(12)
4823     encoding = "utf-8"
4824     asn1_type_name = "UTF8String"
4825
4826
4827 class AllowableCharsMixin(object):
4828     @property
4829     def allowable_chars(self):
4830         if PY2:
4831             return self._allowable_chars
4832         return frozenset(six_unichr(c) for c in self._allowable_chars)
4833
4834
4835 class NumericString(AllowableCharsMixin, CommonString):
4836     """Numeric string
4837
4838     Its value is properly sanitized: only ASCII digits with spaces can
4839     be stored.
4840
4841     >>> NumericString().allowable_chars
4842     frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4843     """
4844     __slots__ = ()
4845     tag_default = tag_encode(18)
4846     encoding = "ascii"
4847     asn1_type_name = "NumericString"
4848     _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4849
4850     def _value_sanitize(self, value):
4851         value = super(NumericString, self)._value_sanitize(value)
4852         if not frozenset(value) <= self._allowable_chars:
4853             raise DecodeError("non-numeric value")
4854         return value
4855
4856
4857 PrintableStringState = namedtuple(
4858     "PrintableStringState",
4859     OctetStringState._fields + ("allowable_chars",),
4860     **NAMEDTUPLE_KWARGS
4861 )
4862
4863
4864 class PrintableString(AllowableCharsMixin, CommonString):
4865     """Printable string
4866
4867     Its value is properly sanitized: see X.680 41.4 table 10.
4868
4869     >>> PrintableString().allowable_chars
4870     frozenset([' ', "'", ..., 'z'])
4871     >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4872     PrintableString PrintableString foo*bar
4873     >>> obj.allow_asterisk, obj.allow_ampersand
4874     (True, False)
4875     """
4876     __slots__ = ()
4877     tag_default = tag_encode(19)
4878     encoding = "ascii"
4879     asn1_type_name = "PrintableString"
4880     _allowable_chars = frozenset(
4881         (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4882     )
4883     _asterisk = frozenset("*".encode("ascii"))
4884     _ampersand = frozenset("&".encode("ascii"))
4885
4886     def __init__(
4887             self,
4888             value=None,
4889             bounds=None,
4890             impl=None,
4891             expl=None,
4892             default=None,
4893             optional=False,
4894             _decoded=(0, 0, 0),
4895             ctx=None,
4896             allow_asterisk=False,
4897             allow_ampersand=False,
4898     ):
4899         """
4900         :param allow_asterisk: allow asterisk character
4901         :param allow_ampersand: allow ampersand character
4902         """
4903         if allow_asterisk:
4904             self._allowable_chars |= self._asterisk
4905         if allow_ampersand:
4906             self._allowable_chars |= self._ampersand
4907         super(PrintableString, self).__init__(
4908             value, bounds, impl, expl, default, optional, _decoded, ctx,
4909         )
4910
4911     @property
4912     def allow_asterisk(self):
4913         """Is asterisk character allowed?
4914         """
4915         return self._asterisk <= self._allowable_chars
4916
4917     @property
4918     def allow_ampersand(self):
4919         """Is ampersand character allowed?
4920         """
4921         return self._ampersand <= self._allowable_chars
4922
4923     def _value_sanitize(self, value):
4924         value = super(PrintableString, self)._value_sanitize(value)
4925         if not frozenset(value) <= self._allowable_chars:
4926             raise DecodeError("non-printable value")
4927         return value
4928
4929     def __getstate__(self):
4930         return PrintableStringState(
4931             *super(PrintableString, self).__getstate__(),
4932             **{"allowable_chars": self._allowable_chars}
4933         )
4934
4935     def __setstate__(self, state):
4936         super(PrintableString, self).__setstate__(state)
4937         self._allowable_chars = state.allowable_chars
4938
4939     def __call__(
4940             self,
4941             value=None,
4942             bounds=None,
4943             impl=None,
4944             expl=None,
4945             default=None,
4946             optional=None,
4947     ):
4948         return self.__class__(
4949             value=value,
4950             bounds=(
4951                 (self._bound_min, self._bound_max)
4952                 if bounds is None else bounds
4953             ),
4954             impl=self.tag if impl is None else impl,
4955             expl=self._expl if expl is None else expl,
4956             default=self.default if default is None else default,
4957             optional=self.optional if optional is None else optional,
4958             allow_asterisk=self.allow_asterisk,
4959             allow_ampersand=self.allow_ampersand,
4960         )
4961
4962
4963 class TeletexString(CommonString):
4964     __slots__ = ()
4965     tag_default = tag_encode(20)
4966     encoding = "ascii"
4967     asn1_type_name = "TeletexString"
4968
4969
4970 class T61String(TeletexString):
4971     __slots__ = ()
4972     asn1_type_name = "T61String"
4973
4974
4975 class VideotexString(CommonString):
4976     __slots__ = ()
4977     tag_default = tag_encode(21)
4978     encoding = "iso-8859-1"
4979     asn1_type_name = "VideotexString"
4980
4981
4982 class IA5String(CommonString):
4983     __slots__ = ()
4984     tag_default = tag_encode(22)
4985     encoding = "ascii"
4986     asn1_type_name = "IA5"
4987
4988
4989 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
4990 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
4991 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
4992 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
4993 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
4994 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
4995
4996
4997 class VisibleString(CommonString):
4998     __slots__ = ()
4999     tag_default = tag_encode(26)
5000     encoding = "ascii"
5001     asn1_type_name = "VisibleString"
5002
5003
5004 UTCTimeState = namedtuple(
5005     "UTCTimeState",
5006     OctetStringState._fields + ("ber_raw",),
5007     **NAMEDTUPLE_KWARGS
5008 )
5009
5010
5011 def str_to_time_fractions(value):
5012     v = pureint(value)
5013     year, v = (v // 10**10), (v % 10**10)
5014     month, v = (v // 10**8), (v % 10**8)
5015     day, v = (v // 10**6), (v % 10**6)
5016     hour, v = (v // 10**4), (v % 10**4)
5017     minute, second = (v // 100), (v % 100)
5018     return year, month, day, hour, minute, second
5019
5020
5021 class UTCTime(VisibleString):
5022     """``UTCTime`` datetime type
5023
5024     >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5025     UTCTime UTCTime 2017-09-30T22:07:50
5026     >>> str(t)
5027     '170930220750Z'
5028     >>> bytes(t)
5029     b'170930220750Z'
5030     >>> t.todatetime()
5031     datetime.datetime(2017, 9, 30, 22, 7, 50)
5032     >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5033     datetime.datetime(1957, 9, 30, 22, 7, 50)
5034
5035     If BER encoded value was met, then ``ber_raw`` attribute will hold
5036     its raw representation.
5037
5038     .. warning::
5039
5040        Pay attention that UTCTime can not hold full year, so all years
5041        having < 50 years are treated as 20xx, 19xx otherwise, according
5042        to X.509 recommendation.
5043
5044     .. warning::
5045
5046        No strict validation of UTC offsets are made, but very crude:
5047
5048        * minutes are not exceeding 60
5049        * offset value is not exceeding 14 hours
5050     """
5051     __slots__ = ("ber_raw",)
5052     tag_default = tag_encode(23)
5053     encoding = "ascii"
5054     asn1_type_name = "UTCTime"
5055     evgen_mode_skip_value = False
5056
5057     def __init__(
5058             self,
5059             value=None,
5060             impl=None,
5061             expl=None,
5062             default=None,
5063             optional=False,
5064             _decoded=(0, 0, 0),
5065             bounds=None,  # dummy argument, workability for OctetString.decode
5066             ctx=None,
5067     ):
5068         """
5069         :param value: set the value. Either datetime type, or
5070                       :py:class:`pyderasn.UTCTime` object
5071         :param bytes impl: override default tag with ``IMPLICIT`` one
5072         :param bytes expl: override default tag with ``EXPLICIT`` one
5073         :param default: set default value. Type same as in ``value``
5074         :param bool optional: is object ``OPTIONAL`` in sequence
5075         """
5076         super(UTCTime, self).__init__(
5077             None, None, impl, expl, None, optional, _decoded, ctx,
5078         )
5079         self._value = value
5080         self.ber_raw = None
5081         if value is not None:
5082             self._value, self.ber_raw = self._value_sanitize(value, ctx)
5083             self.ber_encoded = self.ber_raw is not None
5084         if default is not None:
5085             default, _ = self._value_sanitize(default)
5086             self.default = self.__class__(
5087                 value=default,
5088                 impl=self.tag,
5089                 expl=self._expl,
5090             )
5091             if self._value is None:
5092                 self._value = default
5093             optional = True
5094         self.optional = optional
5095
5096     def _strptime_bered(self, value):
5097         year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5098         value = value[10:]
5099         if len(value) == 0:
5100             raise ValueError("no timezone")
5101         year += 2000 if year < 50 else 1900
5102         decoded = datetime(year, month, day, hour, minute)
5103         offset = 0
5104         if value[-1] == "Z":
5105             value = value[:-1]
5106         else:
5107             if len(value) < 5:
5108                 raise ValueError("invalid UTC offset")
5109             if value[-5] == "-":
5110                 sign = -1
5111             elif value[-5] == "+":
5112                 sign = 1
5113             else:
5114                 raise ValueError("invalid UTC offset")
5115             v = pureint(value[-4:])
5116             offset, v = (60 * (v % 100)), v // 100
5117             if offset >= 3600:
5118                 raise ValueError("invalid UTC offset minutes")
5119             offset += 3600 * v
5120             if offset > 14 * 3600:
5121                 raise ValueError("too big UTC offset")
5122             offset *= sign
5123             value = value[:-5]
5124         if len(value) == 0:
5125             return offset, decoded
5126         if len(value) != 2:
5127             raise ValueError("invalid UTC offset seconds")
5128         seconds = pureint(value)
5129         if seconds >= 60:
5130             raise ValueError("invalid seconds value")
5131         return offset, decoded + timedelta(seconds=seconds)
5132
5133     def _strptime(self, value):
5134         # datetime.strptime's format: %y%m%d%H%M%SZ
5135         if len(value) != LEN_YYMMDDHHMMSSZ:
5136             raise ValueError("invalid UTCTime length")
5137         if value[-1] != "Z":
5138             raise ValueError("non UTC timezone")
5139         year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
5140         year += 2000 if year < 50 else 1900
5141         return datetime(year, month, day, hour, minute, second)
5142
5143     def _dt_sanitize(self, value):
5144         if value.year < 1950 or value.year > 2049:
5145             raise ValueError("UTCTime can hold only 1950-2049 years")
5146         return value.replace(microsecond=0)
5147
5148     def _value_sanitize(self, value, ctx=None):
5149         if value.__class__ == binary_type:
5150             try:
5151                 value_decoded = value.decode("ascii")
5152             except (UnicodeEncodeError, UnicodeDecodeError) as err:
5153                 raise DecodeError("invalid UTCTime encoding: %r" % err)
5154             err = None
5155             try:
5156                 return self._strptime(value_decoded), None
5157             except (TypeError, ValueError) as _err:
5158                 err = _err
5159                 if (ctx is not None) and ctx.get("bered", False):
5160                     try:
5161                         offset, _value = self._strptime_bered(value_decoded)
5162                         _value = _value - timedelta(seconds=offset)
5163                         return self._dt_sanitize(_value), value
5164                     except (TypeError, ValueError, OverflowError) as _err:
5165                         err = _err
5166             raise DecodeError(
5167                 "invalid %s format: %r" % (self.asn1_type_name, err),
5168                 klass=self.__class__,
5169             )
5170         if isinstance(value, self.__class__):
5171             return value._value, None
5172         if value.__class__ == datetime:
5173             return self._dt_sanitize(value), None
5174         raise InvalidValueType((self.__class__, datetime))
5175
5176     def _pp_value(self):
5177         if self.ready:
5178             value = self._value.isoformat()
5179             if self.ber_encoded:
5180                 value += " (%s)" % self.ber_raw
5181             return value
5182         return None
5183
5184     def __unicode__(self):
5185         if self.ready:
5186             value = self._value.isoformat()
5187             if self.ber_encoded:
5188                 value += " (%s)" % self.ber_raw
5189             return value
5190         return text_type(self._pp_value())
5191
5192     def __getstate__(self):
5193         return UTCTimeState(
5194             *super(UTCTime, self).__getstate__(),
5195             **{"ber_raw": self.ber_raw}
5196         )
5197
5198     def __setstate__(self, state):
5199         super(UTCTime, self).__setstate__(state)
5200         self.ber_raw = state.ber_raw
5201
5202     def __bytes__(self):
5203         self._assert_ready()
5204         return self._encode_time()
5205
5206     def __eq__(self, their):
5207         if their.__class__ == binary_type:
5208             return self._encode_time() == their
5209         if their.__class__ == datetime:
5210             return self.todatetime() == their
5211         if not isinstance(their, self.__class__):
5212             return False
5213         return (
5214             self._value == their._value and
5215             self.tag == their.tag and
5216             self._expl == their._expl
5217         )
5218
5219     def _encode_time(self):
5220         return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5221
5222     def _encode(self):
5223         self._assert_ready()
5224         return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5225
5226     def _encode1st(self, state):
5227         return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
5228
5229     def _encode2nd(self, writer, state_iter):
5230         self._assert_ready()
5231         write_full(writer, self._encode())
5232
5233     def _encode_cer(self, writer):
5234         write_full(writer, self._encode())
5235
5236     def todatetime(self):
5237         return self._value
5238
5239     def __repr__(self):
5240         return pp_console_row(next(self.pps()))
5241
5242     def pps(self, decode_path=()):
5243         yield _pp(
5244             obj=self,
5245             asn1_type_name=self.asn1_type_name,
5246             obj_name=self.__class__.__name__,
5247             decode_path=decode_path,
5248             value=self._pp_value(),
5249             optional=self.optional,
5250             default=self == self.default,
5251             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5252             expl=None if self._expl is None else tag_decode(self._expl),
5253             offset=self.offset,
5254             tlen=self.tlen,
5255             llen=self.llen,
5256             vlen=self.vlen,
5257             expl_offset=self.expl_offset if self.expled else None,
5258             expl_tlen=self.expl_tlen if self.expled else None,
5259             expl_llen=self.expl_llen if self.expled else None,
5260             expl_vlen=self.expl_vlen if self.expled else None,
5261             expl_lenindef=self.expl_lenindef,
5262             ber_encoded=self.ber_encoded,
5263             bered=self.bered,
5264         )
5265         for pp in self.pps_lenindef(decode_path):
5266             yield pp
5267
5268
5269 class GeneralizedTime(UTCTime):
5270     """``GeneralizedTime`` datetime type
5271
5272     This type is similar to :py:class:`pyderasn.UTCTime`.
5273
5274     >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5275     GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5276     >>> str(t)
5277     '20170930220750.000123Z'
5278     >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5279     GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5280
5281     .. warning::
5282
5283        Only microsecond fractions are supported in DER encoding.
5284        :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5285        higher precision values.
5286
5287     .. warning::
5288
5289        BER encoded data can loss information (accuracy) during decoding
5290        because of float transformations.
5291
5292     .. warning::
5293
5294        Local times (without explicit timezone specification) are treated
5295        as UTC one, no transformations are made.
5296
5297     .. warning::
5298
5299        Zero year is unsupported.
5300     """
5301     __slots__ = ()
5302     tag_default = tag_encode(24)
5303     asn1_type_name = "GeneralizedTime"
5304
5305     def _dt_sanitize(self, value):
5306         return value
5307
5308     def _strptime_bered(self, value):
5309         if len(value) < 4 + 3 * 2:
5310             raise ValueError("invalid GeneralizedTime")
5311         year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5312         decoded = datetime(year, month, day, hour)
5313         offset, value = 0, value[10:]
5314         if len(value) == 0:
5315             return offset, decoded
5316         if value[-1] == "Z":
5317             value = value[:-1]
5318         else:
5319             for char, sign in (("-", -1), ("+", 1)):
5320                 idx = value.rfind(char)
5321                 if idx == -1:
5322                     continue
5323                 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5324                 v = pureint(offset_raw)
5325                 if len(offset_raw) == 4:
5326                     offset, v = (60 * (v % 100)), v // 100
5327                     if offset >= 3600:
5328                         raise ValueError("invalid UTC offset minutes")
5329                 elif len(offset_raw) == 2:
5330                     pass
5331                 else:
5332                     raise ValueError("invalid UTC offset")
5333                 offset += 3600 * v
5334                 if offset > 14 * 3600:
5335                     raise ValueError("too big UTC offset")
5336                 offset *= sign
5337                 break
5338         if len(value) == 0:
5339             return offset, decoded
5340         if value[0] in DECIMAL_SIGNS:
5341             return offset, (
5342                 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5343             )
5344         if len(value) < 2:
5345             raise ValueError("stripped minutes")
5346         decoded += timedelta(seconds=60 * pureint(value[:2]))
5347         value = value[2:]
5348         if len(value) == 0:
5349             return offset, decoded
5350         if value[0] in DECIMAL_SIGNS:
5351             return offset, (
5352                 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5353             )
5354         if len(value) < 2:
5355             raise ValueError("stripped seconds")
5356         decoded += timedelta(seconds=pureint(value[:2]))
5357         value = value[2:]
5358         if len(value) == 0:
5359             return offset, decoded
5360         if value[0] not in DECIMAL_SIGNS:
5361             raise ValueError("invalid format after seconds")
5362         return offset, (
5363             decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5364         )
5365
5366     def _strptime(self, value):
5367         l = len(value)
5368         if l == LEN_YYYYMMDDHHMMSSZ:
5369             # datetime.strptime's format: %Y%m%d%H%M%SZ
5370             if value[-1] != "Z":
5371                 raise ValueError("non UTC timezone")
5372             return datetime(*str_to_time_fractions(value[:-1]))
5373         if l >= LEN_YYYYMMDDHHMMSSDMZ:
5374             # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5375             if value[-1] != "Z":
5376                 raise ValueError("non UTC timezone")
5377             if value[14] != ".":
5378                 raise ValueError("no fractions separator")
5379             us = value[15:-1]
5380             if us[-1] == "0":
5381                 raise ValueError("trailing zero")
5382             us_len = len(us)
5383             if us_len > 6:
5384                 raise ValueError("only microsecond fractions are supported")
5385             us = pureint(us + ("0" * (6 - us_len)))
5386             year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5387             return datetime(year, month, day, hour, minute, second, us)
5388         raise ValueError("invalid GeneralizedTime length")
5389
5390     def _encode_time(self):
5391         value = self._value
5392         encoded = value.strftime("%Y%m%d%H%M%S")
5393         if value.microsecond > 0:
5394             encoded += (".%06d" % value.microsecond).rstrip("0")
5395         return (encoded + "Z").encode("ascii")
5396
5397     def _encode(self):
5398         self._assert_ready()
5399         value = self._value
5400         if value.microsecond > 0:
5401             encoded = self._encode_time()
5402             return b"".join((self.tag, len_encode(len(encoded)), encoded))
5403         return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5404
5405     def _encode1st(self, state):
5406         self._assert_ready()
5407         vlen = len(self._encode_time())
5408         return len(self.tag) + len_size(vlen) + vlen, state
5409
5410     def _encode2nd(self, writer, state_iter):
5411         write_full(writer, self._encode())
5412
5413
5414 class GraphicString(CommonString):
5415     __slots__ = ()
5416     tag_default = tag_encode(25)
5417     encoding = "iso-8859-1"
5418     asn1_type_name = "GraphicString"
5419
5420
5421 class ISO646String(VisibleString):
5422     __slots__ = ()
5423     asn1_type_name = "ISO646String"
5424
5425
5426 class GeneralString(CommonString):
5427     __slots__ = ()
5428     tag_default = tag_encode(27)
5429     encoding = "iso-8859-1"
5430     asn1_type_name = "GeneralString"
5431
5432
5433 class UniversalString(CommonString):
5434     __slots__ = ()
5435     tag_default = tag_encode(28)
5436     encoding = "utf-32-be"
5437     asn1_type_name = "UniversalString"
5438
5439
5440 class BMPString(CommonString):
5441     __slots__ = ()
5442     tag_default = tag_encode(30)
5443     encoding = "utf-16-be"
5444     asn1_type_name = "BMPString"
5445
5446
5447 ChoiceState = namedtuple(
5448     "ChoiceState",
5449     BasicState._fields + ("specs", "value",),
5450     **NAMEDTUPLE_KWARGS
5451 )
5452
5453
5454 class Choice(Obj):
5455     """``CHOICE`` special type
5456
5457     ::
5458
5459         class GeneralName(Choice):
5460             schema = (
5461                 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5462                 ("dNSName", IA5String(impl=tag_ctxp(2))),
5463             )
5464
5465     >>> gn = GeneralName()
5466     GeneralName CHOICE
5467     >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5468     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5469     >>> gn["dNSName"] = IA5String("bar.baz")
5470     GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5471     >>> gn["rfc822Name"]
5472     None
5473     >>> gn["dNSName"]
5474     [2] IA5String IA5 bar.baz
5475     >>> gn.choice
5476     'dNSName'
5477     >>> gn.value == gn["dNSName"]
5478     True
5479     >>> gn.specs
5480     OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5481
5482     >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5483     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5484     """
5485     __slots__ = ("specs",)
5486     tag_default = None
5487     asn1_type_name = "CHOICE"
5488
5489     def __init__(
5490             self,
5491             value=None,
5492             schema=None,
5493             impl=None,
5494             expl=None,
5495             default=None,
5496             optional=False,
5497             _decoded=(0, 0, 0),
5498     ):
5499         """
5500         :param value: set the value. Either ``(choice, value)`` tuple, or
5501                       :py:class:`pyderasn.Choice` object
5502         :param bytes impl: can not be set, do **not** use it
5503         :param bytes expl: override default tag with ``EXPLICIT`` one
5504         :param default: set default value. Type same as in ``value``
5505         :param bool optional: is object ``OPTIONAL`` in sequence
5506         """
5507         if impl is not None:
5508             raise ValueError("no implicit tag allowed for CHOICE")
5509         super(Choice, self).__init__(None, expl, default, optional, _decoded)
5510         if schema is None:
5511             schema = getattr(self, "schema", ())
5512         if len(schema) == 0:
5513             raise ValueError("schema must be specified")
5514         self.specs = (
5515             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5516         )
5517         self._value = None
5518         if value is not None:
5519             self._value = self._value_sanitize(value)
5520         if default is not None:
5521             default_value = self._value_sanitize(default)
5522             default_obj = self.__class__(impl=self.tag, expl=self._expl)
5523             default_obj.specs = self.specs
5524             default_obj._value = default_value
5525             self.default = default_obj
5526             if value is None:
5527                 self._value = copy(default_obj._value)
5528         if self._expl is not None:
5529             tag_class, _, tag_num = tag_decode(self._expl)
5530             self._tag_order = (tag_class, tag_num)
5531
5532     def _value_sanitize(self, value):
5533         if (value.__class__ == tuple) and len(value) == 2:
5534             choice, obj = value
5535             spec = self.specs.get(choice)
5536             if spec is None:
5537                 raise ObjUnknown(choice)
5538             if not isinstance(obj, spec.__class__):
5539                 raise InvalidValueType((spec,))
5540             return (choice, spec(obj))
5541         if isinstance(value, self.__class__):
5542             return value._value
5543         raise InvalidValueType((self.__class__, tuple))
5544
5545     @property
5546     def ready(self):
5547         return self._value is not None and self._value[1].ready
5548
5549     @property
5550     def bered(self):
5551         return self.expl_lenindef or (
5552             (self._value is not None) and
5553             self._value[1].bered
5554         )
5555
5556     def __getstate__(self):
5557         return ChoiceState(
5558             __version__,
5559             self.tag,
5560             self._tag_order,
5561             self._expl,
5562             self.default,
5563             self.optional,
5564             self.offset,
5565             self.llen,
5566             self.vlen,
5567             self.expl_lenindef,
5568             self.lenindef,
5569             self.ber_encoded,
5570             self.specs,
5571             copy(self._value),
5572         )
5573
5574     def __setstate__(self, state):
5575         super(Choice, self).__setstate__(state)
5576         self.specs = state.specs
5577         self._value = state.value
5578
5579     def __eq__(self, their):
5580         if (their.__class__ == tuple) and len(their) == 2:
5581             return self._value == their
5582         if not isinstance(their, self.__class__):
5583             return False
5584         return (
5585             self.specs == their.specs and
5586             self._value == their._value
5587         )
5588
5589     def __call__(
5590             self,
5591             value=None,
5592             expl=None,
5593             default=None,
5594             optional=None,
5595     ):
5596         return self.__class__(
5597             value=value,
5598             schema=self.specs,
5599             expl=self._expl if expl is None else expl,
5600             default=self.default if default is None else default,
5601             optional=self.optional if optional is None else optional,
5602         )
5603
5604     @property
5605     def choice(self):
5606         """Name of the choice
5607         """
5608         self._assert_ready()
5609         return self._value[0]
5610
5611     @property
5612     def value(self):
5613         """Value of underlying choice
5614         """
5615         self._assert_ready()
5616         return self._value[1]
5617
5618     @property
5619     def tag_order(self):
5620         self._assert_ready()
5621         return self._value[1].tag_order if self._tag_order is None else self._tag_order
5622
5623     @property
5624     def tag_order_cer(self):
5625         return min(v.tag_order_cer for v in itervalues(self.specs))
5626
5627     def __getitem__(self, key):
5628         if key not in self.specs:
5629             raise ObjUnknown(key)
5630         if self._value is None:
5631             return None
5632         choice, value = self._value
5633         if choice != key:
5634             return None
5635         return value
5636
5637     def __setitem__(self, key, value):
5638         spec = self.specs.get(key)
5639         if spec is None:
5640             raise ObjUnknown(key)
5641         if not isinstance(value, spec.__class__):
5642             raise InvalidValueType((spec.__class__,))
5643         self._value = (key, spec(value))
5644
5645     @property
5646     def tlen(self):
5647         return 0
5648
5649     @property
5650     def decoded(self):
5651         return self._value[1].decoded if self.ready else False
5652
5653     def _encode(self):
5654         self._assert_ready()
5655         return self._value[1].encode()
5656
5657     def _encode1st(self, state):
5658         self._assert_ready()
5659         return self._value[1].encode1st(state)
5660
5661     def _encode2nd(self, writer, state_iter):
5662         self._value[1].encode2nd(writer, state_iter)
5663
5664     def _encode_cer(self, writer):
5665         self._assert_ready()
5666         self._value[1].encode_cer(writer)
5667
5668     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5669         for choice, spec in iteritems(self.specs):
5670             sub_decode_path = decode_path + (choice,)
5671             try:
5672                 spec.decode(
5673                     tlv,
5674                     offset=offset,
5675                     leavemm=True,
5676                     decode_path=sub_decode_path,
5677                     ctx=ctx,
5678                     tag_only=True,
5679                     _ctx_immutable=False,
5680                 )
5681             except TagMismatch:
5682                 continue
5683             break
5684         else:
5685             raise TagMismatch(
5686                 klass=self.__class__,
5687                 decode_path=decode_path,
5688                 offset=offset,
5689             )
5690         if tag_only:  # pragma: no cover
5691             yield None
5692             return
5693         if evgen_mode:
5694             for _decode_path, value, tail in spec.decode_evgen(
5695                     tlv,
5696                     offset=offset,
5697                     leavemm=True,
5698                     decode_path=sub_decode_path,
5699                     ctx=ctx,
5700                     _ctx_immutable=False,
5701             ):
5702                 yield _decode_path, value, tail
5703         else:
5704             _, value, tail = next(spec.decode_evgen(
5705                 tlv,
5706                 offset=offset,
5707                 leavemm=True,
5708                 decode_path=sub_decode_path,
5709                 ctx=ctx,
5710                 _ctx_immutable=False,
5711                 _evgen_mode=False,
5712             ))
5713         obj = self.__class__(
5714             schema=self.specs,
5715             expl=self._expl,
5716             default=self.default,
5717             optional=self.optional,
5718             _decoded=(offset, 0, value.fulllen),
5719         )
5720         obj._value = (choice, value)
5721         yield decode_path, obj, tail
5722
5723     def __repr__(self):
5724         value = pp_console_row(next(self.pps()))
5725         if self.ready:
5726             value = "%s[%r]" % (value, self.value)
5727         return value
5728
5729     def pps(self, decode_path=()):
5730         yield _pp(
5731             obj=self,
5732             asn1_type_name=self.asn1_type_name,
5733             obj_name=self.__class__.__name__,
5734             decode_path=decode_path,
5735             value=self.choice if self.ready else None,
5736             optional=self.optional,
5737             default=self == self.default,
5738             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5739             expl=None if self._expl is None else tag_decode(self._expl),
5740             offset=self.offset,
5741             tlen=self.tlen,
5742             llen=self.llen,
5743             vlen=self.vlen,
5744             expl_lenindef=self.expl_lenindef,
5745             bered=self.bered,
5746         )
5747         if self.ready:
5748             yield self.value.pps(decode_path=decode_path + (self.choice,))
5749         for pp in self.pps_lenindef(decode_path):
5750             yield pp
5751
5752
5753 class PrimitiveTypes(Choice):
5754     """Predefined ``CHOICE`` for all generic primitive types
5755
5756     It could be useful for general decoding of some unspecified values:
5757
5758     >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5759     OCTET STRING 3 bytes 666f6f
5760     >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5761     INTEGER 1193046
5762     """
5763     __slots__ = ()
5764     schema = tuple((klass.__name__, klass()) for klass in (
5765         Boolean,
5766         Integer,
5767         BitString,
5768         OctetString,
5769         Null,
5770         ObjectIdentifier,
5771         UTF8String,
5772         NumericString,
5773         PrintableString,
5774         TeletexString,
5775         VideotexString,
5776         IA5String,
5777         UTCTime,
5778         GeneralizedTime,
5779         GraphicString,
5780         VisibleString,
5781         ISO646String,
5782         GeneralString,
5783         UniversalString,
5784         BMPString,
5785     ))
5786
5787
5788 AnyState = namedtuple(
5789     "AnyState",
5790     BasicState._fields + ("value", "defined"),
5791     **NAMEDTUPLE_KWARGS
5792 )
5793
5794
5795 class Any(Obj):
5796     """``ANY`` special type
5797
5798     >>> Any(Integer(-123))
5799     ANY INTEGER -123 (0X:7B)
5800     >>> a = Any(OctetString(b"hello world").encode())
5801     ANY 040b68656c6c6f20776f726c64
5802     >>> hexenc(bytes(a))
5803     b'0x040x0bhello world'
5804     """
5805     __slots__ = ("defined",)
5806     tag_default = tag_encode(0)
5807     asn1_type_name = "ANY"
5808
5809     def __init__(
5810             self,
5811             value=None,
5812             expl=None,
5813             optional=False,
5814             _decoded=(0, 0, 0),
5815     ):
5816         """
5817         :param value: set the value. Either any kind of pyderasn's
5818                       **ready** object, or bytes. Pay attention that
5819                       **no** validation is performed if raw binary value
5820                       is valid TLV, except just tag decoding
5821         :param bytes expl: override default tag with ``EXPLICIT`` one
5822         :param bool optional: is object ``OPTIONAL`` in sequence
5823         """
5824         super(Any, self).__init__(None, expl, None, optional, _decoded)
5825         if value is None:
5826             self._value = None
5827         else:
5828             value = self._value_sanitize(value)
5829             self._value = value
5830             if self._expl is None:
5831                 if value.__class__ == binary_type:
5832                     tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5833                 else:
5834                     tag_class, tag_num = value.tag_order
5835             else:
5836                 tag_class, _, tag_num = tag_decode(self._expl)
5837             self._tag_order = (tag_class, tag_num)
5838         self.defined = None
5839
5840     def _value_sanitize(self, value):
5841         if value.__class__ == binary_type:
5842             if len(value) == 0:
5843                 raise ValueError("Any value can not be empty")
5844             return value
5845         if isinstance(value, self.__class__):
5846             return value._value
5847         if not isinstance(value, Obj):
5848             raise InvalidValueType((self.__class__, Obj, binary_type))
5849         return value
5850
5851     @property
5852     def ready(self):
5853         return self._value is not None
5854
5855     @property
5856     def tag_order(self):
5857         self._assert_ready()
5858         return self._tag_order
5859
5860     @property
5861     def bered(self):
5862         if self.expl_lenindef or self.lenindef:
5863             return True
5864         if self.defined is None:
5865             return False
5866         return self.defined[1].bered
5867
5868     def __getstate__(self):
5869         return AnyState(
5870             __version__,
5871             self.tag,
5872             self._tag_order,
5873             self._expl,
5874             None,
5875             self.optional,
5876             self.offset,
5877             self.llen,
5878             self.vlen,
5879             self.expl_lenindef,
5880             self.lenindef,
5881             self.ber_encoded,
5882             self._value,
5883             self.defined,
5884         )
5885
5886     def __setstate__(self, state):
5887         super(Any, self).__setstate__(state)
5888         self._value = state.value
5889         self.defined = state.defined
5890
5891     def __eq__(self, their):
5892         if their.__class__ == binary_type:
5893             if self._value.__class__ == binary_type:
5894                 return self._value == their
5895             return self._value.encode() == their
5896         if issubclass(their.__class__, Any):
5897             if self.ready and their.ready:
5898                 return bytes(self) == bytes(their)
5899             return self.ready == their.ready
5900         return False
5901
5902     def __call__(
5903             self,
5904             value=None,
5905             expl=None,
5906             optional=None,
5907     ):
5908         return self.__class__(
5909             value=value,
5910             expl=self._expl if expl is None else expl,
5911             optional=self.optional if optional is None else optional,
5912         )
5913
5914     def __bytes__(self):
5915         self._assert_ready()
5916         value = self._value
5917         if value.__class__ == binary_type:
5918             return value
5919         return self._value.encode()
5920
5921     @property
5922     def tlen(self):
5923         return 0
5924
5925     def _encode(self):
5926         self._assert_ready()
5927         value = self._value
5928         if value.__class__ == binary_type:
5929             return value
5930         return value.encode()
5931
5932     def _encode1st(self, state):
5933         self._assert_ready()
5934         value = self._value
5935         if value.__class__ == binary_type:
5936             return len(value), state
5937         return value.encode1st(state)
5938
5939     def _encode2nd(self, writer, state_iter):
5940         value = self._value
5941         if value.__class__ == binary_type:
5942             write_full(writer, value)
5943         else:
5944             value.encode2nd(writer, state_iter)
5945
5946     def _encode_cer(self, writer):
5947         self._assert_ready()
5948         value = self._value
5949         if value.__class__ == binary_type:
5950             write_full(writer, value)
5951         else:
5952             value.encode_cer(writer)
5953
5954     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5955         try:
5956             t, tlen, lv = tag_strip(tlv)
5957         except DecodeError as err:
5958             raise err.__class__(
5959                 msg=err.msg,
5960                 klass=self.__class__,
5961                 decode_path=decode_path,
5962                 offset=offset,
5963             )
5964         try:
5965             l, llen, v = len_decode(lv)
5966         except LenIndefForm as err:
5967             if not ctx.get("bered", False):
5968                 raise err.__class__(
5969                     msg=err.msg,
5970                     klass=self.__class__,
5971                     decode_path=decode_path,
5972                     offset=offset,
5973                 )
5974             llen, vlen, v = 1, 0, lv[1:]
5975             sub_offset = offset + tlen + llen
5976             chunk_i = 0
5977             while v[:EOC_LEN].tobytes() != EOC:
5978                 chunk, v = Any().decode(
5979                     v,
5980                     offset=sub_offset,
5981                     decode_path=decode_path + (str(chunk_i),),
5982                     leavemm=True,
5983                     ctx=ctx,
5984                     _ctx_immutable=False,
5985                 )
5986                 vlen += chunk.tlvlen
5987                 sub_offset += chunk.tlvlen
5988                 chunk_i += 1
5989             tlvlen = tlen + llen + vlen + EOC_LEN
5990             obj = self.__class__(
5991                 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
5992                 expl=self._expl,
5993                 optional=self.optional,
5994                 _decoded=(offset, 0, tlvlen),
5995             )
5996             obj.lenindef = True
5997             obj.tag = t.tobytes()
5998             yield decode_path, obj, v[EOC_LEN:]
5999             return
6000         except DecodeError as err:
6001             raise err.__class__(
6002                 msg=err.msg,
6003                 klass=self.__class__,
6004                 decode_path=decode_path,
6005                 offset=offset,
6006             )
6007         if l > len(v):
6008             raise NotEnoughData(
6009                 "encoded length is longer than data",
6010                 klass=self.__class__,
6011                 decode_path=decode_path,
6012                 offset=offset,
6013             )
6014         tlvlen = tlen + llen + l
6015         v, tail = tlv[:tlvlen], v[l:]
6016         obj = self.__class__(
6017             value=None if evgen_mode else v.tobytes(),
6018             expl=self._expl,
6019             optional=self.optional,
6020             _decoded=(offset, 0, tlvlen),
6021         )
6022         obj.tag = t.tobytes()
6023         yield decode_path, obj, tail
6024
6025     def __repr__(self):
6026         return pp_console_row(next(self.pps()))
6027
6028     def pps(self, decode_path=()):
6029         value = self._value
6030         if value is None:
6031             pass
6032         elif value.__class__ == binary_type:
6033             value = None
6034         else:
6035             value = repr(value)
6036         yield _pp(
6037             obj=self,
6038             asn1_type_name=self.asn1_type_name,
6039             obj_name=self.__class__.__name__,
6040             decode_path=decode_path,
6041             value=value,
6042             blob=self._value if self._value.__class__ == binary_type else None,
6043             optional=self.optional,
6044             default=self == self.default,
6045             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6046             expl=None if self._expl is None else tag_decode(self._expl),
6047             offset=self.offset,
6048             tlen=self.tlen,
6049             llen=self.llen,
6050             vlen=self.vlen,
6051             expl_offset=self.expl_offset if self.expled else None,
6052             expl_tlen=self.expl_tlen if self.expled else None,
6053             expl_llen=self.expl_llen if self.expled else None,
6054             expl_vlen=self.expl_vlen if self.expled else None,
6055             expl_lenindef=self.expl_lenindef,
6056             lenindef=self.lenindef,
6057             bered=self.bered,
6058         )
6059         defined_by, defined = self.defined or (None, None)
6060         if defined_by is not None:
6061             yield defined.pps(
6062                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6063             )
6064         for pp in self.pps_lenindef(decode_path):
6065             yield pp
6066
6067
6068 ########################################################################
6069 # ASN.1 constructed types
6070 ########################################################################
6071
6072 def abs_decode_path(decode_path, rel_path):
6073     """Create an absolute decode path from current and relative ones
6074
6075     :param decode_path: current decode path, starting point. Tuple of strings
6076     :param rel_path: relative path to ``decode_path``. Tuple of strings.
6077                      If first tuple's element is "/", then treat it as
6078                      an absolute path, ignoring ``decode_path`` as
6079                      starting point. Also this tuple can contain ".."
6080                      elements, stripping the leading element from
6081                      ``decode_path``
6082
6083     >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
6084     ("foo", "bar", "baz", "whatever")
6085     >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
6086     ("foo", "whatever")
6087     >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
6088     ("baz", "whatever")
6089     """
6090     if rel_path[0] == "/":
6091         return rel_path[1:]
6092     if rel_path[0] == "..":
6093         return abs_decode_path(decode_path[:-1], rel_path[1:])
6094     return decode_path + rel_path
6095
6096
6097 SequenceState = namedtuple(
6098     "SequenceState",
6099     BasicState._fields + ("specs", "value",),
6100     **NAMEDTUPLE_KWARGS
6101 )
6102
6103
6104 class SequenceEncode1stMixing(object):
6105     def _encode1st(self, state):
6106         state.append(0)
6107         idx = len(state) - 1
6108         vlen = 0
6109         for v in self._values_for_encoding():
6110             l, _ = v.encode1st(state)
6111             vlen += l
6112         state[idx] = vlen
6113         return len(self.tag) + len_size(vlen) + vlen, state
6114
6115
6116 class Sequence(SequenceEncode1stMixing, Obj):
6117     """``SEQUENCE`` structure type
6118
6119     You have to make specification of sequence::
6120
6121         class Extension(Sequence):
6122             schema = (
6123                 ("extnID", ObjectIdentifier()),
6124                 ("critical", Boolean(default=False)),
6125                 ("extnValue", OctetString()),
6126             )
6127
6128     Then, you can work with it as with dictionary.
6129
6130     >>> ext = Extension()
6131     >>> Extension().specs
6132     OrderedDict([
6133         ('extnID', OBJECT IDENTIFIER),
6134         ('critical', BOOLEAN False OPTIONAL DEFAULT),
6135         ('extnValue', OCTET STRING),
6136     ])
6137     >>> ext["extnID"] = "1.2.3"
6138     Traceback (most recent call last):
6139     pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6140     >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6141
6142     You can determine if sequence is ready to be encoded:
6143
6144     >>> ext.ready
6145     False
6146     >>> ext.encode()
6147     Traceback (most recent call last):
6148     pyderasn.ObjNotReady: object is not ready: extnValue
6149     >>> ext["extnValue"] = OctetString(b"foobar")
6150     >>> ext.ready
6151     True
6152
6153     Value you want to assign, must have the same **type** as in
6154     corresponding specification, but it can have different tags,
6155     optional/default attributes -- they will be taken from specification
6156     automatically::
6157
6158         class TBSCertificate(Sequence):
6159             schema = (
6160                 ("version", Version(expl=tag_ctxc(0), default="v1")),
6161             [...]
6162
6163     >>> tbs = TBSCertificate()
6164     >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6165
6166     Assign ``None`` to remove value from sequence.
6167
6168     You can set values in Sequence during its initialization:
6169
6170     >>> AlgorithmIdentifier((
6171         ("algorithm", ObjectIdentifier("1.2.3")),
6172         ("parameters", Any(Null()))
6173     ))
6174     AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6175
6176     You can determine if value exists/set in the sequence and take its value:
6177
6178     >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6179     (True, True, False)
6180     >>> ext["extnID"]
6181     OBJECT IDENTIFIER 1.2.3
6182
6183     But pay attention that if value has default, then it won't be (not
6184     in) in the sequence (because ``DEFAULT`` must not be encoded in
6185     DER), but you can read its value:
6186
6187     >>> "critical" in ext, ext["critical"]
6188     (False, BOOLEAN False)
6189     >>> ext["critical"] = Boolean(True)
6190     >>> "critical" in ext, ext["critical"]
6191     (True, BOOLEAN True)
6192
6193     All defaulted values are always optional.
6194
6195     .. _allow_default_values_ctx:
6196
6197     DER prohibits default value encoding and will raise an error if
6198     default value is unexpectedly met during decode.
6199     If :ref:`bered <bered_ctx>` context option is set, then no error
6200     will be raised, but ``bered`` attribute set. You can disable strict
6201     defaulted values existence validation by setting
6202     ``"allow_default_values": True`` :ref:`context <ctx>` option.
6203
6204     All values with DEFAULT specified are decoded atomically in
6205     :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6206     SEQUENCE, then it will be yielded as a single element, not
6207     disassembled. That is required for DEFAULT existence check.
6208
6209     Two sequences are equal if they have equal specification (schema),
6210     implicit/explicit tagging and the same values.
6211     """
6212     __slots__ = ("specs",)
6213     tag_default = tag_encode(form=TagFormConstructed, num=16)
6214     asn1_type_name = "SEQUENCE"
6215
6216     def __init__(
6217             self,
6218             value=None,
6219             schema=None,
6220             impl=None,
6221             expl=None,
6222             default=None,
6223             optional=False,
6224             _decoded=(0, 0, 0),
6225     ):
6226         super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6227         if schema is None:
6228             schema = getattr(self, "schema", ())
6229         self.specs = (
6230             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6231         )
6232         self._value = {}
6233         if value is not None:
6234             if issubclass(value.__class__, Sequence):
6235                 self._value = value._value
6236             elif hasattr(value, "__iter__"):
6237                 for seq_key, seq_value in value:
6238                     self[seq_key] = seq_value
6239             else:
6240                 raise InvalidValueType((Sequence,))
6241         if default is not None:
6242             if not issubclass(default.__class__, Sequence):
6243                 raise InvalidValueType((Sequence,))
6244             default_value = default._value
6245             default_obj = self.__class__(impl=self.tag, expl=self._expl)
6246             default_obj.specs = self.specs
6247             default_obj._value = default_value
6248             self.default = default_obj
6249             if value is None:
6250                 self._value = copy(default_obj._value)
6251
6252     @property
6253     def ready(self):
6254         for name, spec in iteritems(self.specs):
6255             value = self._value.get(name)
6256             if value is None:
6257                 if spec.optional:
6258                     continue
6259                 return False
6260             if not value.ready:
6261                 return False
6262         return True
6263
6264     @property
6265     def bered(self):
6266         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6267             return True
6268         return any(value.bered for value in itervalues(self._value))
6269
6270     def __getstate__(self):
6271         return SequenceState(
6272             __version__,
6273             self.tag,
6274             self._tag_order,
6275             self._expl,
6276             self.default,
6277             self.optional,
6278             self.offset,
6279             self.llen,
6280             self.vlen,
6281             self.expl_lenindef,
6282             self.lenindef,
6283             self.ber_encoded,
6284             self.specs,
6285             {k: copy(v) for k, v in iteritems(self._value)},
6286         )
6287
6288     def __setstate__(self, state):
6289         super(Sequence, self).__setstate__(state)
6290         self.specs = state.specs
6291         self._value = state.value
6292
6293     def __eq__(self, their):
6294         if not isinstance(their, self.__class__):
6295             return False
6296         return (
6297             self.specs == their.specs and
6298             self.tag == their.tag and
6299             self._expl == their._expl and
6300             self._value == their._value
6301         )
6302
6303     def __call__(
6304             self,
6305             value=None,
6306             impl=None,
6307             expl=None,
6308             default=None,
6309             optional=None,
6310     ):
6311         return self.__class__(
6312             value=value,
6313             schema=self.specs,
6314             impl=self.tag if impl is None else impl,
6315             expl=self._expl if expl is None else expl,
6316             default=self.default if default is None else default,
6317             optional=self.optional if optional is None else optional,
6318         )
6319
6320     def __contains__(self, key):
6321         return key in self._value
6322
6323     def __setitem__(self, key, value):
6324         spec = self.specs.get(key)
6325         if spec is None:
6326             raise ObjUnknown(key)
6327         if value is None:
6328             self._value.pop(key, None)
6329             return
6330         if not isinstance(value, spec.__class__):
6331             raise InvalidValueType((spec.__class__,))
6332         value = spec(value=value)
6333         if spec.default is not None and value == spec.default:
6334             self._value.pop(key, None)
6335             return
6336         self._value[key] = value
6337
6338     def __getitem__(self, key):
6339         value = self._value.get(key)
6340         if value is not None:
6341             return value
6342         spec = self.specs.get(key)
6343         if spec is None:
6344             raise ObjUnknown(key)
6345         if spec.default is not None:
6346             return spec.default
6347         return None
6348
6349     def _values_for_encoding(self):
6350         for name, spec in iteritems(self.specs):
6351             value = self._value.get(name)
6352             if value is None:
6353                 if spec.optional:
6354                     continue
6355                 raise ObjNotReady(name)
6356             yield value
6357
6358     def _encode(self):
6359         v = b"".join(v.encode() for v in self._values_for_encoding())
6360         return b"".join((self.tag, len_encode(len(v)), v))
6361
6362     def _encode2nd(self, writer, state_iter):
6363         write_full(writer, self.tag + len_encode(next(state_iter)))
6364         for v in self._values_for_encoding():
6365             v.encode2nd(writer, state_iter)
6366
6367     def _encode_cer(self, writer):
6368         write_full(writer, self.tag + LENINDEF)
6369         for v in self._values_for_encoding():
6370             v.encode_cer(writer)
6371         write_full(writer, EOC)
6372
6373     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6374         try:
6375             t, tlen, lv = tag_strip(tlv)
6376         except DecodeError as err:
6377             raise err.__class__(
6378                 msg=err.msg,
6379                 klass=self.__class__,
6380                 decode_path=decode_path,
6381                 offset=offset,
6382             )
6383         if t != self.tag:
6384             raise TagMismatch(
6385                 klass=self.__class__,
6386                 decode_path=decode_path,
6387                 offset=offset,
6388             )
6389         if tag_only:  # pragma: no cover
6390             yield None
6391             return
6392         lenindef = False
6393         ctx_bered = ctx.get("bered", False)
6394         try:
6395             l, llen, v = len_decode(lv)
6396         except LenIndefForm as err:
6397             if not ctx_bered:
6398                 raise err.__class__(
6399                     msg=err.msg,
6400                     klass=self.__class__,
6401                     decode_path=decode_path,
6402                     offset=offset,
6403                 )
6404             l, llen, v = 0, 1, lv[1:]
6405             lenindef = True
6406         except DecodeError as err:
6407             raise err.__class__(
6408                 msg=err.msg,
6409                 klass=self.__class__,
6410                 decode_path=decode_path,
6411                 offset=offset,
6412             )
6413         if l > len(v):
6414             raise NotEnoughData(
6415                 "encoded length is longer than data",
6416                 klass=self.__class__,
6417                 decode_path=decode_path,
6418                 offset=offset,
6419             )
6420         if not lenindef:
6421             v, tail = v[:l], v[l:]
6422         vlen = 0
6423         sub_offset = offset + tlen + llen
6424         values = {}
6425         ber_encoded = False
6426         ctx_allow_default_values = ctx.get("allow_default_values", False)
6427         for name, spec in iteritems(self.specs):
6428             if spec.optional and (
6429                     (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6430                     len(v) == 0
6431             ):
6432                 continue
6433             spec_defaulted = spec.default is not None
6434             sub_decode_path = decode_path + (name,)
6435             try:
6436                 if evgen_mode and not spec_defaulted:
6437                     for _decode_path, value, v_tail in spec.decode_evgen(
6438                             v,
6439                             sub_offset,
6440                             leavemm=True,
6441                             decode_path=sub_decode_path,
6442                             ctx=ctx,
6443                             _ctx_immutable=False,
6444                     ):
6445                         yield _decode_path, value, v_tail
6446                 else:
6447                     _, value, v_tail = next(spec.decode_evgen(
6448                         v,
6449                         sub_offset,
6450                         leavemm=True,
6451                         decode_path=sub_decode_path,
6452                         ctx=ctx,
6453                         _ctx_immutable=False,
6454                         _evgen_mode=False,
6455                     ))
6456             except TagMismatch as err:
6457                 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6458                     continue
6459                 raise
6460
6461             defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6462             if not evgen_mode and defined is not None:
6463                 defined_by, defined_spec = defined
6464                 if issubclass(value.__class__, SequenceOf):
6465                     for i, _value in enumerate(value):
6466                         sub_sub_decode_path = sub_decode_path + (
6467                             str(i),
6468                             DecodePathDefBy(defined_by),
6469                         )
6470                         defined_value, defined_tail = defined_spec.decode(
6471                             memoryview(bytes(_value)),
6472                             sub_offset + (
6473                                 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6474                                 if value.expled else (value.tlen + value.llen)
6475                             ),
6476                             leavemm=True,
6477                             decode_path=sub_sub_decode_path,
6478                             ctx=ctx,
6479                             _ctx_immutable=False,
6480                         )
6481                         if len(defined_tail) > 0:
6482                             raise DecodeError(
6483                                 "remaining data",
6484                                 klass=self.__class__,
6485                                 decode_path=sub_sub_decode_path,
6486                                 offset=offset,
6487                             )
6488                         _value.defined = (defined_by, defined_value)
6489                 else:
6490                     defined_value, defined_tail = defined_spec.decode(
6491                         memoryview(bytes(value)),
6492                         sub_offset + (
6493                             (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6494                             if value.expled else (value.tlen + value.llen)
6495                         ),
6496                         leavemm=True,
6497                         decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6498                         ctx=ctx,
6499                         _ctx_immutable=False,
6500                     )
6501                     if len(defined_tail) > 0:
6502                         raise DecodeError(
6503                             "remaining data",
6504                             klass=self.__class__,
6505                             decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6506                             offset=offset,
6507                         )
6508                     value.defined = (defined_by, defined_value)
6509
6510             value_len = value.fulllen
6511             vlen += value_len
6512             sub_offset += value_len
6513             v = v_tail
6514             if spec_defaulted:
6515                 if evgen_mode:
6516                     yield sub_decode_path, value, v_tail
6517                 if value == spec.default:
6518                     if ctx_bered or ctx_allow_default_values:
6519                         ber_encoded = True
6520                     else:
6521                         raise DecodeError(
6522                             "DEFAULT value met",
6523                             klass=self.__class__,
6524                             decode_path=sub_decode_path,
6525                             offset=sub_offset,
6526                         )
6527             if not evgen_mode:
6528                 values[name] = value
6529                 spec_defines = getattr(spec, "defines", ())
6530                 if len(spec_defines) == 0:
6531                     defines_by_path = ctx.get("defines_by_path", ())
6532                     if len(defines_by_path) > 0:
6533                         spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6534                 if spec_defines is not None and len(spec_defines) > 0:
6535                     for rel_path, schema in spec_defines:
6536                         defined = schema.get(value, None)
6537                         if defined is not None:
6538                             ctx.setdefault("_defines", []).append((
6539                                 abs_decode_path(sub_decode_path[:-1], rel_path),
6540                                 (value, defined),
6541                             ))
6542         if lenindef:
6543             if v[:EOC_LEN].tobytes() != EOC:
6544                 raise DecodeError(
6545                     "no EOC",
6546                     klass=self.__class__,
6547                     decode_path=decode_path,
6548                     offset=offset,
6549                 )
6550             tail = v[EOC_LEN:]
6551             vlen += EOC_LEN
6552         elif len(v) > 0:
6553             raise DecodeError(
6554                 "remaining data",
6555                 klass=self.__class__,
6556                 decode_path=decode_path,
6557                 offset=offset,
6558             )
6559         obj = self.__class__(
6560             schema=self.specs,
6561             impl=self.tag,
6562             expl=self._expl,
6563             default=self.default,
6564             optional=self.optional,
6565             _decoded=(offset, llen, vlen),
6566         )
6567         obj._value = values
6568         obj.lenindef = lenindef
6569         obj.ber_encoded = ber_encoded
6570         yield decode_path, obj, tail
6571
6572     def __repr__(self):
6573         value = pp_console_row(next(self.pps()))
6574         cols = []
6575         for name in self.specs:
6576             _value = self._value.get(name)
6577             if _value is None:
6578                 continue
6579             cols.append("%s: %s" % (name, repr(_value)))
6580         return "%s[%s]" % (value, "; ".join(cols))
6581
6582     def pps(self, decode_path=()):
6583         yield _pp(
6584             obj=self,
6585             asn1_type_name=self.asn1_type_name,
6586             obj_name=self.__class__.__name__,
6587             decode_path=decode_path,
6588             optional=self.optional,
6589             default=self == self.default,
6590             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6591             expl=None if self._expl is None else tag_decode(self._expl),
6592             offset=self.offset,
6593             tlen=self.tlen,
6594             llen=self.llen,
6595             vlen=self.vlen,
6596             expl_offset=self.expl_offset if self.expled else None,
6597             expl_tlen=self.expl_tlen if self.expled else None,
6598             expl_llen=self.expl_llen if self.expled else None,
6599             expl_vlen=self.expl_vlen if self.expled else None,
6600             expl_lenindef=self.expl_lenindef,
6601             lenindef=self.lenindef,
6602             ber_encoded=self.ber_encoded,
6603             bered=self.bered,
6604         )
6605         for name in self.specs:
6606             value = self._value.get(name)
6607             if value is None:
6608                 continue
6609             yield value.pps(decode_path=decode_path + (name,))
6610         for pp in self.pps_lenindef(decode_path):
6611             yield pp
6612
6613
6614 class Set(Sequence, SequenceEncode1stMixing):
6615     """``SET`` structure type
6616
6617     Its usage is identical to :py:class:`pyderasn.Sequence`.
6618
6619     .. _allow_unordered_set_ctx:
6620
6621     DER prohibits unordered values encoding and will raise an error
6622     during decode. If :ref:`bered <bered_ctx>` context option is set,
6623     then no error will occur. Also you can disable strict values
6624     ordering check by setting ``"allow_unordered_set": True``
6625     :ref:`context <ctx>` option.
6626     """
6627     __slots__ = ()
6628     tag_default = tag_encode(form=TagFormConstructed, num=17)
6629     asn1_type_name = "SET"
6630
6631     def _values_for_encoding(self):
6632         return sorted(
6633             super(Set, self)._values_for_encoding(),
6634             key=attrgetter("tag_order"),
6635         )
6636
6637     def _encode_cer(self, writer):
6638         write_full(writer, self.tag + LENINDEF)
6639         for v in sorted(
6640                 super(Set, self)._values_for_encoding(),
6641                 key=attrgetter("tag_order_cer"),
6642         ):
6643             v.encode_cer(writer)
6644         write_full(writer, EOC)
6645
6646     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6647         try:
6648             t, tlen, lv = tag_strip(tlv)
6649         except DecodeError as err:
6650             raise err.__class__(
6651                 msg=err.msg,
6652                 klass=self.__class__,
6653                 decode_path=decode_path,
6654                 offset=offset,
6655             )
6656         if t != self.tag:
6657             raise TagMismatch(
6658                 klass=self.__class__,
6659                 decode_path=decode_path,
6660                 offset=offset,
6661             )
6662         if tag_only:
6663             yield None
6664             return
6665         lenindef = False
6666         ctx_bered = ctx.get("bered", False)
6667         try:
6668             l, llen, v = len_decode(lv)
6669         except LenIndefForm as err:
6670             if not ctx_bered:
6671                 raise err.__class__(
6672                     msg=err.msg,
6673                     klass=self.__class__,
6674                     decode_path=decode_path,
6675                     offset=offset,
6676                 )
6677             l, llen, v = 0, 1, lv[1:]
6678             lenindef = True
6679         except DecodeError as err:
6680             raise err.__class__(
6681                 msg=err.msg,
6682                 klass=self.__class__,
6683                 decode_path=decode_path,
6684                 offset=offset,
6685             )
6686         if l > len(v):
6687             raise NotEnoughData(
6688                 "encoded length is longer than data",
6689                 klass=self.__class__,
6690                 offset=offset,
6691             )
6692         if not lenindef:
6693             v, tail = v[:l], v[l:]
6694         vlen = 0
6695         sub_offset = offset + tlen + llen
6696         values = {}
6697         ber_encoded = False
6698         ctx_allow_default_values = ctx.get("allow_default_values", False)
6699         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6700         tag_order_prev = (0, 0)
6701         _specs_items = copy(self.specs)
6702
6703         while len(v) > 0:
6704             if lenindef and v[:EOC_LEN].tobytes() == EOC:
6705                 break
6706             for name, spec in iteritems(_specs_items):
6707                 sub_decode_path = decode_path + (name,)
6708                 try:
6709                     spec.decode(
6710                         v,
6711                         sub_offset,
6712                         leavemm=True,
6713                         decode_path=sub_decode_path,
6714                         ctx=ctx,
6715                         tag_only=True,
6716                         _ctx_immutable=False,
6717                     )
6718                 except TagMismatch:
6719                     continue
6720                 break
6721             else:
6722                 raise TagMismatch(
6723                     klass=self.__class__,
6724                     decode_path=decode_path,
6725                     offset=offset,
6726                 )
6727             spec_defaulted = spec.default is not None
6728             if evgen_mode and not spec_defaulted:
6729                 for _decode_path, value, v_tail in spec.decode_evgen(
6730                         v,
6731                         sub_offset,
6732                         leavemm=True,
6733                         decode_path=sub_decode_path,
6734                         ctx=ctx,
6735                         _ctx_immutable=False,
6736                 ):
6737                     yield _decode_path, value, v_tail
6738             else:
6739                 _, value, v_tail = next(spec.decode_evgen(
6740                     v,
6741                     sub_offset,
6742                     leavemm=True,
6743                     decode_path=sub_decode_path,
6744                     ctx=ctx,
6745                     _ctx_immutable=False,
6746                     _evgen_mode=False,
6747                 ))
6748             value_tag_order = value.tag_order
6749             value_len = value.fulllen
6750             if tag_order_prev >= value_tag_order:
6751                 if ctx_bered or ctx_allow_unordered_set:
6752                     ber_encoded = True
6753                 else:
6754                     raise DecodeError(
6755                         "unordered " + self.asn1_type_name,
6756                         klass=self.__class__,
6757                         decode_path=sub_decode_path,
6758                         offset=sub_offset,
6759                     )
6760             if spec_defaulted:
6761                 if evgen_mode:
6762                     yield sub_decode_path, value, v_tail
6763                 if value != spec.default:
6764                     pass
6765                 elif ctx_bered or ctx_allow_default_values:
6766                     ber_encoded = True
6767                 else:
6768                     raise DecodeError(
6769                         "DEFAULT value met",
6770                         klass=self.__class__,
6771                         decode_path=sub_decode_path,
6772                         offset=sub_offset,
6773                     )
6774             values[name] = value
6775             del _specs_items[name]
6776             tag_order_prev = value_tag_order
6777             sub_offset += value_len
6778             vlen += value_len
6779             v = v_tail
6780
6781         obj = self.__class__(
6782             schema=self.specs,
6783             impl=self.tag,
6784             expl=self._expl,
6785             default=self.default,
6786             optional=self.optional,
6787             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6788         )
6789         if lenindef:
6790             if v[:EOC_LEN].tobytes() != EOC:
6791                 raise DecodeError(
6792                     "no EOC",
6793                     klass=self.__class__,
6794                     decode_path=decode_path,
6795                     offset=offset,
6796                 )
6797             tail = v[EOC_LEN:]
6798             obj.lenindef = True
6799         for name, spec in iteritems(self.specs):
6800             if name not in values and not spec.optional:
6801                 raise DecodeError(
6802                     "%s value is not ready" % name,
6803                     klass=self.__class__,
6804                     decode_path=decode_path,
6805                     offset=offset,
6806                 )
6807         if not evgen_mode:
6808             obj._value = values
6809         obj.ber_encoded = ber_encoded
6810         yield decode_path, obj, tail
6811
6812
6813 SequenceOfState = namedtuple(
6814     "SequenceOfState",
6815     BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6816     **NAMEDTUPLE_KWARGS
6817 )
6818
6819
6820 class SequenceOf(SequenceEncode1stMixing, Obj):
6821     """``SEQUENCE OF`` sequence type
6822
6823     For that kind of type you must specify the object it will carry on
6824     (bounds are for example here, not required)::
6825
6826         class Ints(SequenceOf):
6827             schema = Integer()
6828             bounds = (0, 2)
6829
6830     >>> ints = Ints()
6831     >>> ints.append(Integer(123))
6832     >>> ints.append(Integer(234))
6833     >>> ints
6834     Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6835     >>> [int(i) for i in ints]
6836     [123, 234]
6837     >>> ints.append(Integer(345))
6838     Traceback (most recent call last):
6839     pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6840     >>> ints[1]
6841     INTEGER 234
6842     >>> ints[1] = Integer(345)
6843     >>> ints
6844     Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6845
6846     You can initialize sequence with preinitialized values:
6847
6848     >>> ints = Ints([Integer(123), Integer(234)])
6849
6850     Also you can use iterator as a value:
6851
6852     >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6853
6854     And it won't be iterated until encoding process. Pay attention that
6855     bounds and required schema checks are done only during the encoding
6856     process in that case! After encode was called, then value is zeroed
6857     back to empty list and you have to set it again. That mode is useful
6858     mainly with CER encoding mode, where all objects from the iterable
6859     will be streamed to the buffer, without copying all of them to
6860     memory first.
6861     """
6862     __slots__ = ("spec", "_bound_min", "_bound_max")
6863     tag_default = tag_encode(form=TagFormConstructed, num=16)
6864     asn1_type_name = "SEQUENCE OF"
6865
6866     def __init__(
6867             self,
6868             value=None,
6869             schema=None,
6870             bounds=None,
6871             impl=None,
6872             expl=None,
6873             default=None,
6874             optional=False,
6875             _decoded=(0, 0, 0),
6876     ):
6877         super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6878         if schema is None:
6879             schema = getattr(self, "schema", None)
6880         if schema is None:
6881             raise ValueError("schema must be specified")
6882         self.spec = schema
6883         self._bound_min, self._bound_max = getattr(
6884             self,
6885             "bounds",
6886             (0, float("+inf")),
6887         ) if bounds is None else bounds
6888         self._value = []
6889         if value is not None:
6890             self._value = self._value_sanitize(value)
6891         if default is not None:
6892             default_value = self._value_sanitize(default)
6893             default_obj = self.__class__(
6894                 schema=schema,
6895                 impl=self.tag,
6896                 expl=self._expl,
6897             )
6898             default_obj._value = default_value
6899             self.default = default_obj
6900             if value is None:
6901                 self._value = copy(default_obj._value)
6902
6903     def _value_sanitize(self, value):
6904         iterator = False
6905         if issubclass(value.__class__, SequenceOf):
6906             value = value._value
6907         elif hasattr(value, NEXT_ATTR_NAME):
6908             iterator = True
6909         elif hasattr(value, "__iter__"):
6910             value = list(value)
6911         else:
6912             raise InvalidValueType((self.__class__, iter, "iterator"))
6913         if not iterator:
6914             if not self._bound_min <= len(value) <= self._bound_max:
6915                 raise BoundsError(self._bound_min, len(value), self._bound_max)
6916             class_expected = self.spec.__class__
6917             for v in value:
6918                 if not isinstance(v, class_expected):
6919                     raise InvalidValueType((class_expected,))
6920         return value
6921
6922     @property
6923     def ready(self):
6924         if hasattr(self._value, NEXT_ATTR_NAME):
6925             return True
6926         if self._bound_min > 0 and len(self._value) == 0:
6927             return False
6928         return all(v.ready for v in self._value)
6929
6930     @property
6931     def bered(self):
6932         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6933             return True
6934         return any(v.bered for v in self._value)
6935
6936     def __getstate__(self):
6937         if hasattr(self._value, NEXT_ATTR_NAME):
6938             raise ValueError("can not pickle SequenceOf with iterator")
6939         return SequenceOfState(
6940             __version__,
6941             self.tag,
6942             self._tag_order,
6943             self._expl,
6944             self.default,
6945             self.optional,
6946             self.offset,
6947             self.llen,
6948             self.vlen,
6949             self.expl_lenindef,
6950             self.lenindef,
6951             self.ber_encoded,
6952             self.spec,
6953             [copy(v) for v in self._value],
6954             self._bound_min,
6955             self._bound_max,
6956         )
6957
6958     def __setstate__(self, state):
6959         super(SequenceOf, self).__setstate__(state)
6960         self.spec = state.spec
6961         self._value = state.value
6962         self._bound_min = state.bound_min
6963         self._bound_max = state.bound_max
6964
6965     def __eq__(self, their):
6966         if isinstance(their, self.__class__):
6967             return (
6968                 self.spec == their.spec and
6969                 self.tag == their.tag and
6970                 self._expl == their._expl and
6971                 self._value == their._value
6972             )
6973         if hasattr(their, "__iter__"):
6974             return self._value == list(their)
6975         return False
6976
6977     def __call__(
6978             self,
6979             value=None,
6980             bounds=None,
6981             impl=None,
6982             expl=None,
6983             default=None,
6984             optional=None,
6985     ):
6986         return self.__class__(
6987             value=value,
6988             schema=self.spec,
6989             bounds=(
6990                 (self._bound_min, self._bound_max)
6991                 if bounds is None else bounds
6992             ),
6993             impl=self.tag if impl is None else impl,
6994             expl=self._expl if expl is None else expl,
6995             default=self.default if default is None else default,
6996             optional=self.optional if optional is None else optional,
6997         )
6998
6999     def __contains__(self, key):
7000         return key in self._value
7001
7002     def append(self, value):
7003         if not isinstance(value, self.spec.__class__):
7004             raise InvalidValueType((self.spec.__class__,))
7005         if len(self._value) + 1 > self._bound_max:
7006             raise BoundsError(
7007                 self._bound_min,
7008                 len(self._value) + 1,
7009                 self._bound_max,
7010             )
7011         self._value.append(value)
7012
7013     def __iter__(self):
7014         return iter(self._value)
7015
7016     def __len__(self):
7017         return len(self._value)
7018
7019     def __setitem__(self, key, value):
7020         if not isinstance(value, self.spec.__class__):
7021             raise InvalidValueType((self.spec.__class__,))
7022         self._value[key] = self.spec(value=value)
7023
7024     def __getitem__(self, key):
7025         return self._value[key]
7026
7027     def _values_for_encoding(self):
7028         return iter(self._value)
7029
7030     def _encode(self):
7031         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7032         if iterator:
7033             values = []
7034             values_append = values.append
7035             class_expected = self.spec.__class__
7036             values_for_encoding = self._values_for_encoding()
7037             self._value = []
7038             for v in values_for_encoding:
7039                 if not isinstance(v, class_expected):
7040                     raise InvalidValueType((class_expected,))
7041                 values_append(v.encode())
7042             if not self._bound_min <= len(values) <= self._bound_max:
7043                 raise BoundsError(self._bound_min, len(values), self._bound_max)
7044             value = b"".join(values)
7045         else:
7046             value = b"".join(v.encode() for v in self._values_for_encoding())
7047         return b"".join((self.tag, len_encode(len(value)), value))
7048
7049     def _encode1st(self, state):
7050         state = super(SequenceOf, self)._encode1st(state)
7051         if hasattr(self._value, NEXT_ATTR_NAME):
7052             self._value = []
7053         return state
7054
7055     def _encode2nd(self, writer, state_iter):
7056         write_full(writer, self.tag + len_encode(next(state_iter)))
7057         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7058         if iterator:
7059             values_count = 0
7060             class_expected = self.spec.__class__
7061             values_for_encoding = self._values_for_encoding()
7062             self._value = []
7063             for v in values_for_encoding:
7064                 if not isinstance(v, class_expected):
7065                     raise InvalidValueType((class_expected,))
7066                 v.encode2nd(writer, state_iter)
7067                 values_count += 1
7068             if not self._bound_min <= values_count <= self._bound_max:
7069                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7070         else:
7071             for v in self._values_for_encoding():
7072                 v.encode2nd(writer, state_iter)
7073
7074     def _encode_cer(self, writer):
7075         write_full(writer, self.tag + LENINDEF)
7076         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7077         if iterator:
7078             class_expected = self.spec.__class__
7079             values_count = 0
7080             values_for_encoding = self._values_for_encoding()
7081             self._value = []
7082             for v in values_for_encoding:
7083                 if not isinstance(v, class_expected):
7084                     raise InvalidValueType((class_expected,))
7085                 v.encode_cer(writer)
7086                 values_count += 1
7087             if not self._bound_min <= values_count <= self._bound_max:
7088                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7089         else:
7090             for v in self._values_for_encoding():
7091                 v.encode_cer(writer)
7092         write_full(writer, EOC)
7093
7094     def _decode(
7095             self,
7096             tlv,
7097             offset,
7098             decode_path,
7099             ctx,
7100             tag_only,
7101             evgen_mode,
7102             ordering_check=False,
7103     ):
7104         try:
7105             t, tlen, lv = tag_strip(tlv)
7106         except DecodeError as err:
7107             raise err.__class__(
7108                 msg=err.msg,
7109                 klass=self.__class__,
7110                 decode_path=decode_path,
7111                 offset=offset,
7112             )
7113         if t != self.tag:
7114             raise TagMismatch(
7115                 klass=self.__class__,
7116                 decode_path=decode_path,
7117                 offset=offset,
7118             )
7119         if tag_only:
7120             yield None
7121             return
7122         lenindef = False
7123         ctx_bered = ctx.get("bered", False)
7124         try:
7125             l, llen, v = len_decode(lv)
7126         except LenIndefForm as err:
7127             if not ctx_bered:
7128                 raise err.__class__(
7129                     msg=err.msg,
7130                     klass=self.__class__,
7131                     decode_path=decode_path,
7132                     offset=offset,
7133                 )
7134             l, llen, v = 0, 1, lv[1:]
7135             lenindef = True
7136         except DecodeError as err:
7137             raise err.__class__(
7138                 msg=err.msg,
7139                 klass=self.__class__,
7140                 decode_path=decode_path,
7141                 offset=offset,
7142             )
7143         if l > len(v):
7144             raise NotEnoughData(
7145                 "encoded length is longer than data",
7146                 klass=self.__class__,
7147                 decode_path=decode_path,
7148                 offset=offset,
7149             )
7150         if not lenindef:
7151             v, tail = v[:l], v[l:]
7152         vlen = 0
7153         sub_offset = offset + tlen + llen
7154         _value = []
7155         _value_count = 0
7156         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7157         value_prev = memoryview(v[:0])
7158         ber_encoded = False
7159         spec = self.spec
7160         while len(v) > 0:
7161             if lenindef and v[:EOC_LEN].tobytes() == EOC:
7162                 break
7163             sub_decode_path = decode_path + (str(_value_count),)
7164             if evgen_mode:
7165                 for _decode_path, value, v_tail in spec.decode_evgen(
7166                         v,
7167                         sub_offset,
7168                         leavemm=True,
7169                         decode_path=sub_decode_path,
7170                         ctx=ctx,
7171                         _ctx_immutable=False,
7172                 ):
7173                     yield _decode_path, value, v_tail
7174             else:
7175                 _, value, v_tail = next(spec.decode_evgen(
7176                     v,
7177                     sub_offset,
7178                     leavemm=True,
7179                     decode_path=sub_decode_path,
7180                     ctx=ctx,
7181                     _ctx_immutable=False,
7182                     _evgen_mode=False,
7183                 ))
7184             value_len = value.fulllen
7185             if ordering_check:
7186                 if value_prev.tobytes() > v[:value_len].tobytes():
7187                     if ctx_bered or ctx_allow_unordered_set:
7188                         ber_encoded = True
7189                     else:
7190                         raise DecodeError(
7191                             "unordered " + self.asn1_type_name,
7192                             klass=self.__class__,
7193                             decode_path=sub_decode_path,
7194                             offset=sub_offset,
7195                         )
7196                 value_prev = v[:value_len]
7197             _value_count += 1
7198             if not evgen_mode:
7199                 _value.append(value)
7200             sub_offset += value_len
7201             vlen += value_len
7202             v = v_tail
7203         if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7204             raise DecodeError(
7205                 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7206                 klass=self.__class__,
7207                 decode_path=decode_path,
7208                 offset=offset,
7209             )
7210         try:
7211             obj = self.__class__(
7212                 value=None if evgen_mode else _value,
7213                 schema=spec,
7214                 bounds=(self._bound_min, self._bound_max),
7215                 impl=self.tag,
7216                 expl=self._expl,
7217                 default=self.default,
7218                 optional=self.optional,
7219                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7220             )
7221         except BoundsError as err:
7222             raise DecodeError(
7223                 msg=str(err),
7224                 klass=self.__class__,
7225                 decode_path=decode_path,
7226                 offset=offset,
7227             )
7228         if lenindef:
7229             if v[:EOC_LEN].tobytes() != EOC:
7230                 raise DecodeError(
7231                     "no EOC",
7232                     klass=self.__class__,
7233                     decode_path=decode_path,
7234                     offset=offset,
7235                 )
7236             obj.lenindef = True
7237             tail = v[EOC_LEN:]
7238         obj.ber_encoded = ber_encoded
7239         yield decode_path, obj, tail
7240
7241     def __repr__(self):
7242         return "%s[%s]" % (
7243             pp_console_row(next(self.pps())),
7244             ", ".join(repr(v) for v in self._value),
7245         )
7246
7247     def pps(self, decode_path=()):
7248         yield _pp(
7249             obj=self,
7250             asn1_type_name=self.asn1_type_name,
7251             obj_name=self.__class__.__name__,
7252             decode_path=decode_path,
7253             optional=self.optional,
7254             default=self == self.default,
7255             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7256             expl=None if self._expl is None else tag_decode(self._expl),
7257             offset=self.offset,
7258             tlen=self.tlen,
7259             llen=self.llen,
7260             vlen=self.vlen,
7261             expl_offset=self.expl_offset if self.expled else None,
7262             expl_tlen=self.expl_tlen if self.expled else None,
7263             expl_llen=self.expl_llen if self.expled else None,
7264             expl_vlen=self.expl_vlen if self.expled else None,
7265             expl_lenindef=self.expl_lenindef,
7266             lenindef=self.lenindef,
7267             ber_encoded=self.ber_encoded,
7268             bered=self.bered,
7269         )
7270         for i, value in enumerate(self._value):
7271             yield value.pps(decode_path=decode_path + (str(i),))
7272         for pp in self.pps_lenindef(decode_path):
7273             yield pp
7274
7275
7276 class SetOf(SequenceOf):
7277     """``SET OF`` sequence type
7278
7279     Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7280     """
7281     __slots__ = ()
7282     tag_default = tag_encode(form=TagFormConstructed, num=17)
7283     asn1_type_name = "SET OF"
7284
7285     def _value_sanitize(self, value):
7286         value = super(SetOf, self)._value_sanitize(value)
7287         if hasattr(value, NEXT_ATTR_NAME):
7288             raise ValueError(
7289                 "SetOf does not support iterator values, as no sense in them"
7290             )
7291         return value
7292
7293     def _encode(self):
7294         v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7295         return b"".join((self.tag, len_encode(len(v)), v))
7296
7297     def _encode2nd(self, writer, state_iter):
7298         write_full(writer, self.tag + len_encode(next(state_iter)))
7299         values = []
7300         for v in self._values_for_encoding():
7301             buf = BytesIO()
7302             v.encode2nd(buf.write, state_iter)
7303             values.append(buf.getvalue())
7304         values.sort()
7305         for v in values:
7306             write_full(writer, v)
7307
7308     def _encode_cer(self, writer):
7309         write_full(writer, self.tag + LENINDEF)
7310         for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7311             write_full(writer, v)
7312         write_full(writer, EOC)
7313
7314     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7315         return super(SetOf, self)._decode(
7316             tlv,
7317             offset,
7318             decode_path,
7319             ctx,
7320             tag_only,
7321             evgen_mode,
7322             ordering_check=True,
7323         )
7324
7325
7326 def obj_by_path(pypath):  # pragma: no cover
7327     """Import object specified as string Python path
7328
7329     Modules must be separated from classes/functions with ``:``.
7330
7331     >>> obj_by_path("foo.bar:Baz")
7332     <class 'foo.bar.Baz'>
7333     >>> obj_by_path("foo.bar:Baz.boo")
7334     <classmethod 'foo.bar.Baz.boo'>
7335     """
7336     mod, objs = pypath.rsplit(":", 1)
7337     from importlib import import_module
7338     obj = import_module(mod)
7339     for obj_name in objs.split("."):
7340         obj = getattr(obj, obj_name)
7341     return obj
7342
7343
7344 def generic_decoder():  # pragma: no cover
7345     # All of this below is a big hack with self references
7346     choice = PrimitiveTypes()
7347     choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7348     choice.specs["SetOf"] = SetOf(schema=choice)
7349     for i in six_xrange(31):
7350         choice.specs["SequenceOf%d" % i] = SequenceOf(
7351             schema=choice,
7352             expl=tag_ctxc(i),
7353         )
7354     choice.specs["Any"] = Any()
7355
7356     # Class name equals to type name, to omit it from output
7357     class SEQUENCEOF(SequenceOf):
7358         __slots__ = ()
7359         schema = choice
7360
7361     def pprint_any(
7362             obj,
7363             oid_maps=(),
7364             with_colours=False,
7365             with_decode_path=False,
7366             decode_path_only=(),
7367             decode_path=(),
7368     ):
7369         def _pprint_pps(pps):
7370             for pp in pps:
7371                 if hasattr(pp, "_fields"):
7372                     if (
7373                             decode_path_only != () and
7374                             pp.decode_path[:len(decode_path_only)] != decode_path_only
7375                     ):
7376                         continue
7377                     if pp.asn1_type_name == Choice.asn1_type_name:
7378                         continue
7379                     pp_kwargs = pp._asdict()
7380                     pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7381                     pp = _pp(**pp_kwargs)
7382                     yield pp_console_row(
7383                         pp,
7384                         oid_maps=oid_maps,
7385                         with_offsets=True,
7386                         with_blob=False,
7387                         with_colours=with_colours,
7388                         with_decode_path=with_decode_path,
7389                         decode_path_len_decrease=len(decode_path_only),
7390                     )
7391                     for row in pp_console_blob(
7392                             pp,
7393                             decode_path_len_decrease=len(decode_path_only),
7394                     ):
7395                         yield row
7396                 else:
7397                     for row in _pprint_pps(pp):
7398                         yield row
7399         return "\n".join(_pprint_pps(obj.pps(decode_path)))
7400     return SEQUENCEOF(), pprint_any
7401
7402
7403 def main():  # pragma: no cover
7404     import argparse
7405     parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7406     parser.add_argument(
7407         "--skip",
7408         type=int,
7409         default=0,
7410         help="Skip that number of bytes from the beginning",
7411     )
7412     parser.add_argument(
7413         "--oids",
7414         help="Python paths to dictionary with OIDs, comma separated",
7415     )
7416     parser.add_argument(
7417         "--schema",
7418         help="Python path to schema definition to use",
7419     )
7420     parser.add_argument(
7421         "--defines-by-path",
7422         help="Python path to decoder's defines_by_path",
7423     )
7424     parser.add_argument(
7425         "--nobered",
7426         action="store_true",
7427         help="Disallow BER encoding",
7428     )
7429     parser.add_argument(
7430         "--print-decode-path",
7431         action="store_true",
7432         help="Print decode paths",
7433     )
7434     parser.add_argument(
7435         "--decode-path-only",
7436         help="Print only specified decode path",
7437     )
7438     parser.add_argument(
7439         "--allow-expl-oob",
7440         action="store_true",
7441         help="Allow explicit tag out-of-bound",
7442     )
7443     parser.add_argument(
7444         "--evgen",
7445         action="store_true",
7446         help="Turn on event generation mode",
7447     )
7448     parser.add_argument(
7449         "RAWFile",
7450         type=argparse.FileType("rb"),
7451         help="Path to BER/CER/DER file you want to decode",
7452     )
7453     args = parser.parse_args()
7454     if PY2:
7455         args.RAWFile.seek(args.skip)
7456         raw = memoryview(args.RAWFile.read())
7457         args.RAWFile.close()
7458     else:
7459         raw = file_mmaped(args.RAWFile)[args.skip:]
7460     oid_maps = (
7461         [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7462         if args.oids else ()
7463     )
7464     from functools import partial
7465     if args.schema:
7466         schema = obj_by_path(args.schema)
7467         pprinter = partial(pprint, big_blobs=True)
7468     else:
7469         schema, pprinter = generic_decoder()
7470     ctx = {
7471         "bered": not args.nobered,
7472         "allow_expl_oob": args.allow_expl_oob,
7473     }
7474     if args.defines_by_path is not None:
7475         ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7476     from os import environ
7477     pprinter = partial(
7478         pprinter,
7479         oid_maps=oid_maps,
7480         with_colours=environ.get("NO_COLOR") is None,
7481         with_decode_path=args.print_decode_path,
7482         decode_path_only=(
7483             () if args.decode_path_only is None else
7484             tuple(args.decode_path_only.split(":"))
7485         ),
7486     )
7487     if args.evgen:
7488         for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7489             print(pprinter(obj, decode_path=decode_path))
7490     else:
7491         obj, tail = schema().decode(raw, ctx=ctx)
7492         print(pprinter(obj))
7493     if tail != b"":
7494         print("\nTrailing data: %s" % hexenc(tail))
7495
7496
7497 if __name__ == "__main__":
7498     main()