]> Cypherpunks.ru repositories - pyderasn.git/blob - pyderasn.py
More seealso directive usage
[pyderasn.git] / pyderasn.py
1 #!/usr/bin/env python
2 # coding: utf-8
3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU Lesser General Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
21
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal them in BER/CER/DER ones.
24
25     >>> i = Integer(123)
26     >>> raw = i.encode()
27     >>> Integer().decod(raw) == i
28     True
29
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
62
63 Common for most types
64 ---------------------
65
66 Tags
67 ____
68
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
75 tags simultaneously.
76
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
80 number.
81
82 .. note::
83
84    EXPLICIT tags always have **constructed** tag. PyDERASN does not
85    explicitly check correctness of schema input here.
86
87 .. note::
88
89    Implicit tags have **primitive** (``tag_ctxp``) encoding for
90    primitive values.
91
92 ::
93
94     >>> Integer(impl=tag_ctxp(1))
95     [1] INTEGER
96     >>> Integer(expl=tag_ctxc(2))
97     [2] EXPLICIT INTEGER
98
99 Implicit tag is not explicitly shown.
100
101 Two objects of the same type, but with different implicit/explicit tags
102 are **not** equal.
103
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
106 function::
107
108     >>> tag_decode(tag_ctxc(123))
109     (128, 32, 123)
110     >>> klass, form, num = tag_decode(tag_ctxc(123))
111     >>> klass == TagClassContext
112     True
113     >>> form == TagFormConstructed
114     True
115
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
118
119 Default/optional
120 ________________
121
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
125
126     >>> Integer(optional=True, default=123)
127     INTEGER 123 OPTIONAL DEFAULT
128
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
132 ``version`` field::
133
134     class Version(Integer):
135         schema = (
136             ("v1", 0),
137             ("v2", 1),
138             ("v3", 2),
139         )
140     class TBSCertificate(Sequence):
141         schema = (
142             ("version", Version(expl=tag_ctxc(0), default="v1")),
143         [...]
144
145 When default argument is used and value is not specified, then it equals
146 to default one.
147
148 .. _bounds:
149
150 Size constraints
151 ________________
152
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
156
157     class X(...):
158         bounds = (MIN, MAX)
159
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
161
162 For simplicity you can also set bounds the following way::
163
164     bounded_x = X(bounds=(MIN, MAX))
165
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
167 raised.
168
169 Common methods
170 ______________
171
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
175
176 All objects are friendly to ``copy.copy()`` and copied objects can be
177 safely mutated.
178
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
181
182 .. _decoding:
183
184 Decoding
185 --------
186
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
193
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
196
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
199
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
205
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
208
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
211 * ``expl_tlen``,
212 * ``expl_llen``
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
215   ``offset`` otherwise
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
217   ``tlvlen`` otherwise
218
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
220
221 .. _ctx:
222
223 Context
224 _______
225
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
229
230 Currently available context options:
231
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
238
239 .. _pprinting:
240
241 Pretty printing
242 ---------------
243
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
248
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
252
253     >>> from pyderasn import pprint
254     >>> encoded = Integer(-12345).encode()
255     >>> obj, tail = Integer().decode(encoded)
256     >>> print(pprint(obj))
257         0   [1,1,   2] INTEGER -12345
258
259 .. _pprint_example:
260
261 Example certificate::
262
263     >>> print(pprint(crt))
264         0   [1,3,1604] Certificate SEQUENCE
265         4   [1,3,1453]  . tbsCertificate: TBSCertificate SEQUENCE
266        10-2 [1,1,   1]  . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267        13   [1,1,   3]  . . serialNumber: CertificateSerialNumber INTEGER 61595
268        18   [1,1,  13]  . . signature: AlgorithmIdentifier SEQUENCE
269        20   [1,1,   9]  . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270        31   [0,0,   2]  . . . parameters: [UNIV 5] ANY OPTIONAL
271                         . . . . 05:00
272        33   [0,0, 278]  . . issuer: Name CHOICE rdnSequence
273        33   [1,3, 274]  . . . rdnSequence: RDNSequence SEQUENCE OF
274        37   [1,1,  11]  . . . . 0: RelativeDistinguishedName SET OF
275        39   [1,1,   9]  . . . . . 0: AttributeTypeAndValue SEQUENCE
276        41   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277        46   [0,0,   4]  . . . . . . value: [UNIV 19] AttributeValue ANY
278                         . . . . . . . 13:02:45:53
279     [...]
280      1461   [1,1,  13]  . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281      1463   [1,1,   9]  . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282      1474   [0,0,   2]  . . parameters: [UNIV 5] ANY OPTIONAL
283                         . . . 05:00
284      1476   [1,2, 129]  . signatureValue: BIT STRING 1024 bits
285                         . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286                         . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
287      [...]
288
289     Trailing data: 0a
290
291 Let's parse that output, human::
292
293        10-2 [1,1,   1]    . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294        ^  ^  ^ ^    ^     ^   ^        ^            ^       ^       ^  ^
295        0  1  2 3    4     5   6        7            8       9       10 11
296
297 ::
298
299        20   [1,1,   9]    . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
300        ^     ^ ^    ^     ^     ^          ^                 ^
301        0     2 3    4     5     6          9                 10
302
303 ::
304
305        33   [0,0, 278]    . . issuer: Name CHOICE rdnSequence
306        ^     ^ ^    ^     ^   ^       ^    ^      ^
307        0     2 3    4     5   6       8    9      10
308
309 ::
310
311        52-2∞ B [1,1,1054]∞  . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
312              ^           ^                                 ^   ^            ^
313             12          13                                14   9            10
314
315 :0:
316  Offset of the object, where its DER/BER encoding begins.
317  Pay attention that it does **not** include explicit tag.
318 :1:
319  If explicit tag exists, then this is its length (tag + encoded length).
320 :2:
321  Length of object's tag. For example CHOICE does not have its own tag,
322  so it is zero.
323 :3:
324  Length of encoded length.
325 :4:
326  Length of encoded value.
327 :5:
328  Visual indentation to show the depth of object in the hierarchy.
329 :6:
330  Object's name inside SEQUENCE/CHOICE.
331 :7:
332  If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333  here. "IMPLICIT" is omitted.
334 :8:
335  Object's class name, if set. Omitted if it is just an ordinary simple
336  value (like with ``algorithm`` in example above).
337 :9:
338  Object's ASN.1 type.
339 :10:
340  Object's value, if set. Can consist of multiple words (like OCTET/BIT
341  STRINGs above). We see ``v3`` value in Version, because it is named.
342  ``rdnSequence`` is the choice of CHOICE type.
343 :11:
344  Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345  default one, specified in the schema.
346 :12:
347  Shows does object contains any kind of BER encoded data (possibly
348  Sequence holding BER-encoded underlying value).
349 :13:
350  Only applicable to BER encoded data. Indefinite length encoding mark.
351 :14:
352  Only applicable to BER encoded data. If object has BER-specific
353  encoding, then ``BER`` will be shown. It does not depend on indefinite
354  length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355  (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
356  could be BERed.
357
358 .. _definedby:
359
360 DEFINED BY
361 ----------
362
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
367
368 .. _defines:
369
370 defines kwarg
371 _____________
372
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
376 container::
377
378     class ContentInfo(Sequence):
379         schema = (
380             ("contentType", ContentType(defines=((("content",), {
381                 id_digestedData: DigestedData(),
382                 id_signedData: SignedData(),
383             }),))),
384             ("content", Any(expl=tag_ctxc(0))),
385         )
386
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
391 done.
392
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
398
399         (
400             (("parameters",), {
401                 id_ecPublicKey: ECParameters(),
402                 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
403             }),
404             (("..", "subjectPublicKey"), {
405                 id_rsaEncryption: RSAPublicKey(),
406                 id_GostR3410_2001: OctetString(),
407             }),
408         ),
409
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
412 itself.
413
414 Following types can be automatically decoded (DEFINED BY):
415
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420   ``Any``/``BitString``/``OctetString``-s
421
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
426
427 .. _defines_by_path_ctx:
428
429 defines_by_path context option
430 ______________________________
431
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
435
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
438
439     (decode_path, defines)
440
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
445
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
449 of ``PKIResponse``::
450
451     content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
452         (
453             ("contentType",),
454             ((("content",), {id_signedData: SignedData()}),),
455         ),
456         (
457             (
458                 "content",
459                 DecodePathDefBy(id_signedData),
460                 "encapContentInfo",
461                 "eContentType",
462             ),
463             ((("eContent",), {
464                 id_cct_PKIData: PKIData(),
465                 id_cct_PKIResponse: PKIResponse(),
466             })),
467         ),
468         (
469             (
470                 "content",
471                 DecodePathDefBy(id_signedData),
472                 "encapContentInfo",
473                 "eContent",
474                 DecodePathDefBy(id_cct_PKIResponse),
475                 "controlSequence",
476                 any,
477                 "attrType",
478             ),
479             ((("attrValues",), {
480                 id_cmc_recipientNonce: RecipientNonce(),
481                 id_cmc_senderNonce: SenderNonce(),
482                 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483                 id_cmc_transactionId: TransactionId(),
484             })),
485         ),
486     )})
487
488 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 First function is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meet --
491 useful for SEQUENCE/SET OF-s.
492
493 .. _bered_ctx:
494
495 BER encoding
496 ------------
497
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
502
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504   attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505   STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506   ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508   attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509   ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
510   contain it.
511 * If object has an indefinite length encoded explicit tag, then
512   ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514   indefinite length, object's indefinite length, BER-encoding) or any
515   underlying component has that kind of encoding, then ``bered``
516   attribute is set to True. For example SignedData CMS can have
517   ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518   ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
519
520 EOC (end-of-contents) token's length is taken in advance in object's
521 value length.
522
523 .. _allow_expl_oob_ctx:
524
525 Allow explicit tag out-of-bound
526 -------------------------------
527
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
533
534 .. warning::
535
536    This option should be used only for skipping some decode errors, just
537    to see the decoded structure somehow.
538
539 .. _streaming:
540
541 Streaming and dealing with huge structures
542 ------------------------------------------
543
544 .. _evgen_mode:
545
546 evgen mode
547 __________
548
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
553 structure.
554
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
558
559     class TBSCertListFast(Sequence):
560         schema = (
561             [...]
562             ("revokedCertificates", OctetString(
563                 impl=SequenceOf.tag_default,
564                 optional=True,
565             )),
566             [...]
567         )
568
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
571
572 But how can you get all certificate's serial number from it, after you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
576
577     $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578          10     [1,1,   1]   . . version: Version INTEGER v2 (01) OPTIONAL
579          15     [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580          26     [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL
581          13     [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE
582          34     [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583          39     [0,0,   9]   . . . . . . value: [UNIV 19] AttributeValue ANY
584          32     [1,1,  14]   . . . . . 0: AttributeTypeAndValue SEQUENCE
585          30     [1,1,  16]   . . . . 0: RelativeDistinguishedName SET OF
586     [...]
587         188     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589         191     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
590         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591         186     [1,1,  18]   . . . 0: RevokedCertificate SEQUENCE
592         208     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594         211     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
595         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596         206     [1,1,  18]   . . . 1: RevokedCertificate SEQUENCE
597     [...]
598     9144992     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
599     9144992     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600     9144985     [1,1,  20]   . . . 415755: RevokedCertificate SEQUENCE
601       181     [1,4,9144821]   . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602         5     [1,4,9144997]   . tbsCertList: TBSCertList SEQUENCE
603     9145009     [1,1,   9]   . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604     9145020     [0,0,   2]   . . parameters: [UNIV 5] ANY OPTIONAL
605     9145007     [1,1,  13]   . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606     9145022     [1,3, 513]   . signatureValue: BIT STRING 4096 bits
607         0     [1,4,9145534]  CertificateList SEQUENCE
608
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as a successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
618 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
619
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
626
627     for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
628         if (
629             len(decode_path) == 4 and
630             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
631             decode_path[3] == "userCertificate"
632         ):
633             print("serial number:", int(obj))
634
635 Virtually it does not take any memory except at least needed for single
636 object storage. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
639 ``.tlvlen``.
640
641 .. _evgen_mode_upto_ctx:
642
643 evgen_mode_upto
644 _______________
645
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 do not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
655
656     for decode_path, obj, _ in CertificateList().decode_evgen(
657         crl_raw,
658         ctx={"evgen_mode_upto": (
659             (("tbsCertList", "revokedCertificates", any), True),
660         )},
661     ):
662         if (
663             len(decode_path) == 3 and
664             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
665         ):
666             print("serial number:", int(obj["userCertificate"]))
667
668 .. _mmap:
669
670 mmap-ed file
671 ____________
672
673 POSIX compliant systems have ``mmap`` syscall, giving ability to work
674 the memory mapped file. You can deal with the file like it was an
675 ordinary binary string, allowing you not to load it to the memory first.
676 Also you can use them as an input for OCTET STRING, taking no Python
677 memory for their storage.
678
679 There is convenient :py:func:`pyderasn.file_mmaped` function that
680 creates read-only memoryview on the file contents::
681
682     with open("huge", "rb") as fd:
683         raw = file_mmaped(fd)
684         obj = Something.decode(raw)
685
686 .. warning::
687
688    mmap-ed files in Python2.7 does not implement buffer protocol, so
689    memoryview won't work on them.
690
691 .. warning::
692
693    mmap maps the **whole** file. So it plays no role if you seek-ed it
694    before. Take the slice of the resulting memoryview with required
695    offset instead.
696
697 .. note::
698
699    If you use ZFS as underlying storage, then pay attention that
700    currently most platforms does not deal good with ZFS ARC and ordinary
701    page cache used for mmaps. It can take twice the necessary size in
702    the memory: both in page cache and ZFS ARC.
703
704 CER encoding
705 ____________
706
707 We can parse any kind of data now, but how can we produce files
708 streamingly, without storing their encoded representation in memory?
709 SEQUENCE by default encodes in memory all its values, joins them in huge
710 binary string, just to know the exact size of SEQUENCE's value for
711 encoding it in TLV. DER requires you to know all exact sizes of the
712 objects.
713
714 You can use CER encoding mode, that slightly differs from the DER, but
715 does not require exact sizes knowledge, allowing streaming encoding
716 directly to some writer/buffer. Just use
717 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
718 encoded data will flow::
719
720     opener = io.open if PY2 else open
721     with opener("result", "wb") as fd:
722         obj.encode_cer(fd.write)
723
724 ::
725
726     buf = io.BytesIO()
727     obj.encode_cer(buf.write)
728
729 If you do not want to create in-memory buffer every time, then you can
730 use :py:func:`pyderasn.encode_cer` function::
731
732     data = encode_cer(obj)
733
734 Remember that CER is **not valid** DER in most cases, so you **have to**
735 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
736 decoding. Also currently there is **no** validation that provided CER is
737 valid one -- you are sure that it has only valid BER encoding.
738
739 .. warning::
740
741    SET OF values can not be streamingly encoded, because they are
742    required to be sorted byte-by-byte. Big SET OF values still will take
743    much memory. Use neither SET nor SET OF values, as modern ASN.1
744    also recommends too.
745
746 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
747 OCTET STRINGs! They will be streamingly copied from underlying file to
748 the buffer using 1 KB chunks.
749
750 Some structures require that some of the elements have to be forcefully
751 DER encoded. For example ``SignedData`` CMS requires you to encode
752 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
753 encode everything else in BER. You can tell any of the structures to be
754 forcefully encoded in DER during CER encoding, by specifying
755 ``der_forced=True`` attribute::
756
757     class Certificate(Sequence):
758         schema = (...)
759         der_forced = True
760
761     class SignedAttributes(SetOf):
762         schema = Attribute()
763         bounds = (1, 32)
764         der_forced = True
765
766 .. _agg_octet_string:
767
768 agg_octet_string
769 ________________
770
771 In most cases, huge quantity of binary data is stored as OCTET STRING.
772 CER encoding splits it on 1 KB chunks. BER allows splitting on various
773 levels of chunks inclusion::
774
775     SOME STRING[CONSTRUCTED]
776         OCTET STRING[CONSTRUCTED]
777             OCTET STRING[PRIMITIVE]
778                 DATA CHUNK
779             OCTET STRING[PRIMITIVE]
780                 DATA CHUNK
781             OCTET STRING[PRIMITIVE]
782                 DATA CHUNK
783         OCTET STRING[PRIMITIVE]
784             DATA CHUNK
785         OCTET STRING[CONSTRUCTED]
786             OCTET STRING[PRIMITIVE]
787                 DATA CHUNK
788             OCTET STRING[PRIMITIVE]
789                 DATA CHUNK
790         OCTET STRING[CONSTRUCTED]
791             OCTET STRING[CONSTRUCTED]
792                 OCTET STRING[PRIMITIVE]
793                     DATA CHUNK
794
795 You can not just take the offset and some ``.vlen`` of the STRING and
796 treat it as the payload. If you decode it without
797 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
798 and ``bytes()`` will give the whole payload contents.
799
800 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
801 small memory footprint. There is convenient
802 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
803 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
804 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
805 SHA512 digest of its ``eContent``::
806
807     fd = open("data.p7m", "rb")
808     raw = file_mmaped(fd)
809     ctx = {"bered": True}
810     for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
811         if decode_path == ("content",):
812             content = obj
813             break
814     else:
815         raise ValueError("no content found")
816     hasher_state = sha512()
817     def hasher(data):
818         hasher_state.update(data)
819         return len(data)
820     evgens = SignedData().decode_evgen(
821         raw[content.offset:],
822         offset=content.offset,
823         ctx=ctx,
824     )
825     agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
826     fd.close()
827     digest = hasher_state.digest()
828
829 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
830 copy the payload (without BER/CER encoding interleaved overhead) in it.
831 Virtually it won't take memory more than for keeping small structures
832 and 1 KB binary chunks.
833
834 .. _seqof-iterators:
835
836 SEQUENCE OF iterators
837 _____________________
838
839 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
840 classes. The only difference with providing the full list of objects, is
841 that type and bounds checking is done during encoding process. Also
842 sequence's value will be emptied after encoding, forcing you to set its
843 value again.
844
845 This is very useful when you have to create some huge objects, like
846 CRLs, with thousands and millions of entities inside. You can write the
847 generator taking necessary data from the database and giving the
848 ``RevokedCertificate`` objects. Only binary representation of that
849 objects will take memory during DER encoding.
850
851 2-pass DER encoding
852 -------------------
853
854 There is ability to do 2-pass encoding to DER, writing results directly
855 to specified writer (buffer, file, whatever). It could be 1.5+ times
856 slower than ordinary encoding, but it takes little memory for 1st pass
857 state storing. For example, 1st pass state for CACert.org's CRL with
858 ~416K of certificate entries takes nearly 3.5 MB of memory.
859 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
860 nearly 0.5 KB of memory.
861
862 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
863 iterators <seqof-iterators>` and write directly to opened file, then
864 there is very small memory footprint.
865
866 1st pass traverses through all the objects of the structure and returns
867 the size of DER encoded structure, together with 1st pass state object.
868 That state contains precalculated lengths for various objects inside the
869 structure.
870
871 ::
872
873     fulllen, state = obj.encode1st()
874
875 2nd pass takes the writer and 1st pass state. It traverses through all
876 the objects again, but writes their encoded representation to the writer.
877
878 ::
879
880     opener = io.open if PY2 else open
881     with opener("result", "wb") as fd:
882         obj.encode2nd(fd.write, iter(state))
883
884 .. warning::
885
886    You **MUST NOT** use 1st pass state if anything is changed in the
887    objects. It is intended to be used immediately after 1st pass is
888    done!
889
890 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
891 have to reinitialize the values after the 1st pass. And you **have to**
892 be sure that the iterator gives exactly the same values as previously.
893 Yes, you have to run your iterator twice -- because this is two pass
894 encoding mode.
895
896 If you want to encode to the memory, then you can use convenient
897 :py:func:`pyderasn.encode2pass` helper.
898
899 Base Obj
900 --------
901 .. autoclass:: pyderasn.Obj
902    :members:
903
904 Primitive types
905 ---------------
906
907 Boolean
908 _______
909 .. autoclass:: pyderasn.Boolean
910    :members: __init__
911
912 Integer
913 _______
914 .. autoclass:: pyderasn.Integer
915    :members: __init__, named
916
917 BitString
918 _________
919 .. autoclass:: pyderasn.BitString
920    :members: __init__, bit_len, named
921
922 OctetString
923 ___________
924 .. autoclass:: pyderasn.OctetString
925    :members: __init__
926
927 Null
928 ____
929 .. autoclass:: pyderasn.Null
930    :members: __init__
931
932 ObjectIdentifier
933 ________________
934 .. autoclass:: pyderasn.ObjectIdentifier
935    :members: __init__
936
937 Enumerated
938 __________
939 .. autoclass:: pyderasn.Enumerated
940
941 CommonString
942 ____________
943 .. autoclass:: pyderasn.CommonString
944
945 NumericString
946 _____________
947 .. autoclass:: pyderasn.NumericString
948
949 PrintableString
950 _______________
951 .. autoclass:: pyderasn.PrintableString
952    :members: __init__, allow_asterisk, allow_ampersand
953
954 UTCTime
955 _______
956 .. autoclass:: pyderasn.UTCTime
957    :members: __init__, todatetime
958
959 GeneralizedTime
960 _______________
961 .. autoclass:: pyderasn.GeneralizedTime
962    :members: __init__, todatetime
963
964 Special types
965 -------------
966
967 Choice
968 ______
969 .. autoclass:: pyderasn.Choice
970    :members: __init__, choice, value
971
972 PrimitiveTypes
973 ______________
974 .. autoclass:: PrimitiveTypes
975
976 Any
977 ___
978 .. autoclass:: pyderasn.Any
979    :members: __init__
980
981 Constructed types
982 -----------------
983
984 Sequence
985 ________
986 .. autoclass:: pyderasn.Sequence
987    :members: __init__
988
989 Set
990 ___
991 .. autoclass:: pyderasn.Set
992    :members: __init__
993
994 SequenceOf
995 __________
996 .. autoclass:: pyderasn.SequenceOf
997    :members: __init__
998
999 SetOf
1000 _____
1001 .. autoclass:: pyderasn.SetOf
1002    :members: __init__
1003
1004 Various
1005 -------
1006
1007 .. autofunction:: pyderasn.abs_decode_path
1008 .. autofunction:: pyderasn.agg_octet_string
1009 .. autofunction:: pyderasn.colonize_hex
1010 .. autofunction:: pyderasn.encode2pass
1011 .. autofunction:: pyderasn.encode_cer
1012 .. autofunction:: pyderasn.file_mmaped
1013 .. autofunction:: pyderasn.hexenc
1014 .. autofunction:: pyderasn.hexdec
1015 .. autofunction:: pyderasn.tag_encode
1016 .. autofunction:: pyderasn.tag_decode
1017 .. autofunction:: pyderasn.tag_ctxp
1018 .. autofunction:: pyderasn.tag_ctxc
1019 .. autoclass:: pyderasn.DecodeError
1020    :members: __init__
1021 .. autoclass:: pyderasn.NotEnoughData
1022 .. autoclass:: pyderasn.ExceedingData
1023 .. autoclass:: pyderasn.LenIndefForm
1024 .. autoclass:: pyderasn.TagMismatch
1025 .. autoclass:: pyderasn.InvalidLength
1026 .. autoclass:: pyderasn.InvalidOID
1027 .. autoclass:: pyderasn.ObjUnknown
1028 .. autoclass:: pyderasn.ObjNotReady
1029 .. autoclass:: pyderasn.InvalidValueType
1030 .. autoclass:: pyderasn.BoundsError
1031
1032 .. _cmdline:
1033
1034 Command-line usage
1035 ------------------
1036
1037 You can decode DER/BER files using command line abilities::
1038
1039     $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1040
1041 If there is no schema for your file, then you can try parsing it without,
1042 but of course IMPLICIT tags will often make it impossible. But result is
1043 good enough for the certificate above::
1044
1045     $ python -m pyderasn path/to/file
1046         0   [1,3,1604]  . >: SEQUENCE OF
1047         4   [1,3,1453]  . . >: SEQUENCE OF
1048         8   [0,0,   5]  . . . . >: [0] ANY
1049                         . . . . . A0:03:02:01:02
1050        13   [1,1,   3]  . . . . >: INTEGER 61595
1051        18   [1,1,  13]  . . . . >: SEQUENCE OF
1052        20   [1,1,   9]  . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1053        31   [1,1,   0]  . . . . . . >: NULL
1054        33   [1,3, 274]  . . . . >: SEQUENCE OF
1055        37   [1,1,  11]  . . . . . . >: SET OF
1056        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1057        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1058        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1059     [...]
1060      1409   [1,1,  50]  . . . . . . >: SEQUENCE OF
1061      1411   [1,1,   8]  . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1062      1421   [1,1,  38]  . . . . . . . . >: OCTET STRING 38 bytes
1063                         . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1064                         . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1065                         . . . . . . . . . 61:2E:63:6F:6D:2F
1066      1461   [1,1,  13]  . . >: SEQUENCE OF
1067      1463   [1,1,   9]  . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1068      1474   [1,1,   0]  . . . . >: NULL
1069      1476   [1,2, 129]  . . >: BIT STRING 1024 bits
1070                         . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1071                         . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1072     [...]
1073
1074 Human readable OIDs
1075 ___________________
1076
1077 If you have got dictionaries with ObjectIdentifiers, like example one
1078 from ``tests/test_crts.py``::
1079
1080     stroid2name = {
1081         "1.2.840.113549.1.1.1": "id-rsaEncryption",
1082         "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1083         [...]
1084         "2.5.4.10": "id-at-organizationName",
1085         "2.5.4.11": "id-at-organizationalUnitName",
1086     }
1087
1088 then you can pass it to pretty printer to see human readable OIDs::
1089
1090     $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1091     [...]
1092        37   [1,1,  11]  . . . . . . >: SET OF
1093        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1094        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1095        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1096        50   [1,1,  18]  . . . . . . >: SET OF
1097        52   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1098        54   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1099        59   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1100        70   [1,1,  18]  . . . . . . >: SET OF
1101        72   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1102        74   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1103        79   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1104     [...]
1105
1106 Decode paths
1107 ____________
1108
1109 Each decoded element has so-called decode path: sequence of structure
1110 names it is passing during the decode process. Each element has its own
1111 unique path inside the whole ASN.1 tree. You can print it out with
1112 ``--print-decode-path`` option::
1113
1114     $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1115        0    [1,3,1604]  Certificate SEQUENCE []
1116        4    [1,3,1453]   . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1117       10-2  [1,1,   1]   . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1118       13    [1,1,   3]   . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1119       18    [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1120       20    [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1121       31    [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1122                          . . . . 05:00
1123       33    [0,0, 278]   . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1124       33    [1,3, 274]   . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1125       37    [1,1,  11]   . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1126       39    [1,1,   9]   . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1127       41    [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1128       46    [0,0,   4]   . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1129                          . . . . . . . 13:02:45:53
1130       46    [1,1,   2]   . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1131     [...]
1132
1133 Now you can print only the specified tree, for example signature algorithm::
1134
1135     $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1136       18    [1,1,  13]  AlgorithmIdentifier SEQUENCE
1137       20    [1,1,   9]   . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1138       31    [0,0,   2]   . parameters: [UNIV 5] ANY OPTIONAL
1139                          . . 05:00
1140 """
1141
1142 from array import array
1143 from codecs import getdecoder
1144 from codecs import getencoder
1145 from collections import namedtuple
1146 from collections import OrderedDict
1147 from copy import copy
1148 from datetime import datetime
1149 from datetime import timedelta
1150 from io import BytesIO
1151 from math import ceil
1152 from mmap import mmap
1153 from mmap import PROT_READ
1154 from operator import attrgetter
1155 from string import ascii_letters
1156 from string import digits
1157 from sys import maxsize as sys_maxsize
1158 from sys import version_info
1159 from unicodedata import category as unicat
1160
1161 from six import add_metaclass
1162 from six import binary_type
1163 from six import byte2int
1164 from six import indexbytes
1165 from six import int2byte
1166 from six import integer_types
1167 from six import iterbytes
1168 from six import iteritems
1169 from six import itervalues
1170 from six import PY2
1171 from six import string_types
1172 from six import text_type
1173 from six import unichr as six_unichr
1174 from six.moves import xrange as six_xrange
1175
1176
1177 try:
1178     from termcolor import colored
1179 except ImportError:  # pragma: no cover
1180     def colored(what, *args, **kwargs):
1181         return what
1182
1183 __version__ = "7.2"
1184
1185 __all__ = (
1186     "agg_octet_string",
1187     "Any",
1188     "BitString",
1189     "BMPString",
1190     "Boolean",
1191     "BoundsError",
1192     "Choice",
1193     "DecodeError",
1194     "DecodePathDefBy",
1195     "encode2pass",
1196     "encode_cer",
1197     "Enumerated",
1198     "ExceedingData",
1199     "file_mmaped",
1200     "GeneralizedTime",
1201     "GeneralString",
1202     "GraphicString",
1203     "hexdec",
1204     "hexenc",
1205     "IA5String",
1206     "Integer",
1207     "InvalidLength",
1208     "InvalidOID",
1209     "InvalidValueType",
1210     "ISO646String",
1211     "LenIndefForm",
1212     "NotEnoughData",
1213     "Null",
1214     "NumericString",
1215     "obj_by_path",
1216     "ObjectIdentifier",
1217     "ObjNotReady",
1218     "ObjUnknown",
1219     "OctetString",
1220     "PrimitiveTypes",
1221     "PrintableString",
1222     "Sequence",
1223     "SequenceOf",
1224     "Set",
1225     "SetOf",
1226     "T61String",
1227     "tag_ctxc",
1228     "tag_ctxp",
1229     "tag_decode",
1230     "TagClassApplication",
1231     "TagClassContext",
1232     "TagClassPrivate",
1233     "TagClassUniversal",
1234     "TagFormConstructed",
1235     "TagFormPrimitive",
1236     "TagMismatch",
1237     "TeletexString",
1238     "UniversalString",
1239     "UTCTime",
1240     "UTF8String",
1241     "VideotexString",
1242     "VisibleString",
1243 )
1244
1245 TagClassUniversal = 0
1246 TagClassApplication = 1 << 6
1247 TagClassContext = 1 << 7
1248 TagClassPrivate = 1 << 6 | 1 << 7
1249 TagFormPrimitive = 0
1250 TagFormConstructed = 1 << 5
1251 TagClassReprs = {
1252     TagClassContext: "",
1253     TagClassApplication: "APPLICATION ",
1254     TagClassPrivate: "PRIVATE ",
1255     TagClassUniversal: "UNIV ",
1256 }
1257 EOC = b"\x00\x00"
1258 EOC_LEN = len(EOC)
1259 LENINDEF = b"\x80"  # length indefinite mark
1260 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1261 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1262 SET01 = frozenset("01")
1263 DECIMALS = frozenset(digits)
1264 DECIMAL_SIGNS = ".,"
1265 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1266
1267
1268 def file_mmaped(fd):
1269     """Make mmap-ed memoryview for reading from file
1270
1271     :param fd: file object
1272     :returns: memoryview over read-only mmap-ing of the whole file
1273     """
1274     return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
1275
1276 def pureint(value):
1277     if not set(value) <= DECIMALS:
1278         raise ValueError("non-pure integer")
1279     return int(value)
1280
1281 def fractions2float(fractions_raw):
1282     pureint(fractions_raw)
1283     return float("0." + fractions_raw)
1284
1285
1286 def get_def_by_path(defines_by_path, sub_decode_path):
1287     """Get define by decode path
1288     """
1289     for path, define in defines_by_path:
1290         if len(path) != len(sub_decode_path):
1291             continue
1292         for p1, p2 in zip(path, sub_decode_path):
1293             if (not p1 is any) and (p1 != p2):
1294                 break
1295         else:
1296             return define
1297
1298
1299 ########################################################################
1300 # Errors
1301 ########################################################################
1302
1303 class ASN1Error(ValueError):
1304     pass
1305
1306
1307 class DecodeError(ASN1Error):
1308     def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1309         """
1310         :param str msg: reason of decode failing
1311         :param klass: optional exact DecodeError inherited class (like
1312                       :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1313                       :py:exc:`InvalidLength`)
1314         :param decode_path: tuple of strings. It contains human
1315                             readable names of the fields through which
1316                             decoding process has passed
1317         :param int offset: binary offset where failure happened
1318         """
1319         super(DecodeError, self).__init__()
1320         self.msg = msg
1321         self.klass = klass
1322         self.decode_path = decode_path
1323         self.offset = offset
1324
1325     def __str__(self):
1326         return " ".join(
1327             c for c in (
1328                 "" if self.klass is None else self.klass.__name__,
1329                 (
1330                     ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1331                     if len(self.decode_path) > 0 else ""
1332                 ),
1333                 ("(at %d)" % self.offset) if self.offset > 0 else "",
1334                 self.msg,
1335             ) if c != ""
1336         )
1337
1338     def __repr__(self):
1339         return "%s(%s)" % (self.__class__.__name__, self)
1340
1341
1342 class NotEnoughData(DecodeError):
1343     pass
1344
1345
1346 class ExceedingData(ASN1Error):
1347     def __init__(self, nbytes):
1348         super(ExceedingData, self).__init__()
1349         self.nbytes = nbytes
1350
1351     def __str__(self):
1352         return "%d trailing bytes" % self.nbytes
1353
1354     def __repr__(self):
1355         return "%s(%s)" % (self.__class__.__name__, self)
1356
1357
1358 class LenIndefForm(DecodeError):
1359     pass
1360
1361
1362 class TagMismatch(DecodeError):
1363     pass
1364
1365
1366 class InvalidLength(DecodeError):
1367     pass
1368
1369
1370 class InvalidOID(DecodeError):
1371     pass
1372
1373
1374 class ObjUnknown(ASN1Error):
1375     def __init__(self, name):
1376         super(ObjUnknown, self).__init__()
1377         self.name = name
1378
1379     def __str__(self):
1380         return "object is unknown: %s" % self.name
1381
1382     def __repr__(self):
1383         return "%s(%s)" % (self.__class__.__name__, self)
1384
1385
1386 class ObjNotReady(ASN1Error):
1387     def __init__(self, name):
1388         super(ObjNotReady, self).__init__()
1389         self.name = name
1390
1391     def __str__(self):
1392         return "object is not ready: %s" % self.name
1393
1394     def __repr__(self):
1395         return "%s(%s)" % (self.__class__.__name__, self)
1396
1397
1398 class InvalidValueType(ASN1Error):
1399     def __init__(self, expected_types):
1400         super(InvalidValueType, self).__init__()
1401         self.expected_types = expected_types
1402
1403     def __str__(self):
1404         return "invalid value type, expected: %s" % ", ".join(
1405             [repr(t) for t in self.expected_types]
1406         )
1407
1408     def __repr__(self):
1409         return "%s(%s)" % (self.__class__.__name__, self)
1410
1411
1412 class BoundsError(ASN1Error):
1413     def __init__(self, bound_min, value, bound_max):
1414         super(BoundsError, self).__init__()
1415         self.bound_min = bound_min
1416         self.value = value
1417         self.bound_max = bound_max
1418
1419     def __str__(self):
1420         return "unsatisfied bounds: %s <= %s <= %s" % (
1421             self.bound_min,
1422             self.value,
1423             self.bound_max,
1424         )
1425
1426     def __repr__(self):
1427         return "%s(%s)" % (self.__class__.__name__, self)
1428
1429
1430 ########################################################################
1431 # Basic coders
1432 ########################################################################
1433
1434 _hexdecoder = getdecoder("hex")
1435 _hexencoder = getencoder("hex")
1436
1437
1438 def hexdec(data):
1439     """Binary data to hexadecimal string convert
1440     """
1441     return _hexdecoder(data)[0]
1442
1443
1444 def hexenc(data):
1445     """Hexadecimal string to binary data convert
1446     """
1447     return _hexencoder(data)[0].decode("ascii")
1448
1449
1450 def int_bytes_len(num, byte_len=8):
1451     if num == 0:
1452         return 1
1453     return int(ceil(float(num.bit_length()) / byte_len))
1454
1455
1456 def zero_ended_encode(num):
1457     octets = bytearray(int_bytes_len(num, 7))
1458     i = len(octets) - 1
1459     octets[i] = num & 0x7F
1460     num >>= 7
1461     i -= 1
1462     while num > 0:
1463         octets[i] = 0x80 | (num & 0x7F)
1464         num >>= 7
1465         i -= 1
1466     return bytes(octets)
1467
1468
1469 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1470     """Encode tag to binary form
1471
1472     :param int num: tag's number
1473     :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1474                       :py:data:`pyderasn.TagClassContext`,
1475                       :py:data:`pyderasn.TagClassApplication`,
1476                       :py:data:`pyderasn.TagClassPrivate`)
1477     :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1478                      :py:data:`pyderasn.TagFormConstructed`)
1479     """
1480     if num < 31:
1481         # [XX|X|.....]
1482         return int2byte(klass | form | num)
1483     # [XX|X|11111][1.......][1.......] ... [0.......]
1484     return int2byte(klass | form | 31) + zero_ended_encode(num)
1485
1486
1487 def tag_decode(tag):
1488     """Decode tag from binary form
1489
1490     .. warning::
1491
1492        No validation is performed, assuming that it has already passed.
1493
1494     It returns tuple with three integers, as
1495     :py:func:`pyderasn.tag_encode` accepts.
1496     """
1497     first_octet = byte2int(tag)
1498     klass = first_octet & 0xC0
1499     form = first_octet & 0x20
1500     if first_octet & 0x1F < 0x1F:
1501         return (klass, form, first_octet & 0x1F)
1502     num = 0
1503     for octet in iterbytes(tag[1:]):
1504         num <<= 7
1505         num |= octet & 0x7F
1506     return (klass, form, num)
1507
1508
1509 def tag_ctxp(num):
1510     """Create CONTEXT PRIMITIVE tag
1511     """
1512     return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1513
1514
1515 def tag_ctxc(num):
1516     """Create CONTEXT CONSTRUCTED tag
1517     """
1518     return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1519
1520
1521 def tag_strip(data):
1522     """Take off tag from the data
1523
1524     :returns: (encoded tag, tag length, remaining data)
1525     """
1526     if len(data) == 0:
1527         raise NotEnoughData("no data at all")
1528     if byte2int(data) & 0x1F < 31:
1529         return data[:1], 1, data[1:]
1530     i = 0
1531     while True:
1532         i += 1
1533         if i == len(data):
1534             raise DecodeError("unfinished tag")
1535         if indexbytes(data, i) & 0x80 == 0:
1536             break
1537     i += 1
1538     return data[:i], i, data[i:]
1539
1540
1541 def len_encode(l):
1542     if l < 0x80:
1543         return int2byte(l)
1544     octets = bytearray(int_bytes_len(l) + 1)
1545     octets[0] = 0x80 | (len(octets) - 1)
1546     for i in six_xrange(len(octets) - 1, 0, -1):
1547         octets[i] = l & 0xFF
1548         l >>= 8
1549     return bytes(octets)
1550
1551
1552 def len_decode(data):
1553     """Decode length
1554
1555     :returns: (decoded length, length's length, remaining data)
1556     :raises LenIndefForm: if indefinite form encoding is met
1557     """
1558     if len(data) == 0:
1559         raise NotEnoughData("no data at all")
1560     first_octet = byte2int(data)
1561     if first_octet & 0x80 == 0:
1562         return first_octet, 1, data[1:]
1563     octets_num = first_octet & 0x7F
1564     if octets_num + 1 > len(data):
1565         raise NotEnoughData("encoded length is longer than data")
1566     if octets_num == 0:
1567         raise LenIndefForm()
1568     if byte2int(data[1:]) == 0:
1569         raise DecodeError("leading zeros")
1570     l = 0
1571     for v in iterbytes(data[1:1 + octets_num]):
1572         l = (l << 8) | v
1573     if l <= 127:
1574         raise DecodeError("long form instead of short one")
1575     return l, 1 + octets_num, data[1 + octets_num:]
1576
1577
1578 LEN0 = len_encode(0)
1579 LEN1 = len_encode(1)
1580 LEN1K = len_encode(1000)
1581
1582
1583 def len_size(l):
1584     """How many bytes length field will take
1585     """
1586     if l < 128:
1587         return 1
1588     if l < 256:  # 1 << 8
1589         return 2
1590     if l < 65536:  # 1 << 16
1591         return 3
1592     if l < 16777216:  # 1 << 24
1593         return 4
1594     if l < 4294967296:  # 1 << 32
1595         return 5
1596     if l < 1099511627776:  # 1 << 40
1597         return 6
1598     if l < 281474976710656:  # 1 << 48
1599         return 7
1600     if l < 72057594037927936:  # 1 << 56
1601         return 8
1602     raise OverflowError("too big length")
1603
1604
1605 def write_full(writer, data):
1606     """Fully write provided data
1607
1608     :param writer: must comply with ``io.RawIOBase.write`` behaviour
1609
1610     BytesIO does not guarantee that the whole data will be written at
1611     once. That function write everything provided, raising an error if
1612     ``writer`` returns None.
1613     """
1614     data = memoryview(data)
1615     written = 0
1616     while written != len(data):
1617         n = writer(data[written:])
1618         if n is None:
1619             raise ValueError("can not write to buf")
1620         written += n
1621
1622
1623 # If it is 64-bit system, then use compact 64-bit array of unsigned
1624 # longs. Use an ordinary list with universal integers otherwise, that
1625 # is slower.
1626 if sys_maxsize > 2 ** 32:
1627     def state_2pass_new():
1628         return array("L")
1629 else:
1630     def state_2pass_new():
1631         return []
1632
1633
1634 ########################################################################
1635 # Base class
1636 ########################################################################
1637
1638 class AutoAddSlots(type):
1639     def __new__(cls, name, bases, _dict):
1640         _dict["__slots__"] = _dict.get("__slots__", ())
1641         return type.__new__(cls, name, bases, _dict)
1642
1643
1644 BasicState = namedtuple("BasicState", (
1645     "version",
1646     "tag",
1647     "tag_order",
1648     "expl",
1649     "default",
1650     "optional",
1651     "offset",
1652     "llen",
1653     "vlen",
1654     "expl_lenindef",
1655     "lenindef",
1656     "ber_encoded",
1657 ), **NAMEDTUPLE_KWARGS)
1658
1659
1660 @add_metaclass(AutoAddSlots)
1661 class Obj(object):
1662     """Common ASN.1 object class
1663
1664     All ASN.1 types are inherited from it. It has metaclass that
1665     automatically adds ``__slots__`` to all inherited classes.
1666     """
1667     __slots__ = (
1668         "tag",
1669         "_tag_order",
1670         "_value",
1671         "_expl",
1672         "default",
1673         "optional",
1674         "offset",
1675         "llen",
1676         "vlen",
1677         "expl_lenindef",
1678         "lenindef",
1679         "ber_encoded",
1680     )
1681
1682     def __init__(
1683             self,
1684             impl=None,
1685             expl=None,
1686             default=None,
1687             optional=False,
1688             _decoded=(0, 0, 0),
1689     ):
1690         self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1691         self._expl = getattr(self, "expl", None) if expl is None else expl
1692         if self.tag != self.tag_default and self._expl is not None:
1693             raise ValueError("implicit and explicit tags can not be set simultaneously")
1694         if self.tag is None:
1695             self._tag_order = None
1696         else:
1697             tag_class, _, tag_num = tag_decode(
1698                 self.tag if self._expl is None else self._expl
1699             )
1700             self._tag_order = (tag_class, tag_num)
1701         if default is not None:
1702             optional = True
1703         self.optional = optional
1704         self.offset, self.llen, self.vlen = _decoded
1705         self.default = None
1706         self.expl_lenindef = False
1707         self.lenindef = False
1708         self.ber_encoded = False
1709
1710     @property
1711     def ready(self):  # pragma: no cover
1712         """Is object ready to be encoded?
1713         """
1714         raise NotImplementedError()
1715
1716     def _assert_ready(self):
1717         if not self.ready:
1718             raise ObjNotReady(self.__class__.__name__)
1719
1720     @property
1721     def bered(self):
1722         """Is either object or any elements inside is BER encoded?
1723         """
1724         return self.expl_lenindef or self.lenindef or self.ber_encoded
1725
1726     @property
1727     def decoded(self):
1728         """Is object decoded?
1729         """
1730         return (self.llen + self.vlen) > 0
1731
1732     def __getstate__(self):  # pragma: no cover
1733         """Used for making safe to be mutable pickleable copies
1734         """
1735         raise NotImplementedError()
1736
1737     def __setstate__(self, state):
1738         if state.version != __version__:
1739             raise ValueError("data is pickled by different PyDERASN version")
1740         self.tag = state.tag
1741         self._tag_order = state.tag_order
1742         self._expl = state.expl
1743         self.default = state.default
1744         self.optional = state.optional
1745         self.offset = state.offset
1746         self.llen = state.llen
1747         self.vlen = state.vlen
1748         self.expl_lenindef = state.expl_lenindef
1749         self.lenindef = state.lenindef
1750         self.ber_encoded = state.ber_encoded
1751
1752     @property
1753     def tag_order(self):
1754         """Tag's (class, number) used for DER/CER sorting
1755         """
1756         return self._tag_order
1757
1758     @property
1759     def tag_order_cer(self):
1760         return self.tag_order
1761
1762     @property
1763     def tlen(self):
1764         """.. seealso:: :ref:`decoding`
1765         """
1766         return len(self.tag)
1767
1768     @property
1769     def tlvlen(self):
1770         """.. seealso:: :ref:`decoding`
1771         """
1772         return self.tlen + self.llen + self.vlen
1773
1774     def __str__(self):  # pragma: no cover
1775         return self.__bytes__() if PY2 else self.__unicode__()
1776
1777     def __ne__(self, their):
1778         return not(self == their)
1779
1780     def __gt__(self, their):  # pragma: no cover
1781         return not(self < their)
1782
1783     def __le__(self, their):  # pragma: no cover
1784         return (self == their) or (self < their)
1785
1786     def __ge__(self, their):  # pragma: no cover
1787         return (self == their) or (self > their)
1788
1789     def _encode(self):  # pragma: no cover
1790         raise NotImplementedError()
1791
1792     def _encode_cer(self, writer):
1793         write_full(writer, self._encode())
1794
1795     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):  # pragma: no cover
1796         yield NotImplemented
1797
1798     def _encode1st(self, state):
1799         raise NotImplementedError()
1800
1801     def _encode2nd(self, writer, state_iter):
1802         raise NotImplementedError()
1803
1804     def encode(self):
1805         """DER encode the structure
1806
1807         :returns: DER representation
1808         """
1809         raw = self._encode()
1810         if self._expl is None:
1811             return raw
1812         return b"".join((self._expl, len_encode(len(raw)), raw))
1813
1814     def encode1st(self, state=None):
1815         """Do the 1st pass of 2-pass encoding
1816
1817         :rtype: (int, array("L"))
1818         :returns: full length of encoded data and precalculated various
1819                   objects lengths
1820         """
1821         if state is None:
1822             state = state_2pass_new()
1823         if self._expl is None:
1824             return self._encode1st(state)
1825         state.append(0)
1826         idx = len(state) - 1
1827         vlen, _ = self._encode1st(state)
1828         state[idx] = vlen
1829         fulllen = len(self._expl) + len_size(vlen) + vlen
1830         return fulllen, state
1831
1832     def encode2nd(self, writer, state_iter):
1833         """Do the 2nd pass of 2-pass encoding
1834
1835         :param writer: must comply with ``io.RawIOBase.write`` behaviour
1836         :param state_iter: iterator over the 1st pass state (``iter(state)``)
1837         """
1838         if self._expl is None:
1839             self._encode2nd(writer, state_iter)
1840         else:
1841             write_full(writer, self._expl + len_encode(next(state_iter)))
1842             self._encode2nd(writer, state_iter)
1843
1844     def encode_cer(self, writer):
1845         """CER encode the structure to specified writer
1846
1847         :param writer: must comply with ``io.RawIOBase.write``
1848                        behaviour. It takes slice to be written and
1849                        returns number of bytes processed. If it returns
1850                        None, then exception will be raised
1851         """
1852         if self._expl is not None:
1853             write_full(writer, self._expl + LENINDEF)
1854         if getattr(self, "der_forced", False):
1855             write_full(writer, self._encode())
1856         else:
1857             self._encode_cer(writer)
1858         if self._expl is not None:
1859             write_full(writer, EOC)
1860
1861     def hexencode(self):
1862         """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1863         """
1864         return hexenc(self.encode())
1865
1866     def decode(
1867             self,
1868             data,
1869             offset=0,
1870             leavemm=False,
1871             decode_path=(),
1872             ctx=None,
1873             tag_only=False,
1874             _ctx_immutable=True,
1875     ):
1876         """Decode the data
1877
1878         :param data: either binary or memoryview
1879         :param int offset: initial data's offset
1880         :param bool leavemm: do we need to leave memoryview of remaining
1881                     data as is, or convert it to bytes otherwise
1882         :param decode_path: current decode path (tuples of strings,
1883                             possibly with DecodePathDefBy) with will be
1884                             the root for all underlying objects
1885         :param ctx: optional :ref:`context <ctx>` governing decoding process
1886         :param bool tag_only: decode only the tag, without length and
1887                               contents (used only in Choice and Set
1888                               structures, trying to determine if tag satisfies
1889                               the schema)
1890         :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1891                                     before using it?
1892         :returns: (Obj, remaining data)
1893
1894         .. seealso:: :ref:`decoding`
1895         """
1896         result = next(self.decode_evgen(
1897             data,
1898             offset,
1899             leavemm,
1900             decode_path,
1901             ctx,
1902             tag_only,
1903             _ctx_immutable,
1904             _evgen_mode=False,
1905         ))
1906         if result is None:
1907             return None
1908         _, obj, tail = result
1909         return obj, tail
1910
1911     def decode_evgen(
1912             self,
1913             data,
1914             offset=0,
1915             leavemm=False,
1916             decode_path=(),
1917             ctx=None,
1918             tag_only=False,
1919             _ctx_immutable=True,
1920             _evgen_mode=True,
1921     ):
1922         """Decode with evgen mode on
1923
1924         That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1925         it returns the generator producing ``(decode_path, obj, tail)``
1926         values.
1927         .. seealso:: :ref:`evgen mode <evgen_mode>`.
1928         """
1929         if ctx is None:
1930             ctx = {}
1931         elif _ctx_immutable:
1932             ctx = copy(ctx)
1933         tlv = memoryview(data)
1934         if (
1935                 _evgen_mode and
1936                 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1937         ):
1938             _evgen_mode = False
1939         if self._expl is None:
1940             for result in self._decode(
1941                     tlv,
1942                     offset=offset,
1943                     decode_path=decode_path,
1944                     ctx=ctx,
1945                     tag_only=tag_only,
1946                     evgen_mode=_evgen_mode,
1947             ):
1948                 if tag_only:
1949                     yield None
1950                     return
1951                 _decode_path, obj, tail = result
1952                 if not _decode_path is decode_path:
1953                     yield result
1954         else:
1955             try:
1956                 t, tlen, lv = tag_strip(tlv)
1957             except DecodeError as err:
1958                 raise err.__class__(
1959                     msg=err.msg,
1960                     klass=self.__class__,
1961                     decode_path=decode_path,
1962                     offset=offset,
1963                 )
1964             if t != self._expl:
1965                 raise TagMismatch(
1966                     klass=self.__class__,
1967                     decode_path=decode_path,
1968                     offset=offset,
1969                 )
1970             try:
1971                 l, llen, v = len_decode(lv)
1972             except LenIndefForm as err:
1973                 if not ctx.get("bered", False):
1974                     raise err.__class__(
1975                         msg=err.msg,
1976                         klass=self.__class__,
1977                         decode_path=decode_path,
1978                         offset=offset,
1979                     )
1980                 llen, v = 1, lv[1:]
1981                 offset += tlen + llen
1982                 for result in self._decode(
1983                         v,
1984                         offset=offset,
1985                         decode_path=decode_path,
1986                         ctx=ctx,
1987                         tag_only=tag_only,
1988                         evgen_mode=_evgen_mode,
1989                 ):
1990                     if tag_only:  # pragma: no cover
1991                         yield None
1992                         return
1993                     _decode_path, obj, tail = result
1994                     if not _decode_path is decode_path:
1995                         yield result
1996                 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
1997                 if eoc_expected.tobytes() != EOC:
1998                     raise DecodeError(
1999                         "no EOC",
2000                         klass=self.__class__,
2001                         decode_path=decode_path,
2002                         offset=offset,
2003                     )
2004                 obj.vlen += EOC_LEN
2005                 obj.expl_lenindef = True
2006             except DecodeError as err:
2007                 raise err.__class__(
2008                     msg=err.msg,
2009                     klass=self.__class__,
2010                     decode_path=decode_path,
2011                     offset=offset,
2012                 )
2013             else:
2014                 if l > len(v):
2015                     raise NotEnoughData(
2016                         "encoded length is longer than data",
2017                         klass=self.__class__,
2018                         decode_path=decode_path,
2019                         offset=offset,
2020                     )
2021                 for result in self._decode(
2022                         v,
2023                         offset=offset + tlen + llen,
2024                         decode_path=decode_path,
2025                         ctx=ctx,
2026                         tag_only=tag_only,
2027                         evgen_mode=_evgen_mode,
2028                 ):
2029                     if tag_only:  # pragma: no cover
2030                         yield None
2031                         return
2032                     _decode_path, obj, tail = result
2033                     if not _decode_path is decode_path:
2034                         yield result
2035                 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2036                     raise DecodeError(
2037                         "explicit tag out-of-bound, longer than data",
2038                         klass=self.__class__,
2039                         decode_path=decode_path,
2040                         offset=offset,
2041                     )
2042         yield decode_path, obj, (tail if leavemm else tail.tobytes())
2043
2044     def decod(self, data, offset=0, decode_path=(), ctx=None):
2045         """Decode the data, check that tail is empty
2046
2047         :raises ExceedingData: if tail is not empty
2048
2049         This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2050         (decode without tail) that also checks that there is no
2051         trailing data left.
2052         """
2053         obj, tail = self.decode(
2054             data,
2055             offset=offset,
2056             decode_path=decode_path,
2057             ctx=ctx,
2058             leavemm=True,
2059         )
2060         if len(tail) > 0:
2061             raise ExceedingData(len(tail))
2062         return obj
2063
2064     def hexdecode(self, data, *args, **kwargs):
2065         """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2066         """
2067         return self.decode(hexdec(data), *args, **kwargs)
2068
2069     def hexdecod(self, data, *args, **kwargs):
2070         """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2071         """
2072         return self.decod(hexdec(data), *args, **kwargs)
2073
2074     @property
2075     def expled(self):
2076         """.. seealso:: :ref:`decoding`
2077         """
2078         return self._expl is not None
2079
2080     @property
2081     def expl_tag(self):
2082         """.. seealso:: :ref:`decoding`
2083         """
2084         return self._expl
2085
2086     @property
2087     def expl_tlen(self):
2088         """.. seealso:: :ref:`decoding`
2089         """
2090         return len(self._expl)
2091
2092     @property
2093     def expl_llen(self):
2094         """.. seealso:: :ref:`decoding`
2095         """
2096         if self.expl_lenindef:
2097             return 1
2098         return len(len_encode(self.tlvlen))
2099
2100     @property
2101     def expl_offset(self):
2102         """.. seealso:: :ref:`decoding`
2103         """
2104         return self.offset - self.expl_tlen - self.expl_llen
2105
2106     @property
2107     def expl_vlen(self):
2108         """.. seealso:: :ref:`decoding`
2109         """
2110         return self.tlvlen
2111
2112     @property
2113     def expl_tlvlen(self):
2114         """.. seealso:: :ref:`decoding`
2115         """
2116         return self.expl_tlen + self.expl_llen + self.expl_vlen
2117
2118     @property
2119     def fulloffset(self):
2120         """.. seealso:: :ref:`decoding`
2121         """
2122         return self.expl_offset if self.expled else self.offset
2123
2124     @property
2125     def fulllen(self):
2126         """.. seealso:: :ref:`decoding`
2127         """
2128         return self.expl_tlvlen if self.expled else self.tlvlen
2129
2130     def pps_lenindef(self, decode_path):
2131         if self.lenindef and not (
2132                 getattr(self, "defined", None) is not None and
2133                 self.defined[1].lenindef
2134         ):
2135             yield _pp(
2136                 asn1_type_name="EOC",
2137                 obj_name="",
2138                 decode_path=decode_path,
2139                 offset=(
2140                     self.offset + self.tlvlen -
2141                     (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2142                 ),
2143                 tlen=1,
2144                 llen=1,
2145                 vlen=0,
2146                 ber_encoded=True,
2147                 bered=True,
2148             )
2149         if self.expl_lenindef:
2150             yield _pp(
2151                 asn1_type_name="EOC",
2152                 obj_name="EXPLICIT",
2153                 decode_path=decode_path,
2154                 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2155                 tlen=1,
2156                 llen=1,
2157                 vlen=0,
2158                 ber_encoded=True,
2159                 bered=True,
2160             )
2161
2162
2163 def encode_cer(obj):
2164     """Encode to CER in memory buffer
2165
2166     :returns bytes: memory buffer contents
2167     """
2168     buf = BytesIO()
2169     obj.encode_cer(buf.write)
2170     return buf.getvalue()
2171
2172
2173 def encode2pass(obj):
2174     """Encode (2-pass mode) to DER in memory buffer
2175
2176     :returns bytes: memory buffer contents
2177     """
2178     buf = BytesIO()
2179     _, state = obj.encode1st()
2180     obj.encode2nd(buf.write, iter(state))
2181     return buf.getvalue()
2182
2183
2184 class DecodePathDefBy(object):
2185     """DEFINED BY representation inside decode path
2186     """
2187     __slots__ = ("defined_by",)
2188
2189     def __init__(self, defined_by):
2190         self.defined_by = defined_by
2191
2192     def __ne__(self, their):
2193         return not(self == their)
2194
2195     def __eq__(self, their):
2196         if not isinstance(their, self.__class__):
2197             return False
2198         return self.defined_by == their.defined_by
2199
2200     def __str__(self):
2201         return "DEFINED BY " + str(self.defined_by)
2202
2203     def __repr__(self):
2204         return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2205
2206
2207 ########################################################################
2208 # Pretty printing
2209 ########################################################################
2210
2211 PP = namedtuple("PP", (
2212     "obj",
2213     "asn1_type_name",
2214     "obj_name",
2215     "decode_path",
2216     "value",
2217     "blob",
2218     "optional",
2219     "default",
2220     "impl",
2221     "expl",
2222     "offset",
2223     "tlen",
2224     "llen",
2225     "vlen",
2226     "expl_offset",
2227     "expl_tlen",
2228     "expl_llen",
2229     "expl_vlen",
2230     "expl_lenindef",
2231     "lenindef",
2232     "ber_encoded",
2233     "bered",
2234 ), **NAMEDTUPLE_KWARGS)
2235
2236
2237 def _pp(
2238         obj=None,
2239         asn1_type_name="unknown",
2240         obj_name="unknown",
2241         decode_path=(),
2242         value=None,
2243         blob=None,
2244         optional=False,
2245         default=False,
2246         impl=None,
2247         expl=None,
2248         offset=0,
2249         tlen=0,
2250         llen=0,
2251         vlen=0,
2252         expl_offset=None,
2253         expl_tlen=None,
2254         expl_llen=None,
2255         expl_vlen=None,
2256         expl_lenindef=False,
2257         lenindef=False,
2258         ber_encoded=False,
2259         bered=False,
2260 ):
2261     return PP(
2262         obj,
2263         asn1_type_name,
2264         obj_name,
2265         decode_path,
2266         value,
2267         blob,
2268         optional,
2269         default,
2270         impl,
2271         expl,
2272         offset,
2273         tlen,
2274         llen,
2275         vlen,
2276         expl_offset,
2277         expl_tlen,
2278         expl_llen,
2279         expl_vlen,
2280         expl_lenindef,
2281         lenindef,
2282         ber_encoded,
2283         bered,
2284     )
2285
2286
2287 def _colourize(what, colour, with_colours, attrs=("bold",)):
2288     return colored(what, colour, attrs=attrs) if with_colours else what
2289
2290
2291 def colonize_hex(hexed):
2292     """Separate hexadecimal string with colons
2293     """
2294     return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2295
2296
2297 def pp_console_row(
2298         pp,
2299         oid_maps=(),
2300         with_offsets=False,
2301         with_blob=True,
2302         with_colours=False,
2303         with_decode_path=False,
2304         decode_path_len_decrease=0,
2305 ):
2306     cols = []
2307     if with_offsets:
2308         col = "%5d%s%s" % (
2309             pp.offset,
2310             (
2311                 "  " if pp.expl_offset is None else
2312                 ("-%d" % (pp.offset - pp.expl_offset))
2313             ),
2314             LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2315         )
2316         col = _colourize(col, "red", with_colours, ())
2317         col += _colourize("B", "red", with_colours) if pp.bered else " "
2318         cols.append(col)
2319         col = "[%d,%d,%4d]%s" % (
2320             pp.tlen,
2321             pp.llen,
2322             pp.vlen,
2323             LENINDEF_PP_CHAR if pp.lenindef else " "
2324         )
2325         col = _colourize(col, "green", with_colours, ())
2326         cols.append(col)
2327     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2328     if decode_path_len > 0:
2329         cols.append(" ." * decode_path_len)
2330         ent = pp.decode_path[-1]
2331         if isinstance(ent, DecodePathDefBy):
2332             cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2333             value = str(ent.defined_by)
2334             oid_name = None
2335             if (
2336                     len(oid_maps) > 0 and
2337                     ent.defined_by.asn1_type_name ==
2338                     ObjectIdentifier.asn1_type_name
2339             ):
2340                 for oid_map in oid_maps:
2341                     oid_name = oid_map.get(value)
2342                     if oid_name is not None:
2343                         cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2344                         break
2345             if oid_name is None:
2346                 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2347         else:
2348             cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2349     if pp.expl is not None:
2350         klass, _, num = pp.expl
2351         col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2352         cols.append(_colourize(col, "blue", with_colours))
2353     if pp.impl is not None:
2354         klass, _, num = pp.impl
2355         col = "[%s%d]" % (TagClassReprs[klass], num)
2356         cols.append(_colourize(col, "blue", with_colours))
2357     if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2358         cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2359     if pp.ber_encoded:
2360         cols.append(_colourize("BER", "red", with_colours))
2361     cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2362     if pp.value is not None:
2363         value = pp.value
2364         cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2365         if (
2366                 len(oid_maps) > 0 and
2367                 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
2368         ):
2369             for oid_map in oid_maps:
2370                 oid_name = oid_map.get(value)
2371                 if oid_name is not None:
2372                     cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2373                     break
2374         if pp.asn1_type_name == Integer.asn1_type_name:
2375             hex_repr = hex(int(pp.obj._value))[2:].upper()
2376             if len(hex_repr) % 2 != 0:
2377                 hex_repr = "0" + hex_repr
2378             cols.append(_colourize(
2379                 "(%s)" % colonize_hex(hex_repr),
2380                 "green",
2381                 with_colours,
2382             ))
2383     if with_blob:
2384         if pp.blob.__class__ == binary_type:
2385             cols.append(hexenc(pp.blob))
2386         elif pp.blob.__class__ == tuple:
2387             cols.append(", ".join(pp.blob))
2388     if pp.optional:
2389         cols.append(_colourize("OPTIONAL", "red", with_colours))
2390     if pp.default:
2391         cols.append(_colourize("DEFAULT", "red", with_colours))
2392     if with_decode_path:
2393         cols.append(_colourize(
2394             "[%s]" % ":".join(str(p) for p in pp.decode_path),
2395             "grey",
2396             with_colours,
2397         ))
2398     return " ".join(cols)
2399
2400
2401 def pp_console_blob(pp, decode_path_len_decrease=0):
2402     cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2403     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2404     if decode_path_len > 0:
2405         cols.append(" ." * (decode_path_len + 1))
2406     if pp.blob.__class__ == binary_type:
2407         blob = hexenc(pp.blob).upper()
2408         for i in six_xrange(0, len(blob), 32):
2409             chunk = blob[i:i + 32]
2410             yield " ".join(cols + [colonize_hex(chunk)])
2411     elif pp.blob.__class__ == tuple:
2412         yield " ".join(cols + [", ".join(pp.blob)])
2413
2414
2415 def pprint(
2416         obj,
2417         oid_maps=(),
2418         big_blobs=False,
2419         with_colours=False,
2420         with_decode_path=False,
2421         decode_path_only=(),
2422         decode_path=(),
2423 ):
2424     """Pretty print object
2425
2426     :param Obj obj: object you want to pretty print
2427     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionary.
2428                      Its human readable form is printed when OID is met
2429     :param big_blobs: if large binary objects are met (like OctetString
2430                       values), do we need to print them too, on separate
2431                       lines
2432     :param with_colours: colourize output, if ``termcolor`` library
2433                          is available
2434     :param with_decode_path: print decode path
2435     :param decode_path_only: print only that specified decode path
2436     """
2437     def _pprint_pps(pps):
2438         for pp in pps:
2439             if hasattr(pp, "_fields"):
2440                 if (
2441                         decode_path_only != () and
2442                         tuple(
2443                             str(p) for p in pp.decode_path[:len(decode_path_only)]
2444                         ) != decode_path_only
2445                 ):
2446                     continue
2447                 if big_blobs:
2448                     yield pp_console_row(
2449                         pp,
2450                         oid_maps=oid_maps,
2451                         with_offsets=True,
2452                         with_blob=False,
2453                         with_colours=with_colours,
2454                         with_decode_path=with_decode_path,
2455                         decode_path_len_decrease=len(decode_path_only),
2456                     )
2457                     for row in pp_console_blob(
2458                             pp,
2459                             decode_path_len_decrease=len(decode_path_only),
2460                     ):
2461                         yield row
2462                 else:
2463                     yield pp_console_row(
2464                         pp,
2465                         oid_maps=oid_maps,
2466                         with_offsets=True,
2467                         with_blob=True,
2468                         with_colours=with_colours,
2469                         with_decode_path=with_decode_path,
2470                         decode_path_len_decrease=len(decode_path_only),
2471                     )
2472             else:
2473                 for row in _pprint_pps(pp):
2474                     yield row
2475     return "\n".join(_pprint_pps(obj.pps(decode_path)))
2476
2477
2478 ########################################################################
2479 # ASN.1 primitive types
2480 ########################################################################
2481
2482 BooleanState = namedtuple(
2483     "BooleanState",
2484     BasicState._fields + ("value",),
2485     **NAMEDTUPLE_KWARGS
2486 )
2487
2488
2489 class Boolean(Obj):
2490     """``BOOLEAN`` boolean type
2491
2492     >>> b = Boolean(True)
2493     BOOLEAN True
2494     >>> b == Boolean(True)
2495     True
2496     >>> bool(b)
2497     True
2498     """
2499     __slots__ = ()
2500     tag_default = tag_encode(1)
2501     asn1_type_name = "BOOLEAN"
2502
2503     def __init__(
2504             self,
2505             value=None,
2506             impl=None,
2507             expl=None,
2508             default=None,
2509             optional=False,
2510             _decoded=(0, 0, 0),
2511     ):
2512         """
2513         :param value: set the value. Either boolean type, or
2514                       :py:class:`pyderasn.Boolean` object
2515         :param bytes impl: override default tag with ``IMPLICIT`` one
2516         :param bytes expl: override default tag with ``EXPLICIT`` one
2517         :param default: set default value. Type same as in ``value``
2518         :param bool optional: is object ``OPTIONAL`` in sequence
2519         """
2520         super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2521         self._value = None if value is None else self._value_sanitize(value)
2522         if default is not None:
2523             default = self._value_sanitize(default)
2524             self.default = self.__class__(
2525                 value=default,
2526                 impl=self.tag,
2527                 expl=self._expl,
2528             )
2529             if value is None:
2530                 self._value = default
2531
2532     def _value_sanitize(self, value):
2533         if value.__class__ == bool:
2534             return value
2535         if issubclass(value.__class__, Boolean):
2536             return value._value
2537         raise InvalidValueType((self.__class__, bool))
2538
2539     @property
2540     def ready(self):
2541         return self._value is not None
2542
2543     def __getstate__(self):
2544         return BooleanState(
2545             __version__,
2546             self.tag,
2547             self._tag_order,
2548             self._expl,
2549             self.default,
2550             self.optional,
2551             self.offset,
2552             self.llen,
2553             self.vlen,
2554             self.expl_lenindef,
2555             self.lenindef,
2556             self.ber_encoded,
2557             self._value,
2558         )
2559
2560     def __setstate__(self, state):
2561         super(Boolean, self).__setstate__(state)
2562         self._value = state.value
2563
2564     def __nonzero__(self):
2565         self._assert_ready()
2566         return self._value
2567
2568     def __bool__(self):
2569         self._assert_ready()
2570         return self._value
2571
2572     def __eq__(self, their):
2573         if their.__class__ == bool:
2574             return self._value == their
2575         if not issubclass(their.__class__, Boolean):
2576             return False
2577         return (
2578             self._value == their._value and
2579             self.tag == their.tag and
2580             self._expl == their._expl
2581         )
2582
2583     def __call__(
2584             self,
2585             value=None,
2586             impl=None,
2587             expl=None,
2588             default=None,
2589             optional=None,
2590     ):
2591         return self.__class__(
2592             value=value,
2593             impl=self.tag if impl is None else impl,
2594             expl=self._expl if expl is None else expl,
2595             default=self.default if default is None else default,
2596             optional=self.optional if optional is None else optional,
2597         )
2598
2599     def _encode(self):
2600         self._assert_ready()
2601         return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2602
2603     def _encode1st(self, state):
2604         return len(self.tag) + 2, state
2605
2606     def _encode2nd(self, writer, state_iter):
2607         self._assert_ready()
2608         write_full(writer, self._encode())
2609
2610     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2611         try:
2612             t, _, lv = tag_strip(tlv)
2613         except DecodeError as err:
2614             raise err.__class__(
2615                 msg=err.msg,
2616                 klass=self.__class__,
2617                 decode_path=decode_path,
2618                 offset=offset,
2619             )
2620         if t != self.tag:
2621             raise TagMismatch(
2622                 klass=self.__class__,
2623                 decode_path=decode_path,
2624                 offset=offset,
2625             )
2626         if tag_only:
2627             yield None
2628             return
2629         try:
2630             l, _, v = len_decode(lv)
2631         except DecodeError as err:
2632             raise err.__class__(
2633                 msg=err.msg,
2634                 klass=self.__class__,
2635                 decode_path=decode_path,
2636                 offset=offset,
2637             )
2638         if l != 1:
2639             raise InvalidLength(
2640                 "Boolean's length must be equal to 1",
2641                 klass=self.__class__,
2642                 decode_path=decode_path,
2643                 offset=offset,
2644             )
2645         if l > len(v):
2646             raise NotEnoughData(
2647                 "encoded length is longer than data",
2648                 klass=self.__class__,
2649                 decode_path=decode_path,
2650                 offset=offset,
2651             )
2652         first_octet = byte2int(v)
2653         ber_encoded = False
2654         if first_octet == 0:
2655             value = False
2656         elif first_octet == 0xFF:
2657             value = True
2658         elif ctx.get("bered", False):
2659             value = True
2660             ber_encoded = True
2661         else:
2662             raise DecodeError(
2663                 "unacceptable Boolean value",
2664                 klass=self.__class__,
2665                 decode_path=decode_path,
2666                 offset=offset,
2667             )
2668         obj = self.__class__(
2669             value=value,
2670             impl=self.tag,
2671             expl=self._expl,
2672             default=self.default,
2673             optional=self.optional,
2674             _decoded=(offset, 1, 1),
2675         )
2676         obj.ber_encoded = ber_encoded
2677         yield decode_path, obj, v[1:]
2678
2679     def __repr__(self):
2680         return pp_console_row(next(self.pps()))
2681
2682     def pps(self, decode_path=()):
2683         yield _pp(
2684             obj=self,
2685             asn1_type_name=self.asn1_type_name,
2686             obj_name=self.__class__.__name__,
2687             decode_path=decode_path,
2688             value=str(self._value) if self.ready else None,
2689             optional=self.optional,
2690             default=self == self.default,
2691             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2692             expl=None if self._expl is None else tag_decode(self._expl),
2693             offset=self.offset,
2694             tlen=self.tlen,
2695             llen=self.llen,
2696             vlen=self.vlen,
2697             expl_offset=self.expl_offset if self.expled else None,
2698             expl_tlen=self.expl_tlen if self.expled else None,
2699             expl_llen=self.expl_llen if self.expled else None,
2700             expl_vlen=self.expl_vlen if self.expled else None,
2701             expl_lenindef=self.expl_lenindef,
2702             ber_encoded=self.ber_encoded,
2703             bered=self.bered,
2704         )
2705         for pp in self.pps_lenindef(decode_path):
2706             yield pp
2707
2708
2709 IntegerState = namedtuple(
2710     "IntegerState",
2711     BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2712     **NAMEDTUPLE_KWARGS
2713 )
2714
2715
2716 class Integer(Obj):
2717     """``INTEGER`` integer type
2718
2719     >>> b = Integer(-123)
2720     INTEGER -123
2721     >>> b == Integer(-123)
2722     True
2723     >>> int(b)
2724     -123
2725
2726     >>> Integer(2, bounds=(1, 3))
2727     INTEGER 2
2728     >>> Integer(5, bounds=(1, 3))
2729     Traceback (most recent call last):
2730     pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2731
2732     ::
2733
2734         class Version(Integer):
2735             schema = (
2736                 ("v1", 0),
2737                 ("v2", 1),
2738                 ("v3", 2),
2739             )
2740
2741     >>> v = Version("v1")
2742     Version INTEGER v1
2743     >>> int(v)
2744     0
2745     >>> v.named
2746     'v1'
2747     >>> v.specs
2748     {'v3': 2, 'v1': 0, 'v2': 1}
2749     """
2750     __slots__ = ("specs", "_bound_min", "_bound_max")
2751     tag_default = tag_encode(2)
2752     asn1_type_name = "INTEGER"
2753
2754     def __init__(
2755             self,
2756             value=None,
2757             bounds=None,
2758             impl=None,
2759             expl=None,
2760             default=None,
2761             optional=False,
2762             _specs=None,
2763             _decoded=(0, 0, 0),
2764     ):
2765         """
2766         :param value: set the value. Either integer type, named value
2767                       (if ``schema`` is specified in the class), or
2768                       :py:class:`pyderasn.Integer` object
2769         :param bounds: set ``(MIN, MAX)`` value constraint.
2770                        (-inf, +inf) by default
2771         :param bytes impl: override default tag with ``IMPLICIT`` one
2772         :param bytes expl: override default tag with ``EXPLICIT`` one
2773         :param default: set default value. Type same as in ``value``
2774         :param bool optional: is object ``OPTIONAL`` in sequence
2775         """
2776         super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2777         self._value = value
2778         specs = getattr(self, "schema", {}) if _specs is None else _specs
2779         self.specs = specs if specs.__class__ == dict else dict(specs)
2780         self._bound_min, self._bound_max = getattr(
2781             self,
2782             "bounds",
2783             (float("-inf"), float("+inf")),
2784         ) if bounds is None else bounds
2785         if value is not None:
2786             self._value = self._value_sanitize(value)
2787         if default is not None:
2788             default = self._value_sanitize(default)
2789             self.default = self.__class__(
2790                 value=default,
2791                 impl=self.tag,
2792                 expl=self._expl,
2793                 _specs=self.specs,
2794             )
2795             if self._value is None:
2796                 self._value = default
2797
2798     def _value_sanitize(self, value):
2799         if isinstance(value, integer_types):
2800             pass
2801         elif issubclass(value.__class__, Integer):
2802             value = value._value
2803         elif value.__class__ == str:
2804             value = self.specs.get(value)
2805             if value is None:
2806                 raise ObjUnknown("integer value: %s" % value)
2807         else:
2808             raise InvalidValueType((self.__class__, int, str))
2809         if not self._bound_min <= value <= self._bound_max:
2810             raise BoundsError(self._bound_min, value, self._bound_max)
2811         return value
2812
2813     @property
2814     def ready(self):
2815         return self._value is not None
2816
2817     def __getstate__(self):
2818         return IntegerState(
2819             __version__,
2820             self.tag,
2821             self._tag_order,
2822             self._expl,
2823             self.default,
2824             self.optional,
2825             self.offset,
2826             self.llen,
2827             self.vlen,
2828             self.expl_lenindef,
2829             self.lenindef,
2830             self.ber_encoded,
2831             self.specs,
2832             self._value,
2833             self._bound_min,
2834             self._bound_max,
2835         )
2836
2837     def __setstate__(self, state):
2838         super(Integer, self).__setstate__(state)
2839         self.specs = state.specs
2840         self._value = state.value
2841         self._bound_min = state.bound_min
2842         self._bound_max = state.bound_max
2843
2844     def __int__(self):
2845         self._assert_ready()
2846         return int(self._value)
2847
2848     def __hash__(self):
2849         self._assert_ready()
2850         return hash(b"".join((
2851             self.tag,
2852             bytes(self._expl or b""),
2853             str(self._value).encode("ascii"),
2854         )))
2855
2856     def __eq__(self, their):
2857         if isinstance(their, integer_types):
2858             return self._value == their
2859         if not issubclass(their.__class__, Integer):
2860             return False
2861         return (
2862             self._value == their._value and
2863             self.tag == their.tag and
2864             self._expl == their._expl
2865         )
2866
2867     def __lt__(self, their):
2868         return self._value < their._value
2869
2870     @property
2871     def named(self):
2872         """Return named representation (if exists) of the value
2873         """
2874         for name, value in iteritems(self.specs):
2875             if value == self._value:
2876                 return name
2877         return None
2878
2879     def __call__(
2880             self,
2881             value=None,
2882             bounds=None,
2883             impl=None,
2884             expl=None,
2885             default=None,
2886             optional=None,
2887     ):
2888         return self.__class__(
2889             value=value,
2890             bounds=(
2891                 (self._bound_min, self._bound_max)
2892                 if bounds is None else bounds
2893             ),
2894             impl=self.tag if impl is None else impl,
2895             expl=self._expl if expl is None else expl,
2896             default=self.default if default is None else default,
2897             optional=self.optional if optional is None else optional,
2898             _specs=self.specs,
2899         )
2900
2901     def _encode_payload(self):
2902         self._assert_ready()
2903         value = self._value
2904         if PY2:
2905             if value == 0:
2906                 octets = bytearray([0])
2907             elif value < 0:
2908                 value = -value
2909                 value -= 1
2910                 octets = bytearray()
2911                 while value > 0:
2912                     octets.append((value & 0xFF) ^ 0xFF)
2913                     value >>= 8
2914                 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2915                     octets.append(0xFF)
2916             else:
2917                 octets = bytearray()
2918                 while value > 0:
2919                     octets.append(value & 0xFF)
2920                     value >>= 8
2921                 if octets[-1] & 0x80 > 0:
2922                     octets.append(0x00)
2923             octets.reverse()
2924             octets = bytes(octets)
2925         else:
2926             bytes_len = ceil(value.bit_length() / 8) or 1
2927             while True:
2928                 try:
2929                     octets = value.to_bytes(
2930                         bytes_len,
2931                         byteorder="big",
2932                         signed=True,
2933                     )
2934                 except OverflowError:
2935                     bytes_len += 1
2936                 else:
2937                     break
2938         return octets
2939         return b"".join((self.tag, len_encode(len(octets)), octets))
2940
2941     def _encode(self):
2942         octets = self._encode_payload()
2943         return b"".join((self.tag, len_encode(len(octets)), octets))
2944
2945     def _encode1st(self, state):
2946         l = len(self._encode_payload())
2947         return len(self.tag) + len_size(l) + l, state
2948
2949     def _encode2nd(self, writer, state_iter):
2950         write_full(writer, self._encode())
2951
2952     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2953         try:
2954             t, _, lv = tag_strip(tlv)
2955         except DecodeError as err:
2956             raise err.__class__(
2957                 msg=err.msg,
2958                 klass=self.__class__,
2959                 decode_path=decode_path,
2960                 offset=offset,
2961             )
2962         if t != self.tag:
2963             raise TagMismatch(
2964                 klass=self.__class__,
2965                 decode_path=decode_path,
2966                 offset=offset,
2967             )
2968         if tag_only:
2969             yield None
2970             return
2971         try:
2972             l, llen, v = len_decode(lv)
2973         except DecodeError as err:
2974             raise err.__class__(
2975                 msg=err.msg,
2976                 klass=self.__class__,
2977                 decode_path=decode_path,
2978                 offset=offset,
2979             )
2980         if l > len(v):
2981             raise NotEnoughData(
2982                 "encoded length is longer than data",
2983                 klass=self.__class__,
2984                 decode_path=decode_path,
2985                 offset=offset,
2986             )
2987         if l == 0:
2988             raise NotEnoughData(
2989                 "zero length",
2990                 klass=self.__class__,
2991                 decode_path=decode_path,
2992                 offset=offset,
2993             )
2994         v, tail = v[:l], v[l:]
2995         first_octet = byte2int(v)
2996         if l > 1:
2997             second_octet = byte2int(v[1:])
2998             if (
2999                     ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3000                     ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3001             ):
3002                 raise DecodeError(
3003                     "non normalized integer",
3004                     klass=self.__class__,
3005                     decode_path=decode_path,
3006                     offset=offset,
3007                 )
3008         if PY2:
3009             value = 0
3010             if first_octet & 0x80 > 0:
3011                 octets = bytearray()
3012                 for octet in bytearray(v):
3013                     octets.append(octet ^ 0xFF)
3014                 for octet in octets:
3015                     value = (value << 8) | octet
3016                 value += 1
3017                 value = -value
3018             else:
3019                 for octet in bytearray(v):
3020                     value = (value << 8) | octet
3021         else:
3022             value = int.from_bytes(v, byteorder="big", signed=True)
3023         try:
3024             obj = self.__class__(
3025                 value=value,
3026                 bounds=(self._bound_min, self._bound_max),
3027                 impl=self.tag,
3028                 expl=self._expl,
3029                 default=self.default,
3030                 optional=self.optional,
3031                 _specs=self.specs,
3032                 _decoded=(offset, llen, l),
3033             )
3034         except BoundsError as err:
3035             raise DecodeError(
3036                 msg=str(err),
3037                 klass=self.__class__,
3038                 decode_path=decode_path,
3039                 offset=offset,
3040             )
3041         yield decode_path, obj, tail
3042
3043     def __repr__(self):
3044         return pp_console_row(next(self.pps()))
3045
3046     def pps(self, decode_path=()):
3047         yield _pp(
3048             obj=self,
3049             asn1_type_name=self.asn1_type_name,
3050             obj_name=self.__class__.__name__,
3051             decode_path=decode_path,
3052             value=(self.named or str(self._value)) if self.ready else None,
3053             optional=self.optional,
3054             default=self == self.default,
3055             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3056             expl=None if self._expl is None else tag_decode(self._expl),
3057             offset=self.offset,
3058             tlen=self.tlen,
3059             llen=self.llen,
3060             vlen=self.vlen,
3061             expl_offset=self.expl_offset if self.expled else None,
3062             expl_tlen=self.expl_tlen if self.expled else None,
3063             expl_llen=self.expl_llen if self.expled else None,
3064             expl_vlen=self.expl_vlen if self.expled else None,
3065             expl_lenindef=self.expl_lenindef,
3066             bered=self.bered,
3067         )
3068         for pp in self.pps_lenindef(decode_path):
3069             yield pp
3070
3071
3072 BitStringState = namedtuple(
3073     "BitStringState",
3074     BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3075     **NAMEDTUPLE_KWARGS
3076 )
3077
3078
3079 class BitString(Obj):
3080     """``BIT STRING`` bit string type
3081
3082     >>> BitString(b"hello world")
3083     BIT STRING 88 bits 68656c6c6f20776f726c64
3084     >>> bytes(b)
3085     b'hello world'
3086     >>> b == b"hello world"
3087     True
3088     >>> b.bit_len
3089     88
3090
3091     >>> BitString("'0A3B5F291CD'H")
3092     BIT STRING 44 bits 0a3b5f291cd0
3093     >>> b = BitString("'010110000000'B")
3094     BIT STRING 12 bits 5800
3095     >>> b.bit_len
3096     12
3097     >>> b[0], b[1], b[2], b[3]
3098     (False, True, False, True)
3099     >>> b[1000]
3100     False
3101     >>> [v for v in b]
3102     [False, True, False, True, True, False, False, False, False, False, False, False]
3103
3104     ::
3105
3106         class KeyUsage(BitString):
3107             schema = (
3108                 ("digitalSignature", 0),
3109                 ("nonRepudiation", 1),
3110                 ("keyEncipherment", 2),
3111             )
3112
3113     >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3114     KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3115     >>> b.named
3116     ['nonRepudiation', 'keyEncipherment']
3117     >>> b.specs
3118     {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3119
3120     .. note::
3121
3122        Pay attention that BIT STRING can be encoded both in primitive
3123        and constructed forms. Decoder always checks constructed form tag
3124        additionally to specified primitive one. If BER decoding is
3125        :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3126        of DER restrictions.
3127     """
3128     __slots__ = ("tag_constructed", "specs", "defined")
3129     tag_default = tag_encode(3)
3130     asn1_type_name = "BIT STRING"
3131
3132     def __init__(
3133             self,
3134             value=None,
3135             impl=None,
3136             expl=None,
3137             default=None,
3138             optional=False,
3139             _specs=None,
3140             _decoded=(0, 0, 0),
3141     ):
3142         """
3143         :param value: set the value. Either binary type, tuple of named
3144                       values (if ``schema`` is specified in the class),
3145                       string in ``'XXX...'B`` form, or
3146                       :py:class:`pyderasn.BitString` object
3147         :param bytes impl: override default tag with ``IMPLICIT`` one
3148         :param bytes expl: override default tag with ``EXPLICIT`` one
3149         :param default: set default value. Type same as in ``value``
3150         :param bool optional: is object ``OPTIONAL`` in sequence
3151         """
3152         super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3153         specs = getattr(self, "schema", {}) if _specs is None else _specs
3154         self.specs = specs if specs.__class__ == dict else dict(specs)
3155         self._value = None if value is None else self._value_sanitize(value)
3156         if default is not None:
3157             default = self._value_sanitize(default)
3158             self.default = self.__class__(
3159                 value=default,
3160                 impl=self.tag,
3161                 expl=self._expl,
3162             )
3163             if value is None:
3164                 self._value = default
3165         self.defined = None
3166         tag_klass, _, tag_num = tag_decode(self.tag)
3167         self.tag_constructed = tag_encode(
3168             klass=tag_klass,
3169             form=TagFormConstructed,
3170             num=tag_num,
3171         )
3172
3173     def _bits2octets(self, bits):
3174         if len(self.specs) > 0:
3175             bits = bits.rstrip("0")
3176         bit_len = len(bits)
3177         bits += "0" * ((8 - (bit_len % 8)) % 8)
3178         octets = bytearray(len(bits) // 8)
3179         for i in six_xrange(len(octets)):
3180             octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3181         return bit_len, bytes(octets)
3182
3183     def _value_sanitize(self, value):
3184         if isinstance(value, (string_types, binary_type)):
3185             if (
3186                     isinstance(value, string_types) and
3187                     value.startswith("'")
3188             ):
3189                 if value.endswith("'B"):
3190                     value = value[1:-2]
3191                     if not frozenset(value) <= SET01:
3192                         raise ValueError("B's coding contains unacceptable chars")
3193                     return self._bits2octets(value)
3194                 if value.endswith("'H"):
3195                     value = value[1:-2]
3196                     return (
3197                         len(value) * 4,
3198                         hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3199                     )
3200             if value.__class__ == binary_type:
3201                 return (len(value) * 8, value)
3202             raise InvalidValueType((self.__class__, string_types, binary_type))
3203         if value.__class__ == tuple:
3204             if (
3205                     len(value) == 2 and
3206                     isinstance(value[0], integer_types) and
3207                     value[1].__class__ == binary_type
3208             ):
3209                 return value
3210             bits = []
3211             for name in value:
3212                 bit = self.specs.get(name)
3213                 if bit is None:
3214                     raise ObjUnknown("BitString value: %s" % name)
3215                 bits.append(bit)
3216             if len(bits) == 0:
3217                 return self._bits2octets("")
3218             bits = frozenset(bits)
3219             return self._bits2octets("".join(
3220                 ("1" if bit in bits else "0")
3221                 for bit in six_xrange(max(bits) + 1)
3222             ))
3223         if issubclass(value.__class__, BitString):
3224             return value._value
3225         raise InvalidValueType((self.__class__, binary_type, string_types))
3226
3227     @property
3228     def ready(self):
3229         return self._value is not None
3230
3231     def __getstate__(self):
3232         return BitStringState(
3233             __version__,
3234             self.tag,
3235             self._tag_order,
3236             self._expl,
3237             self.default,
3238             self.optional,
3239             self.offset,
3240             self.llen,
3241             self.vlen,
3242             self.expl_lenindef,
3243             self.lenindef,
3244             self.ber_encoded,
3245             self.specs,
3246             self._value,
3247             self.tag_constructed,
3248             self.defined,
3249         )
3250
3251     def __setstate__(self, state):
3252         super(BitString, self).__setstate__(state)
3253         self.specs = state.specs
3254         self._value = state.value
3255         self.tag_constructed = state.tag_constructed
3256         self.defined = state.defined
3257
3258     def __iter__(self):
3259         self._assert_ready()
3260         for i in six_xrange(self._value[0]):
3261             yield self[i]
3262
3263     @property
3264     def bit_len(self):
3265         """Returns number of bits in the string
3266         """
3267         self._assert_ready()
3268         return self._value[0]
3269
3270     def __bytes__(self):
3271         self._assert_ready()
3272         return self._value[1]
3273
3274     def __eq__(self, their):
3275         if their.__class__ == bytes:
3276             return self._value[1] == their
3277         if not issubclass(their.__class__, BitString):
3278             return False
3279         return (
3280             self._value == their._value and
3281             self.tag == their.tag and
3282             self._expl == their._expl
3283         )
3284
3285     @property
3286     def named(self):
3287         """Named representation (if exists) of the bits
3288
3289         :returns: [str(name), ...]
3290         """
3291         return [name for name, bit in iteritems(self.specs) if self[bit]]
3292
3293     def __call__(
3294             self,
3295             value=None,
3296             impl=None,
3297             expl=None,
3298             default=None,
3299             optional=None,
3300     ):
3301         return self.__class__(
3302             value=value,
3303             impl=self.tag if impl is None else impl,
3304             expl=self._expl if expl is None else expl,
3305             default=self.default if default is None else default,
3306             optional=self.optional if optional is None else optional,
3307             _specs=self.specs,
3308         )
3309
3310     def __getitem__(self, key):
3311         if key.__class__ == int:
3312             bit_len, octets = self._value
3313             if key >= bit_len:
3314                 return False
3315             return (
3316                 byte2int(memoryview(octets)[key // 8:]) >>
3317                 (7 - (key % 8))
3318             ) & 1 == 1
3319         if isinstance(key, string_types):
3320             value = self.specs.get(key)
3321             if value is None:
3322                 raise ObjUnknown("BitString value: %s" % key)
3323             return self[value]
3324         raise InvalidValueType((int, str))
3325
3326     def _encode(self):
3327         self._assert_ready()
3328         bit_len, octets = self._value
3329         return b"".join((
3330             self.tag,
3331             len_encode(len(octets) + 1),
3332             int2byte((8 - bit_len % 8) % 8),
3333             octets,
3334         ))
3335
3336     def _encode1st(self, state):
3337         self._assert_ready()
3338         _, octets = self._value
3339         l = len(octets) + 1
3340         return len(self.tag) + len_size(l) + l, state
3341
3342     def _encode2nd(self, writer, state_iter):
3343         bit_len, octets = self._value
3344         write_full(writer, b"".join((
3345             self.tag,
3346             len_encode(len(octets) + 1),
3347             int2byte((8 - bit_len % 8) % 8),
3348         )))
3349         write_full(writer, octets)
3350
3351     def _encode_cer(self, writer):
3352         bit_len, octets = self._value
3353         if len(octets) + 1 <= 1000:
3354             write_full(writer, self._encode())
3355             return
3356         write_full(writer, self.tag_constructed)
3357         write_full(writer, LENINDEF)
3358         for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3359             write_full(writer, b"".join((
3360                 BitString.tag_default,
3361                 LEN1K,
3362                 int2byte(0),
3363                 octets[offset:offset + 999],
3364             )))
3365         tail = octets[offset+999:]
3366         if len(tail) > 0:
3367             tail = int2byte((8 - bit_len % 8) % 8) + tail
3368             write_full(writer, b"".join((
3369                 BitString.tag_default,
3370                 len_encode(len(tail)),
3371                 tail,
3372             )))
3373         write_full(writer, EOC)
3374
3375     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3376         try:
3377             t, tlen, lv = tag_strip(tlv)
3378         except DecodeError as err:
3379             raise err.__class__(
3380                 msg=err.msg,
3381                 klass=self.__class__,
3382                 decode_path=decode_path,
3383                 offset=offset,
3384             )
3385         if t == self.tag:
3386             if tag_only:  # pragma: no cover
3387                 yield None
3388                 return
3389             try:
3390                 l, llen, v = len_decode(lv)
3391             except DecodeError as err:
3392                 raise err.__class__(
3393                     msg=err.msg,
3394                     klass=self.__class__,
3395                     decode_path=decode_path,
3396                     offset=offset,
3397                 )
3398             if l > len(v):
3399                 raise NotEnoughData(
3400                     "encoded length is longer than data",
3401                     klass=self.__class__,
3402                     decode_path=decode_path,
3403                     offset=offset,
3404                 )
3405             if l == 0:
3406                 raise NotEnoughData(
3407                     "zero length",
3408                     klass=self.__class__,
3409                     decode_path=decode_path,
3410                     offset=offset,
3411                 )
3412             pad_size = byte2int(v)
3413             if l == 1 and pad_size != 0:
3414                 raise DecodeError(
3415                     "invalid empty value",
3416                     klass=self.__class__,
3417                     decode_path=decode_path,
3418                     offset=offset,
3419                 )
3420             if pad_size > 7:
3421                 raise DecodeError(
3422                     "too big pad",
3423                     klass=self.__class__,
3424                     decode_path=decode_path,
3425                     offset=offset,
3426                 )
3427             if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3428                 raise DecodeError(
3429                     "invalid pad",
3430                     klass=self.__class__,
3431                     decode_path=decode_path,
3432                     offset=offset,
3433                 )
3434             v, tail = v[:l], v[l:]
3435             bit_len = (len(v) - 1) * 8 - pad_size
3436             obj = self.__class__(
3437                 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3438                 impl=self.tag,
3439                 expl=self._expl,
3440                 default=self.default,
3441                 optional=self.optional,
3442                 _specs=self.specs,
3443                 _decoded=(offset, llen, l),
3444             )
3445             if evgen_mode:
3446                 obj._value = (bit_len, None)
3447             yield decode_path, obj, tail
3448             return
3449         if t != self.tag_constructed:
3450             raise TagMismatch(
3451                 klass=self.__class__,
3452                 decode_path=decode_path,
3453                 offset=offset,
3454             )
3455         if not ctx.get("bered", False):
3456             raise DecodeError(
3457                 "unallowed BER constructed encoding",
3458                 klass=self.__class__,
3459                 decode_path=decode_path,
3460                 offset=offset,
3461             )
3462         if tag_only:  # pragma: no cover
3463             yield None
3464             return
3465         lenindef = False
3466         try:
3467             l, llen, v = len_decode(lv)
3468         except LenIndefForm:
3469             llen, l, v = 1, 0, lv[1:]
3470             lenindef = True
3471         except DecodeError as err:
3472             raise err.__class__(
3473                 msg=err.msg,
3474                 klass=self.__class__,
3475                 decode_path=decode_path,
3476                 offset=offset,
3477             )
3478         if l > len(v):
3479             raise NotEnoughData(
3480                 "encoded length is longer than data",
3481                 klass=self.__class__,
3482                 decode_path=decode_path,
3483                 offset=offset,
3484             )
3485         if not lenindef and l == 0:
3486             raise NotEnoughData(
3487                 "zero length",
3488                 klass=self.__class__,
3489                 decode_path=decode_path,
3490                 offset=offset,
3491             )
3492         chunks = []
3493         sub_offset = offset + tlen + llen
3494         vlen = 0
3495         while True:
3496             if lenindef:
3497                 if v[:EOC_LEN].tobytes() == EOC:
3498                     break
3499             else:
3500                 if vlen == l:
3501                     break
3502                 if vlen > l:
3503                     raise DecodeError(
3504                         "chunk out of bounds",
3505                         klass=self.__class__,
3506                         decode_path=decode_path + (str(len(chunks) - 1),),
3507                         offset=chunks[-1].offset,
3508                     )
3509             sub_decode_path = decode_path + (str(len(chunks)),)
3510             try:
3511                 if evgen_mode:
3512                     for _decode_path, chunk, v_tail in BitString().decode_evgen(
3513                             v,
3514                             offset=sub_offset,
3515                             decode_path=sub_decode_path,
3516                             leavemm=True,
3517                             ctx=ctx,
3518                             _ctx_immutable=False,
3519                     ):
3520                         yield _decode_path, chunk, v_tail
3521                 else:
3522                     _, chunk, v_tail = next(BitString().decode_evgen(
3523                         v,
3524                         offset=sub_offset,
3525                         decode_path=sub_decode_path,
3526                         leavemm=True,
3527                         ctx=ctx,
3528                         _ctx_immutable=False,
3529                         _evgen_mode=False,
3530                     ))
3531             except TagMismatch:
3532                 raise DecodeError(
3533                     "expected BitString encoded chunk",
3534                     klass=self.__class__,
3535                     decode_path=sub_decode_path,
3536                     offset=sub_offset,
3537                 )
3538             chunks.append(chunk)
3539             sub_offset += chunk.tlvlen
3540             vlen += chunk.tlvlen
3541             v = v_tail
3542         if len(chunks) == 0:
3543             raise DecodeError(
3544                 "no chunks",
3545                 klass=self.__class__,
3546                 decode_path=decode_path,
3547                 offset=offset,
3548             )
3549         values = []
3550         bit_len = 0
3551         for chunk_i, chunk in enumerate(chunks[:-1]):
3552             if chunk.bit_len % 8 != 0:
3553                 raise DecodeError(
3554                     "BitString chunk is not multiple of 8 bits",
3555                     klass=self.__class__,
3556                     decode_path=decode_path + (str(chunk_i),),
3557                     offset=chunk.offset,
3558                 )
3559             if not evgen_mode:
3560                 values.append(bytes(chunk))
3561             bit_len += chunk.bit_len
3562         chunk_last = chunks[-1]
3563         if not evgen_mode:
3564             values.append(bytes(chunk_last))
3565         bit_len += chunk_last.bit_len
3566         obj = self.__class__(
3567             value=None if evgen_mode else (bit_len, b"".join(values)),
3568             impl=self.tag,
3569             expl=self._expl,
3570             default=self.default,
3571             optional=self.optional,
3572             _specs=self.specs,
3573             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3574         )
3575         if evgen_mode:
3576             obj._value = (bit_len, None)
3577         obj.lenindef = lenindef
3578         obj.ber_encoded = True
3579         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3580
3581     def __repr__(self):
3582         return pp_console_row(next(self.pps()))
3583
3584     def pps(self, decode_path=()):
3585         value = None
3586         blob = None
3587         if self.ready:
3588             bit_len, blob = self._value
3589             value = "%d bits" % bit_len
3590             if len(self.specs) > 0 and blob is not None:
3591                 blob = tuple(self.named)
3592         yield _pp(
3593             obj=self,
3594             asn1_type_name=self.asn1_type_name,
3595             obj_name=self.__class__.__name__,
3596             decode_path=decode_path,
3597             value=value,
3598             blob=blob,
3599             optional=self.optional,
3600             default=self == self.default,
3601             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3602             expl=None if self._expl is None else tag_decode(self._expl),
3603             offset=self.offset,
3604             tlen=self.tlen,
3605             llen=self.llen,
3606             vlen=self.vlen,
3607             expl_offset=self.expl_offset if self.expled else None,
3608             expl_tlen=self.expl_tlen if self.expled else None,
3609             expl_llen=self.expl_llen if self.expled else None,
3610             expl_vlen=self.expl_vlen if self.expled else None,
3611             expl_lenindef=self.expl_lenindef,
3612             lenindef=self.lenindef,
3613             ber_encoded=self.ber_encoded,
3614             bered=self.bered,
3615         )
3616         defined_by, defined = self.defined or (None, None)
3617         if defined_by is not None:
3618             yield defined.pps(
3619                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3620             )
3621         for pp in self.pps_lenindef(decode_path):
3622             yield pp
3623
3624
3625 OctetStringState = namedtuple(
3626     "OctetStringState",
3627     BasicState._fields + (
3628         "value",
3629         "bound_min",
3630         "bound_max",
3631         "tag_constructed",
3632         "defined",
3633     ),
3634     **NAMEDTUPLE_KWARGS
3635 )
3636
3637
3638 class OctetString(Obj):
3639     """``OCTET STRING`` binary string type
3640
3641     >>> s = OctetString(b"hello world")
3642     OCTET STRING 11 bytes 68656c6c6f20776f726c64
3643     >>> s == OctetString(b"hello world")
3644     True
3645     >>> bytes(s)
3646     b'hello world'
3647
3648     >>> OctetString(b"hello", bounds=(4, 4))
3649     Traceback (most recent call last):
3650     pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3651     >>> OctetString(b"hell", bounds=(4, 4))
3652     OCTET STRING 4 bytes 68656c6c
3653
3654     Memoryviews can be used as a values. If memoryview is made on
3655     mmap-ed file, then it does not take storage inside OctetString
3656     itself. In CER encoding mode it will be streamed to the specified
3657     writer, copying 1 KB chunks.
3658     """
3659     __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3660     tag_default = tag_encode(4)
3661     asn1_type_name = "OCTET STRING"
3662     evgen_mode_skip_value = True
3663
3664     def __init__(
3665             self,
3666             value=None,
3667             bounds=None,
3668             impl=None,
3669             expl=None,
3670             default=None,
3671             optional=False,
3672             _decoded=(0, 0, 0),
3673             ctx=None,
3674     ):
3675         """
3676         :param value: set the value. Either binary type, or
3677                       :py:class:`pyderasn.OctetString` object
3678         :param bounds: set ``(MIN, MAX)`` value size constraint.
3679                        (-inf, +inf) by default
3680         :param bytes impl: override default tag with ``IMPLICIT`` one
3681         :param bytes expl: override default tag with ``EXPLICIT`` one
3682         :param default: set default value. Type same as in ``value``
3683         :param bool optional: is object ``OPTIONAL`` in sequence
3684         """
3685         super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3686         self._value = value
3687         self._bound_min, self._bound_max = getattr(
3688             self,
3689             "bounds",
3690             (0, float("+inf")),
3691         ) if bounds is None else bounds
3692         if value is not None:
3693             self._value = self._value_sanitize(value)
3694         if default is not None:
3695             default = self._value_sanitize(default)
3696             self.default = self.__class__(
3697                 value=default,
3698                 impl=self.tag,
3699                 expl=self._expl,
3700             )
3701             if self._value is None:
3702                 self._value = default
3703         self.defined = None
3704         tag_klass, _, tag_num = tag_decode(self.tag)
3705         self.tag_constructed = tag_encode(
3706             klass=tag_klass,
3707             form=TagFormConstructed,
3708             num=tag_num,
3709         )
3710
3711     def _value_sanitize(self, value):
3712         if value.__class__ == binary_type or value.__class__ == memoryview:
3713             pass
3714         elif issubclass(value.__class__, OctetString):
3715             value = value._value
3716         else:
3717             raise InvalidValueType((self.__class__, bytes, memoryview))
3718         if not self._bound_min <= len(value) <= self._bound_max:
3719             raise BoundsError(self._bound_min, len(value), self._bound_max)
3720         return value
3721
3722     @property
3723     def ready(self):
3724         return self._value is not None
3725
3726     def __getstate__(self):
3727         return OctetStringState(
3728             __version__,
3729             self.tag,
3730             self._tag_order,
3731             self._expl,
3732             self.default,
3733             self.optional,
3734             self.offset,
3735             self.llen,
3736             self.vlen,
3737             self.expl_lenindef,
3738             self.lenindef,
3739             self.ber_encoded,
3740             self._value,
3741             self._bound_min,
3742             self._bound_max,
3743             self.tag_constructed,
3744             self.defined,
3745         )
3746
3747     def __setstate__(self, state):
3748         super(OctetString, self).__setstate__(state)
3749         self._value = state.value
3750         self._bound_min = state.bound_min
3751         self._bound_max = state.bound_max
3752         self.tag_constructed = state.tag_constructed
3753         self.defined = state.defined
3754
3755     def __bytes__(self):
3756         self._assert_ready()
3757         return bytes(self._value)
3758
3759     def __eq__(self, their):
3760         if their.__class__ == binary_type:
3761             return self._value == their
3762         if not issubclass(their.__class__, OctetString):
3763             return False
3764         return (
3765             self._value == their._value and
3766             self.tag == their.tag and
3767             self._expl == their._expl
3768         )
3769
3770     def __lt__(self, their):
3771         return self._value < their._value
3772
3773     def __call__(
3774             self,
3775             value=None,
3776             bounds=None,
3777             impl=None,
3778             expl=None,
3779             default=None,
3780             optional=None,
3781     ):
3782         return self.__class__(
3783             value=value,
3784             bounds=(
3785                 (self._bound_min, self._bound_max)
3786                 if bounds is None else bounds
3787             ),
3788             impl=self.tag if impl is None else impl,
3789             expl=self._expl if expl is None else expl,
3790             default=self.default if default is None else default,
3791             optional=self.optional if optional is None else optional,
3792         )
3793
3794     def _encode(self):
3795         self._assert_ready()
3796         return b"".join((
3797             self.tag,
3798             len_encode(len(self._value)),
3799             self._value,
3800         ))
3801
3802     def _encode1st(self, state):
3803         self._assert_ready()
3804         l = len(self._value)
3805         return len(self.tag) + len_size(l) + l, state
3806
3807     def _encode2nd(self, writer, state_iter):
3808         value = self._value
3809         write_full(writer, self.tag + len_encode(len(value)))
3810         write_full(writer, value)
3811
3812     def _encode_cer(self, writer):
3813         octets = self._value
3814         if len(octets) <= 1000:
3815             write_full(writer, self._encode())
3816             return
3817         write_full(writer, self.tag_constructed)
3818         write_full(writer, LENINDEF)
3819         for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3820             write_full(writer, b"".join((
3821                 OctetString.tag_default,
3822                 LEN1K,
3823                 octets[offset:offset + 1000],
3824             )))
3825         tail = octets[offset+1000:]
3826         if len(tail) > 0:
3827             write_full(writer, b"".join((
3828                 OctetString.tag_default,
3829                 len_encode(len(tail)),
3830                 tail,
3831             )))
3832         write_full(writer, EOC)
3833
3834     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3835         try:
3836             t, tlen, lv = tag_strip(tlv)
3837         except DecodeError as err:
3838             raise err.__class__(
3839                 msg=err.msg,
3840                 klass=self.__class__,
3841                 decode_path=decode_path,
3842                 offset=offset,
3843             )
3844         if t == self.tag:
3845             if tag_only:
3846                 yield None
3847                 return
3848             try:
3849                 l, llen, v = len_decode(lv)
3850             except DecodeError as err:
3851                 raise err.__class__(
3852                     msg=err.msg,
3853                     klass=self.__class__,
3854                     decode_path=decode_path,
3855                     offset=offset,
3856                 )
3857             if l > len(v):
3858                 raise NotEnoughData(
3859                     "encoded length is longer than data",
3860                     klass=self.__class__,
3861                     decode_path=decode_path,
3862                     offset=offset,
3863                 )
3864             v, tail = v[:l], v[l:]
3865             if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3866                 raise DecodeError(
3867                     msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3868                     klass=self.__class__,
3869                     decode_path=decode_path,
3870                     offset=offset,
3871                 )
3872             try:
3873                 obj = self.__class__(
3874                     value=(
3875                         None if (evgen_mode and self.evgen_mode_skip_value)
3876                         else v.tobytes()
3877                     ),
3878                     bounds=(self._bound_min, self._bound_max),
3879                     impl=self.tag,
3880                     expl=self._expl,
3881                     default=self.default,
3882                     optional=self.optional,
3883                     _decoded=(offset, llen, l),
3884                     ctx=ctx,
3885                 )
3886             except DecodeError as err:
3887                 raise DecodeError(
3888                     msg=err.msg,
3889                     klass=self.__class__,
3890                     decode_path=decode_path,
3891                     offset=offset,
3892                 )
3893             except BoundsError as err:
3894                 raise DecodeError(
3895                     msg=str(err),
3896                     klass=self.__class__,
3897                     decode_path=decode_path,
3898                     offset=offset,
3899                 )
3900             yield decode_path, obj, tail
3901             return
3902         if t != self.tag_constructed:
3903             raise TagMismatch(
3904                 klass=self.__class__,
3905                 decode_path=decode_path,
3906                 offset=offset,
3907             )
3908         if not ctx.get("bered", False):
3909             raise DecodeError(
3910                 "unallowed BER constructed encoding",
3911                 klass=self.__class__,
3912                 decode_path=decode_path,
3913                 offset=offset,
3914             )
3915         if tag_only:
3916             yield None
3917             return
3918         lenindef = False
3919         try:
3920             l, llen, v = len_decode(lv)
3921         except LenIndefForm:
3922             llen, l, v = 1, 0, lv[1:]
3923             lenindef = True
3924         except DecodeError as err:
3925             raise err.__class__(
3926                 msg=err.msg,
3927                 klass=self.__class__,
3928                 decode_path=decode_path,
3929                 offset=offset,
3930             )
3931         if l > len(v):
3932             raise NotEnoughData(
3933                 "encoded length is longer than data",
3934                 klass=self.__class__,
3935                 decode_path=decode_path,
3936                 offset=offset,
3937             )
3938         chunks = []
3939         chunks_count = 0
3940         sub_offset = offset + tlen + llen
3941         vlen = 0
3942         payload_len = 0
3943         while True:
3944             if lenindef:
3945                 if v[:EOC_LEN].tobytes() == EOC:
3946                     break
3947             else:
3948                 if vlen == l:
3949                     break
3950                 if vlen > l:
3951                     raise DecodeError(
3952                         "chunk out of bounds",
3953                         klass=self.__class__,
3954                         decode_path=decode_path + (str(len(chunks) - 1),),
3955                         offset=chunks[-1].offset,
3956                     )
3957             try:
3958                 if evgen_mode:
3959                     sub_decode_path = decode_path + (str(chunks_count),)
3960                     for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3961                             v,
3962                             offset=sub_offset,
3963                             decode_path=sub_decode_path,
3964                             leavemm=True,
3965                             ctx=ctx,
3966                             _ctx_immutable=False,
3967                     ):
3968                         yield _decode_path, chunk, v_tail
3969                         if not chunk.ber_encoded:
3970                             payload_len += chunk.vlen
3971                     chunks_count += 1
3972                 else:
3973                     sub_decode_path = decode_path + (str(len(chunks)),)
3974                     _, chunk, v_tail = next(OctetString().decode_evgen(
3975                         v,
3976                         offset=sub_offset,
3977                         decode_path=sub_decode_path,
3978                         leavemm=True,
3979                         ctx=ctx,
3980                         _ctx_immutable=False,
3981                         _evgen_mode=False,
3982                     ))
3983                     chunks.append(chunk)
3984             except TagMismatch:
3985                 raise DecodeError(
3986                     "expected OctetString encoded chunk",
3987                     klass=self.__class__,
3988                     decode_path=sub_decode_path,
3989                     offset=sub_offset,
3990                 )
3991             sub_offset += chunk.tlvlen
3992             vlen += chunk.tlvlen
3993             v = v_tail
3994         if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
3995             raise DecodeError(
3996                 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
3997                 klass=self.__class__,
3998                 decode_path=decode_path,
3999                 offset=offset,
4000             )
4001         try:
4002             obj = self.__class__(
4003                 value=(
4004                     None if evgen_mode else
4005                     b"".join(bytes(chunk) for chunk in chunks)
4006                 ),
4007                 bounds=(self._bound_min, self._bound_max),
4008                 impl=self.tag,
4009                 expl=self._expl,
4010                 default=self.default,
4011                 optional=self.optional,
4012                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4013                 ctx=ctx,
4014             )
4015         except DecodeError as err:
4016             raise DecodeError(
4017                 msg=err.msg,
4018                 klass=self.__class__,
4019                 decode_path=decode_path,
4020                 offset=offset,
4021             )
4022         except BoundsError as err:
4023             raise DecodeError(
4024                 msg=str(err),
4025                 klass=self.__class__,
4026                 decode_path=decode_path,
4027                 offset=offset,
4028             )
4029         obj.lenindef = lenindef
4030         obj.ber_encoded = True
4031         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4032
4033     def __repr__(self):
4034         return pp_console_row(next(self.pps()))
4035
4036     def pps(self, decode_path=()):
4037         yield _pp(
4038             obj=self,
4039             asn1_type_name=self.asn1_type_name,
4040             obj_name=self.__class__.__name__,
4041             decode_path=decode_path,
4042             value=("%d bytes" % len(self._value)) if self.ready else None,
4043             blob=self._value if self.ready else None,
4044             optional=self.optional,
4045             default=self == self.default,
4046             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4047             expl=None if self._expl is None else tag_decode(self._expl),
4048             offset=self.offset,
4049             tlen=self.tlen,
4050             llen=self.llen,
4051             vlen=self.vlen,
4052             expl_offset=self.expl_offset if self.expled else None,
4053             expl_tlen=self.expl_tlen if self.expled else None,
4054             expl_llen=self.expl_llen if self.expled else None,
4055             expl_vlen=self.expl_vlen if self.expled else None,
4056             expl_lenindef=self.expl_lenindef,
4057             lenindef=self.lenindef,
4058             ber_encoded=self.ber_encoded,
4059             bered=self.bered,
4060         )
4061         defined_by, defined = self.defined or (None, None)
4062         if defined_by is not None:
4063             yield defined.pps(
4064                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4065             )
4066         for pp in self.pps_lenindef(decode_path):
4067             yield pp
4068
4069
4070 def agg_octet_string(evgens, decode_path, raw, writer):
4071     """Aggregate constructed string (OctetString and its derivatives)
4072
4073     :param evgens: iterator of generated events
4074     :param decode_path: points to the string we want to decode
4075     :param raw: slicebable (memoryview, bytearray, etc) with
4076                 the data evgens are generated on
4077     :param writer: buffer.write where string is going to be saved
4078     :param writer: where string is going to be saved. Must comply
4079                    with ``io.RawIOBase.write`` behaviour
4080
4081     .. seealso:: :ref:`agg_octet_string`
4082     """
4083     decode_path_len = len(decode_path)
4084     for dp, obj, _ in evgens:
4085         if dp[:decode_path_len] != decode_path:
4086             continue
4087         if not obj.ber_encoded:
4088             write_full(writer, raw[
4089                 obj.offset + obj.tlen + obj.llen:
4090                 obj.offset + obj.tlen + obj.llen + obj.vlen -
4091                 (EOC_LEN if obj.expl_lenindef else 0)
4092             ])
4093         if len(dp) == decode_path_len:
4094             break
4095
4096
4097 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4098
4099
4100 class Null(Obj):
4101     """``NULL`` null object
4102
4103     >>> n = Null()
4104     NULL
4105     >>> n.ready
4106     True
4107     """
4108     __slots__ = ()
4109     tag_default = tag_encode(5)
4110     asn1_type_name = "NULL"
4111
4112     def __init__(
4113             self,
4114             value=None,  # unused, but Sequence passes it
4115             impl=None,
4116             expl=None,
4117             optional=False,
4118             _decoded=(0, 0, 0),
4119     ):
4120         """
4121         :param bytes impl: override default tag with ``IMPLICIT`` one
4122         :param bytes expl: override default tag with ``EXPLICIT`` one
4123         :param bool optional: is object ``OPTIONAL`` in sequence
4124         """
4125         super(Null, self).__init__(impl, expl, None, optional, _decoded)
4126         self.default = None
4127
4128     @property
4129     def ready(self):
4130         return True
4131
4132     def __getstate__(self):
4133         return NullState(
4134             __version__,
4135             self.tag,
4136             self._tag_order,
4137             self._expl,
4138             self.default,
4139             self.optional,
4140             self.offset,
4141             self.llen,
4142             self.vlen,
4143             self.expl_lenindef,
4144             self.lenindef,
4145             self.ber_encoded,
4146         )
4147
4148     def __eq__(self, their):
4149         if not issubclass(their.__class__, Null):
4150             return False
4151         return (
4152             self.tag == their.tag and
4153             self._expl == their._expl
4154         )
4155
4156     def __call__(
4157             self,
4158             value=None,
4159             impl=None,
4160             expl=None,
4161             optional=None,
4162     ):
4163         return self.__class__(
4164             impl=self.tag if impl is None else impl,
4165             expl=self._expl if expl is None else expl,
4166             optional=self.optional if optional is None else optional,
4167         )
4168
4169     def _encode(self):
4170         return self.tag + LEN0
4171
4172     def _encode1st(self, state):
4173         return len(self.tag) + 1, state
4174
4175     def _encode2nd(self, writer, state_iter):
4176         write_full(writer, self.tag + LEN0)
4177
4178     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4179         try:
4180             t, _, lv = tag_strip(tlv)
4181         except DecodeError as err:
4182             raise err.__class__(
4183                 msg=err.msg,
4184                 klass=self.__class__,
4185                 decode_path=decode_path,
4186                 offset=offset,
4187             )
4188         if t != self.tag:
4189             raise TagMismatch(
4190                 klass=self.__class__,
4191                 decode_path=decode_path,
4192                 offset=offset,
4193             )
4194         if tag_only:  # pragma: no cover
4195             yield None
4196             return
4197         try:
4198             l, _, v = len_decode(lv)
4199         except DecodeError as err:
4200             raise err.__class__(
4201                 msg=err.msg,
4202                 klass=self.__class__,
4203                 decode_path=decode_path,
4204                 offset=offset,
4205             )
4206         if l != 0:
4207             raise InvalidLength(
4208                 "Null must have zero length",
4209                 klass=self.__class__,
4210                 decode_path=decode_path,
4211                 offset=offset,
4212             )
4213         obj = self.__class__(
4214             impl=self.tag,
4215             expl=self._expl,
4216             optional=self.optional,
4217             _decoded=(offset, 1, 0),
4218         )
4219         yield decode_path, obj, v
4220
4221     def __repr__(self):
4222         return pp_console_row(next(self.pps()))
4223
4224     def pps(self, decode_path=()):
4225         yield _pp(
4226             obj=self,
4227             asn1_type_name=self.asn1_type_name,
4228             obj_name=self.__class__.__name__,
4229             decode_path=decode_path,
4230             optional=self.optional,
4231             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4232             expl=None if self._expl is None else tag_decode(self._expl),
4233             offset=self.offset,
4234             tlen=self.tlen,
4235             llen=self.llen,
4236             vlen=self.vlen,
4237             expl_offset=self.expl_offset if self.expled else None,
4238             expl_tlen=self.expl_tlen if self.expled else None,
4239             expl_llen=self.expl_llen if self.expled else None,
4240             expl_vlen=self.expl_vlen if self.expled else None,
4241             expl_lenindef=self.expl_lenindef,
4242             bered=self.bered,
4243         )
4244         for pp in self.pps_lenindef(decode_path):
4245             yield pp
4246
4247
4248 ObjectIdentifierState = namedtuple(
4249     "ObjectIdentifierState",
4250     BasicState._fields + ("value", "defines"),
4251     **NAMEDTUPLE_KWARGS
4252 )
4253
4254
4255 class ObjectIdentifier(Obj):
4256     """``OBJECT IDENTIFIER`` OID type
4257
4258     >>> oid = ObjectIdentifier((1, 2, 3))
4259     OBJECT IDENTIFIER 1.2.3
4260     >>> oid == ObjectIdentifier("1.2.3")
4261     True
4262     >>> tuple(oid)
4263     (1, 2, 3)
4264     >>> str(oid)
4265     '1.2.3'
4266     >>> oid + (4, 5) + ObjectIdentifier("1.7")
4267     OBJECT IDENTIFIER 1.2.3.4.5.1.7
4268
4269     >>> str(ObjectIdentifier((3, 1)))
4270     Traceback (most recent call last):
4271     pyderasn.InvalidOID: unacceptable first arc value
4272     """
4273     __slots__ = ("defines",)
4274     tag_default = tag_encode(6)
4275     asn1_type_name = "OBJECT IDENTIFIER"
4276
4277     def __init__(
4278             self,
4279             value=None,
4280             defines=(),
4281             impl=None,
4282             expl=None,
4283             default=None,
4284             optional=False,
4285             _decoded=(0, 0, 0),
4286     ):
4287         """
4288         :param value: set the value. Either tuples of integers,
4289                       string of "."-concatenated integers, or
4290                       :py:class:`pyderasn.ObjectIdentifier` object
4291         :param defines: sequence of tuples. Each tuple has two elements.
4292                         First one is relative to current one decode
4293                         path, aiming to the field defined by that OID.
4294                         Read about relative path in
4295                         :py:func:`pyderasn.abs_decode_path`. Second
4296                         tuple element is ``{OID: pyderasn.Obj()}``
4297                         dictionary, mapping between current OID value
4298                         and structure applied to defined field.
4299
4300                         .. seealso:: :ref:`definedby`
4301
4302         :param bytes impl: override default tag with ``IMPLICIT`` one
4303         :param bytes expl: override default tag with ``EXPLICIT`` one
4304         :param default: set default value. Type same as in ``value``
4305         :param bool optional: is object ``OPTIONAL`` in sequence
4306         """
4307         super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4308         self._value = value
4309         if value is not None:
4310             self._value = self._value_sanitize(value)
4311         if default is not None:
4312             default = self._value_sanitize(default)
4313             self.default = self.__class__(
4314                 value=default,
4315                 impl=self.tag,
4316                 expl=self._expl,
4317             )
4318             if self._value is None:
4319                 self._value = default
4320         self.defines = defines
4321
4322     def __add__(self, their):
4323         if their.__class__ == tuple:
4324             return self.__class__(self._value + array("L", their))
4325         if isinstance(their, self.__class__):
4326             return self.__class__(self._value + their._value)
4327         raise InvalidValueType((self.__class__, tuple))
4328
4329     def _value_sanitize(self, value):
4330         if issubclass(value.__class__, ObjectIdentifier):
4331             return value._value
4332         if isinstance(value, string_types):
4333             try:
4334                 value = array("L", (pureint(arc) for arc in value.split(".")))
4335             except ValueError:
4336                 raise InvalidOID("unacceptable arcs values")
4337         if value.__class__ == tuple:
4338             try:
4339                 value = array("L", value)
4340             except OverflowError as err:
4341                 raise InvalidOID(repr(err))
4342         if value.__class__ is array:
4343             if len(value) < 2:
4344                 raise InvalidOID("less than 2 arcs")
4345             first_arc = value[0]
4346             if first_arc in (0, 1):
4347                 if not (0 <= value[1] <= 39):
4348                     raise InvalidOID("second arc is too wide")
4349             elif first_arc == 2:
4350                 pass
4351             else:
4352                 raise InvalidOID("unacceptable first arc value")
4353             if not all(arc >= 0 for arc in value):
4354                 raise InvalidOID("negative arc value")
4355             return value
4356         raise InvalidValueType((self.__class__, str, tuple))
4357
4358     @property
4359     def ready(self):
4360         return self._value is not None
4361
4362     def __getstate__(self):
4363         return ObjectIdentifierState(
4364             __version__,
4365             self.tag,
4366             self._tag_order,
4367             self._expl,
4368             self.default,
4369             self.optional,
4370             self.offset,
4371             self.llen,
4372             self.vlen,
4373             self.expl_lenindef,
4374             self.lenindef,
4375             self.ber_encoded,
4376             self._value,
4377             self.defines,
4378         )
4379
4380     def __setstate__(self, state):
4381         super(ObjectIdentifier, self).__setstate__(state)
4382         self._value = state.value
4383         self.defines = state.defines
4384
4385     def __iter__(self):
4386         self._assert_ready()
4387         return iter(self._value)
4388
4389     def __str__(self):
4390         return ".".join(str(arc) for arc in self._value or ())
4391
4392     def __hash__(self):
4393         self._assert_ready()
4394         return hash(b"".join((
4395             self.tag,
4396             bytes(self._expl or b""),
4397             str(self._value).encode("ascii"),
4398         )))
4399
4400     def __eq__(self, their):
4401         if their.__class__ == tuple:
4402             return self._value == array("L", their)
4403         if not issubclass(their.__class__, ObjectIdentifier):
4404             return False
4405         return (
4406             self.tag == their.tag and
4407             self._expl == their._expl and
4408             self._value == their._value
4409         )
4410
4411     def __lt__(self, their):
4412         return self._value < their._value
4413
4414     def __call__(
4415             self,
4416             value=None,
4417             defines=None,
4418             impl=None,
4419             expl=None,
4420             default=None,
4421             optional=None,
4422     ):
4423         return self.__class__(
4424             value=value,
4425             defines=self.defines if defines is None else defines,
4426             impl=self.tag if impl is None else impl,
4427             expl=self._expl if expl is None else expl,
4428             default=self.default if default is None else default,
4429             optional=self.optional if optional is None else optional,
4430         )
4431
4432     def _encode_octets(self):
4433         self._assert_ready()
4434         value = self._value
4435         first_value = value[1]
4436         first_arc = value[0]
4437         if first_arc == 0:
4438             pass
4439         elif first_arc == 1:
4440             first_value += 40
4441         elif first_arc == 2:
4442             first_value += 80
4443         else:  # pragma: no cover
4444             raise RuntimeError("invalid arc is stored")
4445         octets = [zero_ended_encode(first_value)]
4446         for arc in value[2:]:
4447             octets.append(zero_ended_encode(arc))
4448         return b"".join(octets)
4449
4450     def _encode(self):
4451         v = self._encode_octets()
4452         return b"".join((self.tag, len_encode(len(v)), v))
4453
4454     def _encode1st(self, state):
4455         l = len(self._encode_octets())
4456         return len(self.tag) + len_size(l) + l, state
4457
4458     def _encode2nd(self, writer, state_iter):
4459         write_full(writer, self._encode())
4460
4461     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4462         try:
4463             t, _, lv = tag_strip(tlv)
4464         except DecodeError as err:
4465             raise err.__class__(
4466                 msg=err.msg,
4467                 klass=self.__class__,
4468                 decode_path=decode_path,
4469                 offset=offset,
4470             )
4471         if t != self.tag:
4472             raise TagMismatch(
4473                 klass=self.__class__,
4474                 decode_path=decode_path,
4475                 offset=offset,
4476             )
4477         if tag_only:  # pragma: no cover
4478             yield None
4479             return
4480         try:
4481             l, llen, v = len_decode(lv)
4482         except DecodeError as err:
4483             raise err.__class__(
4484                 msg=err.msg,
4485                 klass=self.__class__,
4486                 decode_path=decode_path,
4487                 offset=offset,
4488             )
4489         if l > len(v):
4490             raise NotEnoughData(
4491                 "encoded length is longer than data",
4492                 klass=self.__class__,
4493                 decode_path=decode_path,
4494                 offset=offset,
4495             )
4496         if l == 0:
4497             raise NotEnoughData(
4498                 "zero length",
4499                 klass=self.__class__,
4500                 decode_path=decode_path,
4501                 offset=offset,
4502             )
4503         v, tail = v[:l], v[l:]
4504         arcs = array("L")
4505         ber_encoded = False
4506         while len(v) > 0:
4507             i = 0
4508             arc = 0
4509             while True:
4510                 octet = indexbytes(v, i)
4511                 if i == 0 and octet == 0x80:
4512                     if ctx.get("bered", False):
4513                         ber_encoded = True
4514                     else:
4515                         raise DecodeError(
4516                             "non normalized arc encoding",
4517                             klass=self.__class__,
4518                             decode_path=decode_path,
4519                             offset=offset,
4520                         )
4521                 arc = (arc << 7) | (octet & 0x7F)
4522                 if octet & 0x80 == 0:
4523                     try:
4524                         arcs.append(arc)
4525                     except OverflowError:
4526                         raise DecodeError(
4527                             "too huge value for local unsigned long",
4528                             klass=self.__class__,
4529                             decode_path=decode_path,
4530                             offset=offset,
4531                         )
4532                     v = v[i + 1:]
4533                     break
4534                 i += 1
4535                 if i == len(v):
4536                     raise DecodeError(
4537                         "unfinished OID",
4538                         klass=self.__class__,
4539                         decode_path=decode_path,
4540                         offset=offset,
4541                     )
4542         first_arc = 0
4543         second_arc = arcs[0]
4544         if 0 <= second_arc <= 39:
4545             first_arc = 0
4546         elif 40 <= second_arc <= 79:
4547             first_arc = 1
4548             second_arc -= 40
4549         else:
4550             first_arc = 2
4551             second_arc -= 80
4552         obj = self.__class__(
4553             value=array("L", (first_arc, second_arc)) + arcs[1:],
4554             impl=self.tag,
4555             expl=self._expl,
4556             default=self.default,
4557             optional=self.optional,
4558             _decoded=(offset, llen, l),
4559         )
4560         if ber_encoded:
4561             obj.ber_encoded = True
4562         yield decode_path, obj, tail
4563
4564     def __repr__(self):
4565         return pp_console_row(next(self.pps()))
4566
4567     def pps(self, decode_path=()):
4568         yield _pp(
4569             obj=self,
4570             asn1_type_name=self.asn1_type_name,
4571             obj_name=self.__class__.__name__,
4572             decode_path=decode_path,
4573             value=str(self) if self.ready else None,
4574             optional=self.optional,
4575             default=self == self.default,
4576             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4577             expl=None if self._expl is None else tag_decode(self._expl),
4578             offset=self.offset,
4579             tlen=self.tlen,
4580             llen=self.llen,
4581             vlen=self.vlen,
4582             expl_offset=self.expl_offset if self.expled else None,
4583             expl_tlen=self.expl_tlen if self.expled else None,
4584             expl_llen=self.expl_llen if self.expled else None,
4585             expl_vlen=self.expl_vlen if self.expled else None,
4586             expl_lenindef=self.expl_lenindef,
4587             ber_encoded=self.ber_encoded,
4588             bered=self.bered,
4589         )
4590         for pp in self.pps_lenindef(decode_path):
4591             yield pp
4592
4593
4594 class Enumerated(Integer):
4595     """``ENUMERATED`` integer type
4596
4597     This type is identical to :py:class:`pyderasn.Integer`, but requires
4598     schema to be specified and does not accept values missing from it.
4599     """
4600     __slots__ = ()
4601     tag_default = tag_encode(10)
4602     asn1_type_name = "ENUMERATED"
4603
4604     def __init__(
4605             self,
4606             value=None,
4607             impl=None,
4608             expl=None,
4609             default=None,
4610             optional=False,
4611             _specs=None,
4612             _decoded=(0, 0, 0),
4613             bounds=None,  # dummy argument, workability for Integer.decode
4614     ):
4615         super(Enumerated, self).__init__(
4616             value, bounds, impl, expl, default, optional, _specs, _decoded,
4617         )
4618         if len(self.specs) == 0:
4619             raise ValueError("schema must be specified")
4620
4621     def _value_sanitize(self, value):
4622         if isinstance(value, self.__class__):
4623             value = value._value
4624         elif isinstance(value, integer_types):
4625             for _value in itervalues(self.specs):
4626                 if _value == value:
4627                     break
4628             else:
4629                 raise DecodeError(
4630                     "unknown integer value: %s" % value,
4631                     klass=self.__class__,
4632                 )
4633         elif isinstance(value, string_types):
4634             value = self.specs.get(value)
4635             if value is None:
4636                 raise ObjUnknown("integer value: %s" % value)
4637         else:
4638             raise InvalidValueType((self.__class__, int, str))
4639         return value
4640
4641     def __call__(
4642             self,
4643             value=None,
4644             impl=None,
4645             expl=None,
4646             default=None,
4647             optional=None,
4648             _specs=None,
4649     ):
4650         return self.__class__(
4651             value=value,
4652             impl=self.tag if impl is None else impl,
4653             expl=self._expl if expl is None else expl,
4654             default=self.default if default is None else default,
4655             optional=self.optional if optional is None else optional,
4656             _specs=self.specs,
4657         )
4658
4659
4660 def escape_control_unicode(c):
4661     if unicat(c)[0] == "C":
4662         c = repr(c).lstrip("u").strip("'")
4663     return c
4664
4665
4666 class CommonString(OctetString):
4667     """Common class for all strings
4668
4669     Everything resembles :py:class:`pyderasn.OctetString`, except
4670     ability to deal with unicode text strings.
4671
4672     >>> hexenc("привет Ð¼Ð¸Ñ€".encode("utf-8"))
4673     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4674     >>> UTF8String("привет Ð¼Ð¸Ñ€") == UTF8String(hexdec("d0...80"))
4675     True
4676     >>> s = UTF8String("привет Ð¼Ð¸Ñ€")
4677     UTF8String UTF8String Ð¿Ñ€Ð¸Ð²ÐµÑ‚ Ð¼Ð¸Ñ€
4678     >>> str(s)
4679     'привет Ð¼Ð¸Ñ€'
4680     >>> hexenc(bytes(s))
4681     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4682
4683     >>> PrintableString("привет Ð¼Ð¸Ñ€")
4684     Traceback (most recent call last):
4685     pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4686
4687     >>> BMPString("ада", bounds=(2, 2))
4688     Traceback (most recent call last):
4689     pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4690     >>> s = BMPString("ад", bounds=(2, 2))
4691     >>> s.encoding
4692     'utf-16-be'
4693     >>> hexenc(bytes(s))
4694     '04300434'
4695
4696     .. list-table::
4697        :header-rows: 1
4698
4699        * - Class
4700          - Text Encoding
4701        * - :py:class:`pyderasn.UTF8String`
4702          - utf-8
4703        * - :py:class:`pyderasn.NumericString`
4704          - ascii
4705        * - :py:class:`pyderasn.PrintableString`
4706          - ascii
4707        * - :py:class:`pyderasn.TeletexString`
4708          - ascii
4709        * - :py:class:`pyderasn.T61String`
4710          - ascii
4711        * - :py:class:`pyderasn.VideotexString`
4712          - iso-8859-1
4713        * - :py:class:`pyderasn.IA5String`
4714          - ascii
4715        * - :py:class:`pyderasn.GraphicString`
4716          - iso-8859-1
4717        * - :py:class:`pyderasn.VisibleString`
4718          - ascii
4719        * - :py:class:`pyderasn.ISO646String`
4720          - ascii
4721        * - :py:class:`pyderasn.GeneralString`
4722          - iso-8859-1
4723        * - :py:class:`pyderasn.UniversalString`
4724          - utf-32-be
4725        * - :py:class:`pyderasn.BMPString`
4726          - utf-16-be
4727     """
4728     __slots__ = ()
4729
4730     def _value_sanitize(self, value):
4731         value_raw = None
4732         value_decoded = None
4733         if isinstance(value, self.__class__):
4734             value_raw = value._value
4735         elif value.__class__ == text_type:
4736             value_decoded = value
4737         elif value.__class__ == binary_type:
4738             value_raw = value
4739         else:
4740             raise InvalidValueType((self.__class__, text_type, binary_type))
4741         try:
4742             value_raw = (
4743                 value_decoded.encode(self.encoding)
4744                 if value_raw is None else value_raw
4745             )
4746             value_decoded = (
4747                 value_raw.decode(self.encoding)
4748                 if value_decoded is None else value_decoded
4749             )
4750         except (UnicodeEncodeError, UnicodeDecodeError) as err:
4751             raise DecodeError(str(err))
4752         if not self._bound_min <= len(value_decoded) <= self._bound_max:
4753             raise BoundsError(
4754                 self._bound_min,
4755                 len(value_decoded),
4756                 self._bound_max,
4757             )
4758         return value_raw
4759
4760     def __eq__(self, their):
4761         if their.__class__ == binary_type:
4762             return self._value == their
4763         if their.__class__ == text_type:
4764             return self._value == their.encode(self.encoding)
4765         if not isinstance(their, self.__class__):
4766             return False
4767         return (
4768             self._value == their._value and
4769             self.tag == their.tag and
4770             self._expl == their._expl
4771         )
4772
4773     def __unicode__(self):
4774         if self.ready:
4775             return self._value.decode(self.encoding)
4776         return text_type(self._value)
4777
4778     def __repr__(self):
4779         return pp_console_row(next(self.pps(no_unicode=PY2)))
4780
4781     def pps(self, decode_path=(), no_unicode=False):
4782         value = None
4783         if self.ready:
4784             value = (
4785                 hexenc(bytes(self)) if no_unicode else
4786                 "".join(escape_control_unicode(c) for c in self.__unicode__())
4787             )
4788         yield _pp(
4789             obj=self,
4790             asn1_type_name=self.asn1_type_name,
4791             obj_name=self.__class__.__name__,
4792             decode_path=decode_path,
4793             value=value,
4794             optional=self.optional,
4795             default=self == self.default,
4796             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4797             expl=None if self._expl is None else tag_decode(self._expl),
4798             offset=self.offset,
4799             tlen=self.tlen,
4800             llen=self.llen,
4801             vlen=self.vlen,
4802             expl_offset=self.expl_offset if self.expled else None,
4803             expl_tlen=self.expl_tlen if self.expled else None,
4804             expl_llen=self.expl_llen if self.expled else None,
4805             expl_vlen=self.expl_vlen if self.expled else None,
4806             expl_lenindef=self.expl_lenindef,
4807             ber_encoded=self.ber_encoded,
4808             bered=self.bered,
4809         )
4810         for pp in self.pps_lenindef(decode_path):
4811             yield pp
4812
4813
4814 class UTF8String(CommonString):
4815     __slots__ = ()
4816     tag_default = tag_encode(12)
4817     encoding = "utf-8"
4818     asn1_type_name = "UTF8String"
4819
4820
4821 class AllowableCharsMixin(object):
4822     @property
4823     def allowable_chars(self):
4824         if PY2:
4825             return self._allowable_chars
4826         return frozenset(six_unichr(c) for c in self._allowable_chars)
4827
4828
4829 class NumericString(AllowableCharsMixin, CommonString):
4830     """Numeric string
4831
4832     Its value is properly sanitized: only ASCII digits with spaces can
4833     be stored.
4834
4835     >>> NumericString().allowable_chars
4836     frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4837     """
4838     __slots__ = ()
4839     tag_default = tag_encode(18)
4840     encoding = "ascii"
4841     asn1_type_name = "NumericString"
4842     _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4843
4844     def _value_sanitize(self, value):
4845         value = super(NumericString, self)._value_sanitize(value)
4846         if not frozenset(value) <= self._allowable_chars:
4847             raise DecodeError("non-numeric value")
4848         return value
4849
4850
4851 PrintableStringState = namedtuple(
4852     "PrintableStringState",
4853     OctetStringState._fields + ("allowable_chars",),
4854     **NAMEDTUPLE_KWARGS
4855 )
4856
4857
4858 class PrintableString(AllowableCharsMixin, CommonString):
4859     """Printable string
4860
4861     Its value is properly sanitized: see X.680 41.4 table 10.
4862
4863     >>> PrintableString().allowable_chars
4864     frozenset([' ', "'", ..., 'z'])
4865     >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4866     PrintableString PrintableString foo*bar
4867     >>> obj.allow_asterisk, obj.allow_ampersand
4868     (True, False)
4869     """
4870     __slots__ = ()
4871     tag_default = tag_encode(19)
4872     encoding = "ascii"
4873     asn1_type_name = "PrintableString"
4874     _allowable_chars = frozenset(
4875         (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4876     )
4877     _asterisk = frozenset("*".encode("ascii"))
4878     _ampersand = frozenset("&".encode("ascii"))
4879
4880     def __init__(
4881             self,
4882             value=None,
4883             bounds=None,
4884             impl=None,
4885             expl=None,
4886             default=None,
4887             optional=False,
4888             _decoded=(0, 0, 0),
4889             ctx=None,
4890             allow_asterisk=False,
4891             allow_ampersand=False,
4892     ):
4893         """
4894         :param allow_asterisk: allow asterisk character
4895         :param allow_ampersand: allow ampersand character
4896         """
4897         if allow_asterisk:
4898             self._allowable_chars |= self._asterisk
4899         if allow_ampersand:
4900             self._allowable_chars |= self._ampersand
4901         super(PrintableString, self).__init__(
4902             value, bounds, impl, expl, default, optional, _decoded, ctx,
4903         )
4904
4905     @property
4906     def allow_asterisk(self):
4907         """Is asterisk character allowed?
4908         """
4909         return self._asterisk <= self._allowable_chars
4910
4911     @property
4912     def allow_ampersand(self):
4913         """Is ampersand character allowed?
4914         """
4915         return self._ampersand <= self._allowable_chars
4916
4917     def _value_sanitize(self, value):
4918         value = super(PrintableString, self)._value_sanitize(value)
4919         if not frozenset(value) <= self._allowable_chars:
4920             raise DecodeError("non-printable value")
4921         return value
4922
4923     def __getstate__(self):
4924         return PrintableStringState(
4925             *super(PrintableString, self).__getstate__(),
4926             **{"allowable_chars": self._allowable_chars}
4927         )
4928
4929     def __setstate__(self, state):
4930         super(PrintableString, self).__setstate__(state)
4931         self._allowable_chars = state.allowable_chars
4932
4933     def __call__(
4934             self,
4935             value=None,
4936             bounds=None,
4937             impl=None,
4938             expl=None,
4939             default=None,
4940             optional=None,
4941     ):
4942         return self.__class__(
4943             value=value,
4944             bounds=(
4945                 (self._bound_min, self._bound_max)
4946                 if bounds is None else bounds
4947             ),
4948             impl=self.tag if impl is None else impl,
4949             expl=self._expl if expl is None else expl,
4950             default=self.default if default is None else default,
4951             optional=self.optional if optional is None else optional,
4952             allow_asterisk=self.allow_asterisk,
4953             allow_ampersand=self.allow_ampersand,
4954         )
4955
4956
4957 class TeletexString(CommonString):
4958     __slots__ = ()
4959     tag_default = tag_encode(20)
4960     encoding = "ascii"
4961     asn1_type_name = "TeletexString"
4962
4963
4964 class T61String(TeletexString):
4965     __slots__ = ()
4966     asn1_type_name = "T61String"
4967
4968
4969 class VideotexString(CommonString):
4970     __slots__ = ()
4971     tag_default = tag_encode(21)
4972     encoding = "iso-8859-1"
4973     asn1_type_name = "VideotexString"
4974
4975
4976 class IA5String(CommonString):
4977     __slots__ = ()
4978     tag_default = tag_encode(22)
4979     encoding = "ascii"
4980     asn1_type_name = "IA5"
4981
4982
4983 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
4984 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
4985 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
4986 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
4987 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
4988 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
4989
4990
4991 class VisibleString(CommonString):
4992     __slots__ = ()
4993     tag_default = tag_encode(26)
4994     encoding = "ascii"
4995     asn1_type_name = "VisibleString"
4996
4997
4998 UTCTimeState = namedtuple(
4999     "UTCTimeState",
5000     OctetStringState._fields + ("ber_raw",),
5001     **NAMEDTUPLE_KWARGS
5002 )
5003
5004
5005 def str_to_time_fractions(value):
5006     v = pureint(value)
5007     year, v = (v // 10**10), (v % 10**10)
5008     month, v = (v // 10**8), (v % 10**8)
5009     day, v = (v // 10**6), (v % 10**6)
5010     hour, v = (v // 10**4), (v % 10**4)
5011     minute, second = (v // 100), (v % 100)
5012     return year, month, day, hour, minute, second
5013
5014
5015 class UTCTime(VisibleString):
5016     """``UTCTime`` datetime type
5017
5018     >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5019     UTCTime UTCTime 2017-09-30T22:07:50
5020     >>> str(t)
5021     '170930220750Z'
5022     >>> bytes(t)
5023     b'170930220750Z'
5024     >>> t.todatetime()
5025     datetime.datetime(2017, 9, 30, 22, 7, 50)
5026     >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5027     datetime.datetime(1957, 9, 30, 22, 7, 50)
5028
5029     If BER encoded value was met, then ``ber_raw`` attribute will hold
5030     its raw representation.
5031
5032     .. warning::
5033
5034        Pay attention that UTCTime can not hold full year, so all years
5035        having < 50 years are treated as 20xx, 19xx otherwise, according
5036        to X.509 recommendation.
5037
5038     .. warning::
5039
5040        No strict validation of UTC offsets are made, but very crude:
5041
5042        * minutes are not exceeding 60
5043        * offset value is not exceeding 14 hours
5044     """
5045     __slots__ = ("ber_raw",)
5046     tag_default = tag_encode(23)
5047     encoding = "ascii"
5048     asn1_type_name = "UTCTime"
5049     evgen_mode_skip_value = False
5050
5051     def __init__(
5052             self,
5053             value=None,
5054             impl=None,
5055             expl=None,
5056             default=None,
5057             optional=False,
5058             _decoded=(0, 0, 0),
5059             bounds=None,  # dummy argument, workability for OctetString.decode
5060             ctx=None,
5061     ):
5062         """
5063         :param value: set the value. Either datetime type, or
5064                       :py:class:`pyderasn.UTCTime` object
5065         :param bytes impl: override default tag with ``IMPLICIT`` one
5066         :param bytes expl: override default tag with ``EXPLICIT`` one
5067         :param default: set default value. Type same as in ``value``
5068         :param bool optional: is object ``OPTIONAL`` in sequence
5069         """
5070         super(UTCTime, self).__init__(
5071             None, None, impl, expl, None, optional, _decoded, ctx,
5072         )
5073         self._value = value
5074         self.ber_raw = None
5075         if value is not None:
5076             self._value, self.ber_raw = self._value_sanitize(value, ctx)
5077             self.ber_encoded = self.ber_raw is not None
5078         if default is not None:
5079             default, _ = self._value_sanitize(default)
5080             self.default = self.__class__(
5081                 value=default,
5082                 impl=self.tag,
5083                 expl=self._expl,
5084             )
5085             if self._value is None:
5086                 self._value = default
5087             optional = True
5088         self.optional = optional
5089
5090     def _strptime_bered(self, value):
5091         year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5092         value = value[10:]
5093         if len(value) == 0:
5094             raise ValueError("no timezone")
5095         year += 2000 if year < 50 else 1900
5096         decoded = datetime(year, month, day, hour, minute)
5097         offset = 0
5098         if value[-1] == "Z":
5099             value = value[:-1]
5100         else:
5101             if len(value) < 5:
5102                 raise ValueError("invalid UTC offset")
5103             if value[-5] == "-":
5104                 sign = -1
5105             elif value[-5] == "+":
5106                 sign = 1
5107             else:
5108                 raise ValueError("invalid UTC offset")
5109             v = pureint(value[-4:])
5110             offset, v = (60 * (v % 100)), v // 100
5111             if offset >= 3600:
5112                 raise ValueError("invalid UTC offset minutes")
5113             offset += 3600 * v
5114             if offset > 14 * 3600:
5115                 raise ValueError("too big UTC offset")
5116             offset *= sign
5117             value = value[:-5]
5118         if len(value) == 0:
5119             return offset, decoded
5120         if len(value) != 2:
5121             raise ValueError("invalid UTC offset seconds")
5122         seconds = pureint(value)
5123         if seconds >= 60:
5124             raise ValueError("invalid seconds value")
5125         return offset, decoded + timedelta(seconds=seconds)
5126
5127     def _strptime(self, value):
5128         # datetime.strptime's format: %y%m%d%H%M%SZ
5129         if len(value) != LEN_YYMMDDHHMMSSZ:
5130             raise ValueError("invalid UTCTime length")
5131         if value[-1] != "Z":
5132             raise ValueError("non UTC timezone")
5133         year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
5134         year += 2000 if year < 50 else 1900
5135         return datetime(year, month, day, hour, minute, second)
5136
5137     def _dt_sanitize(self, value):
5138         if value.year < 1950 or value.year > 2049:
5139             raise ValueError("UTCTime can hold only 1950-2049 years")
5140         return value.replace(microsecond=0)
5141
5142     def _value_sanitize(self, value, ctx=None):
5143         if value.__class__ == binary_type:
5144             try:
5145                 value_decoded = value.decode("ascii")
5146             except (UnicodeEncodeError, UnicodeDecodeError) as err:
5147                 raise DecodeError("invalid UTCTime encoding: %r" % err)
5148             err = None
5149             try:
5150                 return self._strptime(value_decoded), None
5151             except (TypeError, ValueError) as _err:
5152                 err = _err
5153                 if (ctx is not None) and ctx.get("bered", False):
5154                     try:
5155                         offset, _value = self._strptime_bered(value_decoded)
5156                         _value = _value - timedelta(seconds=offset)
5157                         return self._dt_sanitize(_value), value
5158                     except (TypeError, ValueError, OverflowError) as _err:
5159                         err = _err
5160             raise DecodeError(
5161                 "invalid %s format: %r" % (self.asn1_type_name, err),
5162                 klass=self.__class__,
5163             )
5164         if isinstance(value, self.__class__):
5165             return value._value, None
5166         if value.__class__ == datetime:
5167             return self._dt_sanitize(value), None
5168         raise InvalidValueType((self.__class__, datetime))
5169
5170     def _pp_value(self):
5171         if self.ready:
5172             value = self._value.isoformat()
5173             if self.ber_encoded:
5174                 value += " (%s)" % self.ber_raw
5175             return value
5176         return None
5177
5178     def __unicode__(self):
5179         if self.ready:
5180             value = self._value.isoformat()
5181             if self.ber_encoded:
5182                 value += " (%s)" % self.ber_raw
5183             return value
5184         return text_type(self._pp_value())
5185
5186     def __getstate__(self):
5187         return UTCTimeState(
5188             *super(UTCTime, self).__getstate__(),
5189             **{"ber_raw": self.ber_raw}
5190         )
5191
5192     def __setstate__(self, state):
5193         super(UTCTime, self).__setstate__(state)
5194         self.ber_raw = state.ber_raw
5195
5196     def __bytes__(self):
5197         self._assert_ready()
5198         return self._encode_time()
5199
5200     def __eq__(self, their):
5201         if their.__class__ == binary_type:
5202             return self._encode_time() == their
5203         if their.__class__ == datetime:
5204             return self.todatetime() == their
5205         if not isinstance(their, self.__class__):
5206             return False
5207         return (
5208             self._value == their._value and
5209             self.tag == their.tag and
5210             self._expl == their._expl
5211         )
5212
5213     def _encode_time(self):
5214         return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5215
5216     def _encode(self):
5217         self._assert_ready()
5218         return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5219
5220     def _encode1st(self, state):
5221         return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
5222
5223     def _encode2nd(self, writer, state_iter):
5224         self._assert_ready()
5225         write_full(writer, self._encode())
5226
5227     def _encode_cer(self, writer):
5228         write_full(writer, self._encode())
5229
5230     def todatetime(self):
5231         return self._value
5232
5233     def __repr__(self):
5234         return pp_console_row(next(self.pps()))
5235
5236     def pps(self, decode_path=()):
5237         yield _pp(
5238             obj=self,
5239             asn1_type_name=self.asn1_type_name,
5240             obj_name=self.__class__.__name__,
5241             decode_path=decode_path,
5242             value=self._pp_value(),
5243             optional=self.optional,
5244             default=self == self.default,
5245             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5246             expl=None if self._expl is None else tag_decode(self._expl),
5247             offset=self.offset,
5248             tlen=self.tlen,
5249             llen=self.llen,
5250             vlen=self.vlen,
5251             expl_offset=self.expl_offset if self.expled else None,
5252             expl_tlen=self.expl_tlen if self.expled else None,
5253             expl_llen=self.expl_llen if self.expled else None,
5254             expl_vlen=self.expl_vlen if self.expled else None,
5255             expl_lenindef=self.expl_lenindef,
5256             ber_encoded=self.ber_encoded,
5257             bered=self.bered,
5258         )
5259         for pp in self.pps_lenindef(decode_path):
5260             yield pp
5261
5262
5263 class GeneralizedTime(UTCTime):
5264     """``GeneralizedTime`` datetime type
5265
5266     This type is similar to :py:class:`pyderasn.UTCTime`.
5267
5268     >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5269     GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5270     >>> str(t)
5271     '20170930220750.000123Z'
5272     >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5273     GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5274
5275     .. warning::
5276
5277        Only microsecond fractions are supported in DER encoding.
5278        :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5279        higher precision values.
5280
5281     .. warning::
5282
5283        BER encoded data can loss information (accuracy) during decoding
5284        because of float transformations.
5285
5286     .. warning::
5287
5288        Local times (without explicit timezone specification) are treated
5289        as UTC one, no transformations are made.
5290
5291     .. warning::
5292
5293        Zero year is unsupported.
5294     """
5295     __slots__ = ()
5296     tag_default = tag_encode(24)
5297     asn1_type_name = "GeneralizedTime"
5298
5299     def _dt_sanitize(self, value):
5300         return value
5301
5302     def _strptime_bered(self, value):
5303         if len(value) < 4 + 3 * 2:
5304             raise ValueError("invalid GeneralizedTime")
5305         year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5306         decoded = datetime(year, month, day, hour)
5307         offset, value = 0, value[10:]
5308         if len(value) == 0:
5309             return offset, decoded
5310         if value[-1] == "Z":
5311             value = value[:-1]
5312         else:
5313             for char, sign in (("-", -1), ("+", 1)):
5314                 idx = value.rfind(char)
5315                 if idx == -1:
5316                     continue
5317                 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5318                 v = pureint(offset_raw)
5319                 if len(offset_raw) == 4:
5320                     offset, v = (60 * (v % 100)), v // 100
5321                     if offset >= 3600:
5322                         raise ValueError("invalid UTC offset minutes")
5323                 elif len(offset_raw) == 2:
5324                     pass
5325                 else:
5326                     raise ValueError("invalid UTC offset")
5327                 offset += 3600 * v
5328                 if offset > 14 * 3600:
5329                     raise ValueError("too big UTC offset")
5330                 offset *= sign
5331                 break
5332         if len(value) == 0:
5333             return offset, decoded
5334         if value[0] in DECIMAL_SIGNS:
5335             return offset, (
5336                 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5337             )
5338         if len(value) < 2:
5339             raise ValueError("stripped minutes")
5340         decoded += timedelta(seconds=60 * pureint(value[:2]))
5341         value = value[2:]
5342         if len(value) == 0:
5343             return offset, decoded
5344         if value[0] in DECIMAL_SIGNS:
5345             return offset, (
5346                 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5347             )
5348         if len(value) < 2:
5349             raise ValueError("stripped seconds")
5350         decoded += timedelta(seconds=pureint(value[:2]))
5351         value = value[2:]
5352         if len(value) == 0:
5353             return offset, decoded
5354         if value[0] not in DECIMAL_SIGNS:
5355             raise ValueError("invalid format after seconds")
5356         return offset, (
5357             decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5358         )
5359
5360     def _strptime(self, value):
5361         l = len(value)
5362         if l == LEN_YYYYMMDDHHMMSSZ:
5363             # datetime.strptime's format: %Y%m%d%H%M%SZ
5364             if value[-1] != "Z":
5365                 raise ValueError("non UTC timezone")
5366             return datetime(*str_to_time_fractions(value[:-1]))
5367         if l >= LEN_YYYYMMDDHHMMSSDMZ:
5368             # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5369             if value[-1] != "Z":
5370                 raise ValueError("non UTC timezone")
5371             if value[14] != ".":
5372                 raise ValueError("no fractions separator")
5373             us = value[15:-1]
5374             if us[-1] == "0":
5375                 raise ValueError("trailing zero")
5376             us_len = len(us)
5377             if us_len > 6:
5378                 raise ValueError("only microsecond fractions are supported")
5379             us = pureint(us + ("0" * (6 - us_len)))
5380             year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5381             return datetime(year, month, day, hour, minute, second, us)
5382         raise ValueError("invalid GeneralizedTime length")
5383
5384     def _encode_time(self):
5385         value = self._value
5386         encoded = value.strftime("%Y%m%d%H%M%S")
5387         if value.microsecond > 0:
5388             encoded += (".%06d" % value.microsecond).rstrip("0")
5389         return (encoded + "Z").encode("ascii")
5390
5391     def _encode(self):
5392         self._assert_ready()
5393         value = self._value
5394         if value.microsecond > 0:
5395             encoded = self._encode_time()
5396             return b"".join((self.tag, len_encode(len(encoded)), encoded))
5397         return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5398
5399     def _encode1st(self, state):
5400         self._assert_ready()
5401         vlen = len(self._encode_time())
5402         return len(self.tag) + len_size(vlen) + vlen, state
5403
5404     def _encode2nd(self, writer, state_iter):
5405         write_full(writer, self._encode())
5406
5407
5408 class GraphicString(CommonString):
5409     __slots__ = ()
5410     tag_default = tag_encode(25)
5411     encoding = "iso-8859-1"
5412     asn1_type_name = "GraphicString"
5413
5414
5415 class ISO646String(VisibleString):
5416     __slots__ = ()
5417     asn1_type_name = "ISO646String"
5418
5419
5420 class GeneralString(CommonString):
5421     __slots__ = ()
5422     tag_default = tag_encode(27)
5423     encoding = "iso-8859-1"
5424     asn1_type_name = "GeneralString"
5425
5426
5427 class UniversalString(CommonString):
5428     __slots__ = ()
5429     tag_default = tag_encode(28)
5430     encoding = "utf-32-be"
5431     asn1_type_name = "UniversalString"
5432
5433
5434 class BMPString(CommonString):
5435     __slots__ = ()
5436     tag_default = tag_encode(30)
5437     encoding = "utf-16-be"
5438     asn1_type_name = "BMPString"
5439
5440
5441 ChoiceState = namedtuple(
5442     "ChoiceState",
5443     BasicState._fields + ("specs", "value",),
5444     **NAMEDTUPLE_KWARGS
5445 )
5446
5447
5448 class Choice(Obj):
5449     """``CHOICE`` special type
5450
5451     ::
5452
5453         class GeneralName(Choice):
5454             schema = (
5455                 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5456                 ("dNSName", IA5String(impl=tag_ctxp(2))),
5457             )
5458
5459     >>> gn = GeneralName()
5460     GeneralName CHOICE
5461     >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5462     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5463     >>> gn["dNSName"] = IA5String("bar.baz")
5464     GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5465     >>> gn["rfc822Name"]
5466     None
5467     >>> gn["dNSName"]
5468     [2] IA5String IA5 bar.baz
5469     >>> gn.choice
5470     'dNSName'
5471     >>> gn.value == gn["dNSName"]
5472     True
5473     >>> gn.specs
5474     OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5475
5476     >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5477     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5478     """
5479     __slots__ = ("specs",)
5480     tag_default = None
5481     asn1_type_name = "CHOICE"
5482
5483     def __init__(
5484             self,
5485             value=None,
5486             schema=None,
5487             impl=None,
5488             expl=None,
5489             default=None,
5490             optional=False,
5491             _decoded=(0, 0, 0),
5492     ):
5493         """
5494         :param value: set the value. Either ``(choice, value)`` tuple, or
5495                       :py:class:`pyderasn.Choice` object
5496         :param bytes impl: can not be set, do **not** use it
5497         :param bytes expl: override default tag with ``EXPLICIT`` one
5498         :param default: set default value. Type same as in ``value``
5499         :param bool optional: is object ``OPTIONAL`` in sequence
5500         """
5501         if impl is not None:
5502             raise ValueError("no implicit tag allowed for CHOICE")
5503         super(Choice, self).__init__(None, expl, default, optional, _decoded)
5504         if schema is None:
5505             schema = getattr(self, "schema", ())
5506         if len(schema) == 0:
5507             raise ValueError("schema must be specified")
5508         self.specs = (
5509             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5510         )
5511         self._value = None
5512         if value is not None:
5513             self._value = self._value_sanitize(value)
5514         if default is not None:
5515             default_value = self._value_sanitize(default)
5516             default_obj = self.__class__(impl=self.tag, expl=self._expl)
5517             default_obj.specs = self.specs
5518             default_obj._value = default_value
5519             self.default = default_obj
5520             if value is None:
5521                 self._value = copy(default_obj._value)
5522         if self._expl is not None:
5523             tag_class, _, tag_num = tag_decode(self._expl)
5524             self._tag_order = (tag_class, tag_num)
5525
5526     def _value_sanitize(self, value):
5527         if (value.__class__ == tuple) and len(value) == 2:
5528             choice, obj = value
5529             spec = self.specs.get(choice)
5530             if spec is None:
5531                 raise ObjUnknown(choice)
5532             if not isinstance(obj, spec.__class__):
5533                 raise InvalidValueType((spec,))
5534             return (choice, spec(obj))
5535         if isinstance(value, self.__class__):
5536             return value._value
5537         raise InvalidValueType((self.__class__, tuple))
5538
5539     @property
5540     def ready(self):
5541         return self._value is not None and self._value[1].ready
5542
5543     @property
5544     def bered(self):
5545         return self.expl_lenindef or (
5546             (self._value is not None) and
5547             self._value[1].bered
5548         )
5549
5550     def __getstate__(self):
5551         return ChoiceState(
5552             __version__,
5553             self.tag,
5554             self._tag_order,
5555             self._expl,
5556             self.default,
5557             self.optional,
5558             self.offset,
5559             self.llen,
5560             self.vlen,
5561             self.expl_lenindef,
5562             self.lenindef,
5563             self.ber_encoded,
5564             self.specs,
5565             copy(self._value),
5566         )
5567
5568     def __setstate__(self, state):
5569         super(Choice, self).__setstate__(state)
5570         self.specs = state.specs
5571         self._value = state.value
5572
5573     def __eq__(self, their):
5574         if (their.__class__ == tuple) and len(their) == 2:
5575             return self._value == their
5576         if not isinstance(their, self.__class__):
5577             return False
5578         return (
5579             self.specs == their.specs and
5580             self._value == their._value
5581         )
5582
5583     def __call__(
5584             self,
5585             value=None,
5586             expl=None,
5587             default=None,
5588             optional=None,
5589     ):
5590         return self.__class__(
5591             value=value,
5592             schema=self.specs,
5593             expl=self._expl if expl is None else expl,
5594             default=self.default if default is None else default,
5595             optional=self.optional if optional is None else optional,
5596         )
5597
5598     @property
5599     def choice(self):
5600         """Name of the choice
5601         """
5602         self._assert_ready()
5603         return self._value[0]
5604
5605     @property
5606     def value(self):
5607         """Value of underlying choice
5608         """
5609         self._assert_ready()
5610         return self._value[1]
5611
5612     @property
5613     def tag_order(self):
5614         self._assert_ready()
5615         return self._value[1].tag_order if self._tag_order is None else self._tag_order
5616
5617     @property
5618     def tag_order_cer(self):
5619         return min(v.tag_order_cer for v in itervalues(self.specs))
5620
5621     def __getitem__(self, key):
5622         if key not in self.specs:
5623             raise ObjUnknown(key)
5624         if self._value is None:
5625             return None
5626         choice, value = self._value
5627         if choice != key:
5628             return None
5629         return value
5630
5631     def __setitem__(self, key, value):
5632         spec = self.specs.get(key)
5633         if spec is None:
5634             raise ObjUnknown(key)
5635         if not isinstance(value, spec.__class__):
5636             raise InvalidValueType((spec.__class__,))
5637         self._value = (key, spec(value))
5638
5639     @property
5640     def tlen(self):
5641         return 0
5642
5643     @property
5644     def decoded(self):
5645         return self._value[1].decoded if self.ready else False
5646
5647     def _encode(self):
5648         self._assert_ready()
5649         return self._value[1].encode()
5650
5651     def _encode1st(self, state):
5652         self._assert_ready()
5653         return self._value[1].encode1st(state)
5654
5655     def _encode2nd(self, writer, state_iter):
5656         self._value[1].encode2nd(writer, state_iter)
5657
5658     def _encode_cer(self, writer):
5659         self._assert_ready()
5660         self._value[1].encode_cer(writer)
5661
5662     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5663         for choice, spec in iteritems(self.specs):
5664             sub_decode_path = decode_path + (choice,)
5665             try:
5666                 spec.decode(
5667                     tlv,
5668                     offset=offset,
5669                     leavemm=True,
5670                     decode_path=sub_decode_path,
5671                     ctx=ctx,
5672                     tag_only=True,
5673                     _ctx_immutable=False,
5674                 )
5675             except TagMismatch:
5676                 continue
5677             break
5678         else:
5679             raise TagMismatch(
5680                 klass=self.__class__,
5681                 decode_path=decode_path,
5682                 offset=offset,
5683             )
5684         if tag_only:  # pragma: no cover
5685             yield None
5686             return
5687         if evgen_mode:
5688             for _decode_path, value, tail in spec.decode_evgen(
5689                     tlv,
5690                     offset=offset,
5691                     leavemm=True,
5692                     decode_path=sub_decode_path,
5693                     ctx=ctx,
5694                     _ctx_immutable=False,
5695             ):
5696                 yield _decode_path, value, tail
5697         else:
5698             _, value, tail = next(spec.decode_evgen(
5699                 tlv,
5700                 offset=offset,
5701                 leavemm=True,
5702                 decode_path=sub_decode_path,
5703                 ctx=ctx,
5704                 _ctx_immutable=False,
5705                 _evgen_mode=False,
5706             ))
5707         obj = self.__class__(
5708             schema=self.specs,
5709             expl=self._expl,
5710             default=self.default,
5711             optional=self.optional,
5712             _decoded=(offset, 0, value.fulllen),
5713         )
5714         obj._value = (choice, value)
5715         yield decode_path, obj, tail
5716
5717     def __repr__(self):
5718         value = pp_console_row(next(self.pps()))
5719         if self.ready:
5720             value = "%s[%r]" % (value, self.value)
5721         return value
5722
5723     def pps(self, decode_path=()):
5724         yield _pp(
5725             obj=self,
5726             asn1_type_name=self.asn1_type_name,
5727             obj_name=self.__class__.__name__,
5728             decode_path=decode_path,
5729             value=self.choice if self.ready else None,
5730             optional=self.optional,
5731             default=self == self.default,
5732             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5733             expl=None if self._expl is None else tag_decode(self._expl),
5734             offset=self.offset,
5735             tlen=self.tlen,
5736             llen=self.llen,
5737             vlen=self.vlen,
5738             expl_lenindef=self.expl_lenindef,
5739             bered=self.bered,
5740         )
5741         if self.ready:
5742             yield self.value.pps(decode_path=decode_path + (self.choice,))
5743         for pp in self.pps_lenindef(decode_path):
5744             yield pp
5745
5746
5747 class PrimitiveTypes(Choice):
5748     """Predefined ``CHOICE`` for all generic primitive types
5749
5750     It could be useful for general decoding of some unspecified values:
5751
5752     >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5753     OCTET STRING 3 bytes 666f6f
5754     >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5755     INTEGER 1193046
5756     """
5757     __slots__ = ()
5758     schema = tuple((klass.__name__, klass()) for klass in (
5759         Boolean,
5760         Integer,
5761         BitString,
5762         OctetString,
5763         Null,
5764         ObjectIdentifier,
5765         UTF8String,
5766         NumericString,
5767         PrintableString,
5768         TeletexString,
5769         VideotexString,
5770         IA5String,
5771         UTCTime,
5772         GeneralizedTime,
5773         GraphicString,
5774         VisibleString,
5775         ISO646String,
5776         GeneralString,
5777         UniversalString,
5778         BMPString,
5779     ))
5780
5781
5782 AnyState = namedtuple(
5783     "AnyState",
5784     BasicState._fields + ("value", "defined"),
5785     **NAMEDTUPLE_KWARGS
5786 )
5787
5788
5789 class Any(Obj):
5790     """``ANY`` special type
5791
5792     >>> Any(Integer(-123))
5793     ANY INTEGER -123 (0X:7B)
5794     >>> a = Any(OctetString(b"hello world").encode())
5795     ANY 040b68656c6c6f20776f726c64
5796     >>> hexenc(bytes(a))
5797     b'0x040x0bhello world'
5798     """
5799     __slots__ = ("defined",)
5800     tag_default = tag_encode(0)
5801     asn1_type_name = "ANY"
5802
5803     def __init__(
5804             self,
5805             value=None,
5806             expl=None,
5807             optional=False,
5808             _decoded=(0, 0, 0),
5809     ):
5810         """
5811         :param value: set the value. Either any kind of pyderasn's
5812                       **ready** object, or bytes. Pay attention that
5813                       **no** validation is performed if raw binary value
5814                       is valid TLV, except just tag decoding
5815         :param bytes expl: override default tag with ``EXPLICIT`` one
5816         :param bool optional: is object ``OPTIONAL`` in sequence
5817         """
5818         super(Any, self).__init__(None, expl, None, optional, _decoded)
5819         if value is None:
5820             self._value = None
5821         else:
5822             value = self._value_sanitize(value)
5823             self._value = value
5824             if self._expl is None:
5825                 if value.__class__ == binary_type:
5826                     tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5827                 else:
5828                     tag_class, tag_num = value.tag_order
5829             else:
5830                 tag_class, _, tag_num = tag_decode(self._expl)
5831             self._tag_order = (tag_class, tag_num)
5832         self.defined = None
5833
5834     def _value_sanitize(self, value):
5835         if value.__class__ == binary_type:
5836             if len(value) == 0:
5837                 raise ValueError("Any value can not be empty")
5838             return value
5839         if isinstance(value, self.__class__):
5840             return value._value
5841         if not isinstance(value, Obj):
5842             raise InvalidValueType((self.__class__, Obj, binary_type))
5843         return value
5844
5845     @property
5846     def ready(self):
5847         return self._value is not None
5848
5849     @property
5850     def tag_order(self):
5851         self._assert_ready()
5852         return self._tag_order
5853
5854     @property
5855     def bered(self):
5856         if self.expl_lenindef or self.lenindef:
5857             return True
5858         if self.defined is None:
5859             return False
5860         return self.defined[1].bered
5861
5862     def __getstate__(self):
5863         return AnyState(
5864             __version__,
5865             self.tag,
5866             self._tag_order,
5867             self._expl,
5868             None,
5869             self.optional,
5870             self.offset,
5871             self.llen,
5872             self.vlen,
5873             self.expl_lenindef,
5874             self.lenindef,
5875             self.ber_encoded,
5876             self._value,
5877             self.defined,
5878         )
5879
5880     def __setstate__(self, state):
5881         super(Any, self).__setstate__(state)
5882         self._value = state.value
5883         self.defined = state.defined
5884
5885     def __eq__(self, their):
5886         if their.__class__ == binary_type:
5887             if self._value.__class__ == binary_type:
5888                 return self._value == their
5889             return self._value.encode() == their
5890         if issubclass(their.__class__, Any):
5891             if self.ready and their.ready:
5892                 return bytes(self) == bytes(their)
5893             return self.ready == their.ready
5894         return False
5895
5896     def __call__(
5897             self,
5898             value=None,
5899             expl=None,
5900             optional=None,
5901     ):
5902         return self.__class__(
5903             value=value,
5904             expl=self._expl if expl is None else expl,
5905             optional=self.optional if optional is None else optional,
5906         )
5907
5908     def __bytes__(self):
5909         self._assert_ready()
5910         value = self._value
5911         if value.__class__ == binary_type:
5912             return value
5913         return self._value.encode()
5914
5915     @property
5916     def tlen(self):
5917         return 0
5918
5919     def _encode(self):
5920         self._assert_ready()
5921         value = self._value
5922         if value.__class__ == binary_type:
5923             return value
5924         return value.encode()
5925
5926     def _encode1st(self, state):
5927         self._assert_ready()
5928         value = self._value
5929         if value.__class__ == binary_type:
5930             return len(value), state
5931         return value.encode1st(state)
5932
5933     def _encode2nd(self, writer, state_iter):
5934         value = self._value
5935         if value.__class__ == binary_type:
5936             write_full(writer, value)
5937         else:
5938             value.encode2nd(writer, state_iter)
5939
5940     def _encode_cer(self, writer):
5941         self._assert_ready()
5942         value = self._value
5943         if value.__class__ == binary_type:
5944             write_full(writer, value)
5945         else:
5946             value.encode_cer(writer)
5947
5948     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5949         try:
5950             t, tlen, lv = tag_strip(tlv)
5951         except DecodeError as err:
5952             raise err.__class__(
5953                 msg=err.msg,
5954                 klass=self.__class__,
5955                 decode_path=decode_path,
5956                 offset=offset,
5957             )
5958         try:
5959             l, llen, v = len_decode(lv)
5960         except LenIndefForm as err:
5961             if not ctx.get("bered", False):
5962                 raise err.__class__(
5963                     msg=err.msg,
5964                     klass=self.__class__,
5965                     decode_path=decode_path,
5966                     offset=offset,
5967                 )
5968             llen, vlen, v = 1, 0, lv[1:]
5969             sub_offset = offset + tlen + llen
5970             chunk_i = 0
5971             while v[:EOC_LEN].tobytes() != EOC:
5972                 chunk, v = Any().decode(
5973                     v,
5974                     offset=sub_offset,
5975                     decode_path=decode_path + (str(chunk_i),),
5976                     leavemm=True,
5977                     ctx=ctx,
5978                     _ctx_immutable=False,
5979                 )
5980                 vlen += chunk.tlvlen
5981                 sub_offset += chunk.tlvlen
5982                 chunk_i += 1
5983             tlvlen = tlen + llen + vlen + EOC_LEN
5984             obj = self.__class__(
5985                 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
5986                 expl=self._expl,
5987                 optional=self.optional,
5988                 _decoded=(offset, 0, tlvlen),
5989             )
5990             obj.lenindef = True
5991             obj.tag = t.tobytes()
5992             yield decode_path, obj, v[EOC_LEN:]
5993             return
5994         except DecodeError as err:
5995             raise err.__class__(
5996                 msg=err.msg,
5997                 klass=self.__class__,
5998                 decode_path=decode_path,
5999                 offset=offset,
6000             )
6001         if l > len(v):
6002             raise NotEnoughData(
6003                 "encoded length is longer than data",
6004                 klass=self.__class__,
6005                 decode_path=decode_path,
6006                 offset=offset,
6007             )
6008         tlvlen = tlen + llen + l
6009         v, tail = tlv[:tlvlen], v[l:]
6010         obj = self.__class__(
6011             value=None if evgen_mode else v.tobytes(),
6012             expl=self._expl,
6013             optional=self.optional,
6014             _decoded=(offset, 0, tlvlen),
6015         )
6016         obj.tag = t.tobytes()
6017         yield decode_path, obj, tail
6018
6019     def __repr__(self):
6020         return pp_console_row(next(self.pps()))
6021
6022     def pps(self, decode_path=()):
6023         value = self._value
6024         if value is None:
6025             pass
6026         elif value.__class__ == binary_type:
6027             value = None
6028         else:
6029             value = repr(value)
6030         yield _pp(
6031             obj=self,
6032             asn1_type_name=self.asn1_type_name,
6033             obj_name=self.__class__.__name__,
6034             decode_path=decode_path,
6035             value=value,
6036             blob=self._value if self._value.__class__ == binary_type else None,
6037             optional=self.optional,
6038             default=self == self.default,
6039             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6040             expl=None if self._expl is None else tag_decode(self._expl),
6041             offset=self.offset,
6042             tlen=self.tlen,
6043             llen=self.llen,
6044             vlen=self.vlen,
6045             expl_offset=self.expl_offset if self.expled else None,
6046             expl_tlen=self.expl_tlen if self.expled else None,
6047             expl_llen=self.expl_llen if self.expled else None,
6048             expl_vlen=self.expl_vlen if self.expled else None,
6049             expl_lenindef=self.expl_lenindef,
6050             lenindef=self.lenindef,
6051             bered=self.bered,
6052         )
6053         defined_by, defined = self.defined or (None, None)
6054         if defined_by is not None:
6055             yield defined.pps(
6056                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6057             )
6058         for pp in self.pps_lenindef(decode_path):
6059             yield pp
6060
6061
6062 ########################################################################
6063 # ASN.1 constructed types
6064 ########################################################################
6065
6066 def abs_decode_path(decode_path, rel_path):
6067     """Create an absolute decode path from current and relative ones
6068
6069     :param decode_path: current decode path, starting point. Tuple of strings
6070     :param rel_path: relative path to ``decode_path``. Tuple of strings.
6071                      If first tuple's element is "/", then treat it as
6072                      an absolute path, ignoring ``decode_path`` as
6073                      starting point. Also this tuple can contain ".."
6074                      elements, stripping the leading element from
6075                      ``decode_path``
6076
6077     >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
6078     ("foo", "bar", "baz", "whatever")
6079     >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
6080     ("foo", "whatever")
6081     >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
6082     ("baz", "whatever")
6083     """
6084     if rel_path[0] == "/":
6085         return rel_path[1:]
6086     if rel_path[0] == "..":
6087         return abs_decode_path(decode_path[:-1], rel_path[1:])
6088     return decode_path + rel_path
6089
6090
6091 SequenceState = namedtuple(
6092     "SequenceState",
6093     BasicState._fields + ("specs", "value",),
6094     **NAMEDTUPLE_KWARGS
6095 )
6096
6097
6098 class SequenceEncode1stMixing(object):
6099     def _encode1st(self, state):
6100         state.append(0)
6101         idx = len(state) - 1
6102         vlen = 0
6103         for v in self._values_for_encoding():
6104             l, _ = v.encode1st(state)
6105             vlen += l
6106         state[idx] = vlen
6107         return len(self.tag) + len_size(vlen) + vlen, state
6108
6109
6110 class Sequence(SequenceEncode1stMixing, Obj):
6111     """``SEQUENCE`` structure type
6112
6113     You have to make specification of sequence::
6114
6115         class Extension(Sequence):
6116             schema = (
6117                 ("extnID", ObjectIdentifier()),
6118                 ("critical", Boolean(default=False)),
6119                 ("extnValue", OctetString()),
6120             )
6121
6122     Then, you can work with it as with dictionary.
6123
6124     >>> ext = Extension()
6125     >>> Extension().specs
6126     OrderedDict([
6127         ('extnID', OBJECT IDENTIFIER),
6128         ('critical', BOOLEAN False OPTIONAL DEFAULT),
6129         ('extnValue', OCTET STRING),
6130     ])
6131     >>> ext["extnID"] = "1.2.3"
6132     Traceback (most recent call last):
6133     pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6134     >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6135
6136     You can determine if sequence is ready to be encoded:
6137
6138     >>> ext.ready
6139     False
6140     >>> ext.encode()
6141     Traceback (most recent call last):
6142     pyderasn.ObjNotReady: object is not ready: extnValue
6143     >>> ext["extnValue"] = OctetString(b"foobar")
6144     >>> ext.ready
6145     True
6146
6147     Value you want to assign, must have the same **type** as in
6148     corresponding specification, but it can have different tags,
6149     optional/default attributes -- they will be taken from specification
6150     automatically::
6151
6152         class TBSCertificate(Sequence):
6153             schema = (
6154                 ("version", Version(expl=tag_ctxc(0), default="v1")),
6155             [...]
6156
6157     >>> tbs = TBSCertificate()
6158     >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6159
6160     Assign ``None`` to remove value from sequence.
6161
6162     You can set values in Sequence during its initialization:
6163
6164     >>> AlgorithmIdentifier((
6165         ("algorithm", ObjectIdentifier("1.2.3")),
6166         ("parameters", Any(Null()))
6167     ))
6168     AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6169
6170     You can determine if value exists/set in the sequence and take its value:
6171
6172     >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6173     (True, True, False)
6174     >>> ext["extnID"]
6175     OBJECT IDENTIFIER 1.2.3
6176
6177     But pay attention that if value has default, then it won't be (not
6178     in) in the sequence (because ``DEFAULT`` must not be encoded in
6179     DER), but you can read its value:
6180
6181     >>> "critical" in ext, ext["critical"]
6182     (False, BOOLEAN False)
6183     >>> ext["critical"] = Boolean(True)
6184     >>> "critical" in ext, ext["critical"]
6185     (True, BOOLEAN True)
6186
6187     All defaulted values are always optional.
6188
6189     .. _allow_default_values_ctx:
6190
6191     DER prohibits default value encoding and will raise an error if
6192     default value is unexpectedly met during decode.
6193     If :ref:`bered <bered_ctx>` context option is set, then no error
6194     will be raised, but ``bered`` attribute set. You can disable strict
6195     defaulted values existence validation by setting
6196     ``"allow_default_values": True`` :ref:`context <ctx>` option.
6197
6198     .. warning::
6199
6200        Check for default value existence is not performed in
6201        ``evgen_mode``, because previously decoded values are not stored
6202        in memory, to be able to compare them.
6203
6204     Two sequences are equal if they have equal specification (schema),
6205     implicit/explicit tagging and the same values.
6206     """
6207     __slots__ = ("specs",)
6208     tag_default = tag_encode(form=TagFormConstructed, num=16)
6209     asn1_type_name = "SEQUENCE"
6210
6211     def __init__(
6212             self,
6213             value=None,
6214             schema=None,
6215             impl=None,
6216             expl=None,
6217             default=None,
6218             optional=False,
6219             _decoded=(0, 0, 0),
6220     ):
6221         super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6222         if schema is None:
6223             schema = getattr(self, "schema", ())
6224         self.specs = (
6225             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6226         )
6227         self._value = {}
6228         if value is not None:
6229             if issubclass(value.__class__, Sequence):
6230                 self._value = value._value
6231             elif hasattr(value, "__iter__"):
6232                 for seq_key, seq_value in value:
6233                     self[seq_key] = seq_value
6234             else:
6235                 raise InvalidValueType((Sequence,))
6236         if default is not None:
6237             if not issubclass(default.__class__, Sequence):
6238                 raise InvalidValueType((Sequence,))
6239             default_value = default._value
6240             default_obj = self.__class__(impl=self.tag, expl=self._expl)
6241             default_obj.specs = self.specs
6242             default_obj._value = default_value
6243             self.default = default_obj
6244             if value is None:
6245                 self._value = copy(default_obj._value)
6246
6247     @property
6248     def ready(self):
6249         for name, spec in iteritems(self.specs):
6250             value = self._value.get(name)
6251             if value is None:
6252                 if spec.optional:
6253                     continue
6254                 return False
6255             if not value.ready:
6256                 return False
6257         return True
6258
6259     @property
6260     def bered(self):
6261         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6262             return True
6263         return any(value.bered for value in itervalues(self._value))
6264
6265     def __getstate__(self):
6266         return SequenceState(
6267             __version__,
6268             self.tag,
6269             self._tag_order,
6270             self._expl,
6271             self.default,
6272             self.optional,
6273             self.offset,
6274             self.llen,
6275             self.vlen,
6276             self.expl_lenindef,
6277             self.lenindef,
6278             self.ber_encoded,
6279             self.specs,
6280             {k: copy(v) for k, v in iteritems(self._value)},
6281         )
6282
6283     def __setstate__(self, state):
6284         super(Sequence, self).__setstate__(state)
6285         self.specs = state.specs
6286         self._value = state.value
6287
6288     def __eq__(self, their):
6289         if not isinstance(their, self.__class__):
6290             return False
6291         return (
6292             self.specs == their.specs and
6293             self.tag == their.tag and
6294             self._expl == their._expl and
6295             self._value == their._value
6296         )
6297
6298     def __call__(
6299             self,
6300             value=None,
6301             impl=None,
6302             expl=None,
6303             default=None,
6304             optional=None,
6305     ):
6306         return self.__class__(
6307             value=value,
6308             schema=self.specs,
6309             impl=self.tag if impl is None else impl,
6310             expl=self._expl if expl is None else expl,
6311             default=self.default if default is None else default,
6312             optional=self.optional if optional is None else optional,
6313         )
6314
6315     def __contains__(self, key):
6316         return key in self._value
6317
6318     def __setitem__(self, key, value):
6319         spec = self.specs.get(key)
6320         if spec is None:
6321             raise ObjUnknown(key)
6322         if value is None:
6323             self._value.pop(key, None)
6324             return
6325         if not isinstance(value, spec.__class__):
6326             raise InvalidValueType((spec.__class__,))
6327         value = spec(value=value)
6328         if spec.default is not None and value == spec.default:
6329             self._value.pop(key, None)
6330             return
6331         self._value[key] = value
6332
6333     def __getitem__(self, key):
6334         value = self._value.get(key)
6335         if value is not None:
6336             return value
6337         spec = self.specs.get(key)
6338         if spec is None:
6339             raise ObjUnknown(key)
6340         if spec.default is not None:
6341             return spec.default
6342         return None
6343
6344     def _values_for_encoding(self):
6345         for name, spec in iteritems(self.specs):
6346             value = self._value.get(name)
6347             if value is None:
6348                 if spec.optional:
6349                     continue
6350                 raise ObjNotReady(name)
6351             yield value
6352
6353     def _encode(self):
6354         v = b"".join(v.encode() for v in self._values_for_encoding())
6355         return b"".join((self.tag, len_encode(len(v)), v))
6356
6357     def _encode2nd(self, writer, state_iter):
6358         write_full(writer, self.tag + len_encode(next(state_iter)))
6359         for v in self._values_for_encoding():
6360             v.encode2nd(writer, state_iter)
6361
6362     def _encode_cer(self, writer):
6363         write_full(writer, self.tag + LENINDEF)
6364         for v in self._values_for_encoding():
6365             v.encode_cer(writer)
6366         write_full(writer, EOC)
6367
6368     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6369         try:
6370             t, tlen, lv = tag_strip(tlv)
6371         except DecodeError as err:
6372             raise err.__class__(
6373                 msg=err.msg,
6374                 klass=self.__class__,
6375                 decode_path=decode_path,
6376                 offset=offset,
6377             )
6378         if t != self.tag:
6379             raise TagMismatch(
6380                 klass=self.__class__,
6381                 decode_path=decode_path,
6382                 offset=offset,
6383             )
6384         if tag_only:  # pragma: no cover
6385             yield None
6386             return
6387         lenindef = False
6388         ctx_bered = ctx.get("bered", False)
6389         try:
6390             l, llen, v = len_decode(lv)
6391         except LenIndefForm as err:
6392             if not ctx_bered:
6393                 raise err.__class__(
6394                     msg=err.msg,
6395                     klass=self.__class__,
6396                     decode_path=decode_path,
6397                     offset=offset,
6398                 )
6399             l, llen, v = 0, 1, lv[1:]
6400             lenindef = True
6401         except DecodeError as err:
6402             raise err.__class__(
6403                 msg=err.msg,
6404                 klass=self.__class__,
6405                 decode_path=decode_path,
6406                 offset=offset,
6407             )
6408         if l > len(v):
6409             raise NotEnoughData(
6410                 "encoded length is longer than data",
6411                 klass=self.__class__,
6412                 decode_path=decode_path,
6413                 offset=offset,
6414             )
6415         if not lenindef:
6416             v, tail = v[:l], v[l:]
6417         vlen = 0
6418         sub_offset = offset + tlen + llen
6419         values = {}
6420         ber_encoded = False
6421         ctx_allow_default_values = ctx.get("allow_default_values", False)
6422         for name, spec in iteritems(self.specs):
6423             if spec.optional and (
6424                     (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6425                     len(v) == 0
6426             ):
6427                 continue
6428             sub_decode_path = decode_path + (name,)
6429             try:
6430                 if evgen_mode:
6431                     for _decode_path, value, v_tail in spec.decode_evgen(
6432                             v,
6433                             sub_offset,
6434                             leavemm=True,
6435                             decode_path=sub_decode_path,
6436                             ctx=ctx,
6437                             _ctx_immutable=False,
6438                     ):
6439                         yield _decode_path, value, v_tail
6440                 else:
6441                     _, value, v_tail = next(spec.decode_evgen(
6442                         v,
6443                         sub_offset,
6444                         leavemm=True,
6445                         decode_path=sub_decode_path,
6446                         ctx=ctx,
6447                         _ctx_immutable=False,
6448                         _evgen_mode=False,
6449                     ))
6450             except TagMismatch as err:
6451                 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6452                     continue
6453                 raise
6454
6455             defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6456             if not evgen_mode and defined is not None:
6457                 defined_by, defined_spec = defined
6458                 if issubclass(value.__class__, SequenceOf):
6459                     for i, _value in enumerate(value):
6460                         sub_sub_decode_path = sub_decode_path + (
6461                             str(i),
6462                             DecodePathDefBy(defined_by),
6463                         )
6464                         defined_value, defined_tail = defined_spec.decode(
6465                             memoryview(bytes(_value)),
6466                             sub_offset + (
6467                                 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6468                                 if value.expled else (value.tlen + value.llen)
6469                             ),
6470                             leavemm=True,
6471                             decode_path=sub_sub_decode_path,
6472                             ctx=ctx,
6473                             _ctx_immutable=False,
6474                         )
6475                         if len(defined_tail) > 0:
6476                             raise DecodeError(
6477                                 "remaining data",
6478                                 klass=self.__class__,
6479                                 decode_path=sub_sub_decode_path,
6480                                 offset=offset,
6481                             )
6482                         _value.defined = (defined_by, defined_value)
6483                 else:
6484                     defined_value, defined_tail = defined_spec.decode(
6485                         memoryview(bytes(value)),
6486                         sub_offset + (
6487                             (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6488                             if value.expled else (value.tlen + value.llen)
6489                         ),
6490                         leavemm=True,
6491                         decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6492                         ctx=ctx,
6493                         _ctx_immutable=False,
6494                     )
6495                     if len(defined_tail) > 0:
6496                         raise DecodeError(
6497                             "remaining data",
6498                             klass=self.__class__,
6499                             decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6500                             offset=offset,
6501                         )
6502                     value.defined = (defined_by, defined_value)
6503
6504             value_len = value.fulllen
6505             vlen += value_len
6506             sub_offset += value_len
6507             v = v_tail
6508             if not evgen_mode:
6509                 if spec.default is not None and value == spec.default:
6510                     # This will not work in evgen_mode
6511                     if ctx_bered or ctx_allow_default_values:
6512                         ber_encoded = True
6513                     else:
6514                         raise DecodeError(
6515                             "DEFAULT value met",
6516                             klass=self.__class__,
6517                             decode_path=sub_decode_path,
6518                             offset=sub_offset,
6519                         )
6520                 values[name] = value
6521                 spec_defines = getattr(spec, "defines", ())
6522                 if len(spec_defines) == 0:
6523                     defines_by_path = ctx.get("defines_by_path", ())
6524                     if len(defines_by_path) > 0:
6525                         spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6526                 if spec_defines is not None and len(spec_defines) > 0:
6527                     for rel_path, schema in spec_defines:
6528                         defined = schema.get(value, None)
6529                         if defined is not None:
6530                             ctx.setdefault("_defines", []).append((
6531                                 abs_decode_path(sub_decode_path[:-1], rel_path),
6532                                 (value, defined),
6533                             ))
6534         if lenindef:
6535             if v[:EOC_LEN].tobytes() != EOC:
6536                 raise DecodeError(
6537                     "no EOC",
6538                     klass=self.__class__,
6539                     decode_path=decode_path,
6540                     offset=offset,
6541                 )
6542             tail = v[EOC_LEN:]
6543             vlen += EOC_LEN
6544         elif len(v) > 0:
6545             raise DecodeError(
6546                 "remaining data",
6547                 klass=self.__class__,
6548                 decode_path=decode_path,
6549                 offset=offset,
6550             )
6551         obj = self.__class__(
6552             schema=self.specs,
6553             impl=self.tag,
6554             expl=self._expl,
6555             default=self.default,
6556             optional=self.optional,
6557             _decoded=(offset, llen, vlen),
6558         )
6559         obj._value = values
6560         obj.lenindef = lenindef
6561         obj.ber_encoded = ber_encoded
6562         yield decode_path, obj, tail
6563
6564     def __repr__(self):
6565         value = pp_console_row(next(self.pps()))
6566         cols = []
6567         for name in self.specs:
6568             _value = self._value.get(name)
6569             if _value is None:
6570                 continue
6571             cols.append("%s: %s" % (name, repr(_value)))
6572         return "%s[%s]" % (value, "; ".join(cols))
6573
6574     def pps(self, decode_path=()):
6575         yield _pp(
6576             obj=self,
6577             asn1_type_name=self.asn1_type_name,
6578             obj_name=self.__class__.__name__,
6579             decode_path=decode_path,
6580             optional=self.optional,
6581             default=self == self.default,
6582             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6583             expl=None if self._expl is None else tag_decode(self._expl),
6584             offset=self.offset,
6585             tlen=self.tlen,
6586             llen=self.llen,
6587             vlen=self.vlen,
6588             expl_offset=self.expl_offset if self.expled else None,
6589             expl_tlen=self.expl_tlen if self.expled else None,
6590             expl_llen=self.expl_llen if self.expled else None,
6591             expl_vlen=self.expl_vlen if self.expled else None,
6592             expl_lenindef=self.expl_lenindef,
6593             lenindef=self.lenindef,
6594             ber_encoded=self.ber_encoded,
6595             bered=self.bered,
6596         )
6597         for name in self.specs:
6598             value = self._value.get(name)
6599             if value is None:
6600                 continue
6601             yield value.pps(decode_path=decode_path + (name,))
6602         for pp in self.pps_lenindef(decode_path):
6603             yield pp
6604
6605
6606 class Set(Sequence, SequenceEncode1stMixing):
6607     """``SET`` structure type
6608
6609     Its usage is identical to :py:class:`pyderasn.Sequence`.
6610
6611     .. _allow_unordered_set_ctx:
6612
6613     DER prohibits unordered values encoding and will raise an error
6614     during decode. If :ref:`bered <bered_ctx>` context option is set,
6615     then no error will occur. Also you can disable strict values
6616     ordering check by setting ``"allow_unordered_set": True``
6617     :ref:`context <ctx>` option.
6618     """
6619     __slots__ = ()
6620     tag_default = tag_encode(form=TagFormConstructed, num=17)
6621     asn1_type_name = "SET"
6622
6623     def _values_for_encoding(self):
6624         return sorted(
6625             super(Set, self)._values_for_encoding(),
6626             key=attrgetter("tag_order"),
6627         )
6628
6629     def _encode_cer(self, writer):
6630         write_full(writer, self.tag + LENINDEF)
6631         for v in sorted(
6632                 super(Set, self)._values_for_encoding(),
6633                 key=attrgetter("tag_order_cer"),
6634         ):
6635             v.encode_cer(writer)
6636         write_full(writer, EOC)
6637
6638     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6639         try:
6640             t, tlen, lv = tag_strip(tlv)
6641         except DecodeError as err:
6642             raise err.__class__(
6643                 msg=err.msg,
6644                 klass=self.__class__,
6645                 decode_path=decode_path,
6646                 offset=offset,
6647             )
6648         if t != self.tag:
6649             raise TagMismatch(
6650                 klass=self.__class__,
6651                 decode_path=decode_path,
6652                 offset=offset,
6653             )
6654         if tag_only:
6655             yield None
6656             return
6657         lenindef = False
6658         ctx_bered = ctx.get("bered", False)
6659         try:
6660             l, llen, v = len_decode(lv)
6661         except LenIndefForm as err:
6662             if not ctx_bered:
6663                 raise err.__class__(
6664                     msg=err.msg,
6665                     klass=self.__class__,
6666                     decode_path=decode_path,
6667                     offset=offset,
6668                 )
6669             l, llen, v = 0, 1, lv[1:]
6670             lenindef = True
6671         except DecodeError as err:
6672             raise err.__class__(
6673                 msg=err.msg,
6674                 klass=self.__class__,
6675                 decode_path=decode_path,
6676                 offset=offset,
6677             )
6678         if l > len(v):
6679             raise NotEnoughData(
6680                 "encoded length is longer than data",
6681                 klass=self.__class__,
6682                 offset=offset,
6683             )
6684         if not lenindef:
6685             v, tail = v[:l], v[l:]
6686         vlen = 0
6687         sub_offset = offset + tlen + llen
6688         values = {}
6689         ber_encoded = False
6690         ctx_allow_default_values = ctx.get("allow_default_values", False)
6691         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6692         tag_order_prev = (0, 0)
6693         _specs_items = copy(self.specs)
6694
6695         while len(v) > 0:
6696             if lenindef and v[:EOC_LEN].tobytes() == EOC:
6697                 break
6698             for name, spec in iteritems(_specs_items):
6699                 sub_decode_path = decode_path + (name,)
6700                 try:
6701                     spec.decode(
6702                         v,
6703                         sub_offset,
6704                         leavemm=True,
6705                         decode_path=sub_decode_path,
6706                         ctx=ctx,
6707                         tag_only=True,
6708                         _ctx_immutable=False,
6709                     )
6710                 except TagMismatch:
6711                     continue
6712                 break
6713             else:
6714                 raise TagMismatch(
6715                     klass=self.__class__,
6716                     decode_path=decode_path,
6717                     offset=offset,
6718                 )
6719             if evgen_mode:
6720                 for _decode_path, value, v_tail in spec.decode_evgen(
6721                         v,
6722                         sub_offset,
6723                         leavemm=True,
6724                         decode_path=sub_decode_path,
6725                         ctx=ctx,
6726                         _ctx_immutable=False,
6727                 ):
6728                     yield _decode_path, value, v_tail
6729             else:
6730                 _, value, v_tail = next(spec.decode_evgen(
6731                     v,
6732                     sub_offset,
6733                     leavemm=True,
6734                     decode_path=sub_decode_path,
6735                     ctx=ctx,
6736                     _ctx_immutable=False,
6737                     _evgen_mode=False,
6738                 ))
6739             value_tag_order = value.tag_order
6740             value_len = value.fulllen
6741             if tag_order_prev >= value_tag_order:
6742                 if ctx_bered or ctx_allow_unordered_set:
6743                     ber_encoded = True
6744                 else:
6745                     raise DecodeError(
6746                         "unordered " + self.asn1_type_name,
6747                         klass=self.__class__,
6748                         decode_path=sub_decode_path,
6749                         offset=sub_offset,
6750                     )
6751             if spec.default is None or value != spec.default:
6752                 pass
6753             elif ctx_bered or ctx_allow_default_values:
6754                 ber_encoded = True
6755             else:
6756                 raise DecodeError(
6757                     "DEFAULT value met",
6758                     klass=self.__class__,
6759                     decode_path=sub_decode_path,
6760                     offset=sub_offset,
6761                 )
6762             values[name] = value
6763             del _specs_items[name]
6764             tag_order_prev = value_tag_order
6765             sub_offset += value_len
6766             vlen += value_len
6767             v = v_tail
6768
6769         obj = self.__class__(
6770             schema=self.specs,
6771             impl=self.tag,
6772             expl=self._expl,
6773             default=self.default,
6774             optional=self.optional,
6775             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6776         )
6777         if lenindef:
6778             if v[:EOC_LEN].tobytes() != EOC:
6779                 raise DecodeError(
6780                     "no EOC",
6781                     klass=self.__class__,
6782                     decode_path=decode_path,
6783                     offset=offset,
6784                 )
6785             tail = v[EOC_LEN:]
6786             obj.lenindef = True
6787         for name, spec in iteritems(self.specs):
6788             if name not in values and not spec.optional:
6789                 raise DecodeError(
6790                     "%s value is not ready" % name,
6791                     klass=self.__class__,
6792                     decode_path=decode_path,
6793                     offset=offset,
6794                 )
6795         if not evgen_mode:
6796             obj._value = values
6797         obj.ber_encoded = ber_encoded
6798         yield decode_path, obj, tail
6799
6800
6801 SequenceOfState = namedtuple(
6802     "SequenceOfState",
6803     BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6804     **NAMEDTUPLE_KWARGS
6805 )
6806
6807
6808 class SequenceOf(SequenceEncode1stMixing, Obj):
6809     """``SEQUENCE OF`` sequence type
6810
6811     For that kind of type you must specify the object it will carry on
6812     (bounds are for example here, not required)::
6813
6814         class Ints(SequenceOf):
6815             schema = Integer()
6816             bounds = (0, 2)
6817
6818     >>> ints = Ints()
6819     >>> ints.append(Integer(123))
6820     >>> ints.append(Integer(234))
6821     >>> ints
6822     Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6823     >>> [int(i) for i in ints]
6824     [123, 234]
6825     >>> ints.append(Integer(345))
6826     Traceback (most recent call last):
6827     pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6828     >>> ints[1]
6829     INTEGER 234
6830     >>> ints[1] = Integer(345)
6831     >>> ints
6832     Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6833
6834     You can initialize sequence with preinitialized values:
6835
6836     >>> ints = Ints([Integer(123), Integer(234)])
6837
6838     Also you can use iterator as a value:
6839
6840     >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6841
6842     And it won't be iterated until encoding process. Pay attention that
6843     bounds and required schema checks are done only during the encoding
6844     process in that case! After encode was called, then value is zeroed
6845     back to empty list and you have to set it again. That mode is useful
6846     mainly with CER encoding mode, where all objects from the iterable
6847     will be streamed to the buffer, without copying all of them to
6848     memory first.
6849     """
6850     __slots__ = ("spec", "_bound_min", "_bound_max")
6851     tag_default = tag_encode(form=TagFormConstructed, num=16)
6852     asn1_type_name = "SEQUENCE OF"
6853
6854     def __init__(
6855             self,
6856             value=None,
6857             schema=None,
6858             bounds=None,
6859             impl=None,
6860             expl=None,
6861             default=None,
6862             optional=False,
6863             _decoded=(0, 0, 0),
6864     ):
6865         super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6866         if schema is None:
6867             schema = getattr(self, "schema", None)
6868         if schema is None:
6869             raise ValueError("schema must be specified")
6870         self.spec = schema
6871         self._bound_min, self._bound_max = getattr(
6872             self,
6873             "bounds",
6874             (0, float("+inf")),
6875         ) if bounds is None else bounds
6876         self._value = []
6877         if value is not None:
6878             self._value = self._value_sanitize(value)
6879         if default is not None:
6880             default_value = self._value_sanitize(default)
6881             default_obj = self.__class__(
6882                 schema=schema,
6883                 impl=self.tag,
6884                 expl=self._expl,
6885             )
6886             default_obj._value = default_value
6887             self.default = default_obj
6888             if value is None:
6889                 self._value = copy(default_obj._value)
6890
6891     def _value_sanitize(self, value):
6892         iterator = False
6893         if issubclass(value.__class__, SequenceOf):
6894             value = value._value
6895         elif hasattr(value, NEXT_ATTR_NAME):
6896             iterator = True
6897         elif hasattr(value, "__iter__"):
6898             value = list(value)
6899         else:
6900             raise InvalidValueType((self.__class__, iter, "iterator"))
6901         if not iterator:
6902             if not self._bound_min <= len(value) <= self._bound_max:
6903                 raise BoundsError(self._bound_min, len(value), self._bound_max)
6904             class_expected = self.spec.__class__
6905             for v in value:
6906                 if not isinstance(v, class_expected):
6907                     raise InvalidValueType((class_expected,))
6908         return value
6909
6910     @property
6911     def ready(self):
6912         if hasattr(self._value, NEXT_ATTR_NAME):
6913             return True
6914         if self._bound_min > 0 and len(self._value) == 0:
6915             return False
6916         return all(v.ready for v in self._value)
6917
6918     @property
6919     def bered(self):
6920         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6921             return True
6922         return any(v.bered for v in self._value)
6923
6924     def __getstate__(self):
6925         if hasattr(self._value, NEXT_ATTR_NAME):
6926             raise ValueError("can not pickle SequenceOf with iterator")
6927         return SequenceOfState(
6928             __version__,
6929             self.tag,
6930             self._tag_order,
6931             self._expl,
6932             self.default,
6933             self.optional,
6934             self.offset,
6935             self.llen,
6936             self.vlen,
6937             self.expl_lenindef,
6938             self.lenindef,
6939             self.ber_encoded,
6940             self.spec,
6941             [copy(v) for v in self._value],
6942             self._bound_min,
6943             self._bound_max,
6944         )
6945
6946     def __setstate__(self, state):
6947         super(SequenceOf, self).__setstate__(state)
6948         self.spec = state.spec
6949         self._value = state.value
6950         self._bound_min = state.bound_min
6951         self._bound_max = state.bound_max
6952
6953     def __eq__(self, their):
6954         if isinstance(their, self.__class__):
6955             return (
6956                 self.spec == their.spec and
6957                 self.tag == their.tag and
6958                 self._expl == their._expl and
6959                 self._value == their._value
6960             )
6961         if hasattr(their, "__iter__"):
6962             return self._value == list(their)
6963         return False
6964
6965     def __call__(
6966             self,
6967             value=None,
6968             bounds=None,
6969             impl=None,
6970             expl=None,
6971             default=None,
6972             optional=None,
6973     ):
6974         return self.__class__(
6975             value=value,
6976             schema=self.spec,
6977             bounds=(
6978                 (self._bound_min, self._bound_max)
6979                 if bounds is None else bounds
6980             ),
6981             impl=self.tag if impl is None else impl,
6982             expl=self._expl if expl is None else expl,
6983             default=self.default if default is None else default,
6984             optional=self.optional if optional is None else optional,
6985         )
6986
6987     def __contains__(self, key):
6988         return key in self._value
6989
6990     def append(self, value):
6991         if not isinstance(value, self.spec.__class__):
6992             raise InvalidValueType((self.spec.__class__,))
6993         if len(self._value) + 1 > self._bound_max:
6994             raise BoundsError(
6995                 self._bound_min,
6996                 len(self._value) + 1,
6997                 self._bound_max,
6998             )
6999         self._value.append(value)
7000
7001     def __iter__(self):
7002         return iter(self._value)
7003
7004     def __len__(self):
7005         return len(self._value)
7006
7007     def __setitem__(self, key, value):
7008         if not isinstance(value, self.spec.__class__):
7009             raise InvalidValueType((self.spec.__class__,))
7010         self._value[key] = self.spec(value=value)
7011
7012     def __getitem__(self, key):
7013         return self._value[key]
7014
7015     def _values_for_encoding(self):
7016         return iter(self._value)
7017
7018     def _encode(self):
7019         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7020         if iterator:
7021             values = []
7022             values_append = values.append
7023             class_expected = self.spec.__class__
7024             values_for_encoding = self._values_for_encoding()
7025             self._value = []
7026             for v in values_for_encoding:
7027                 if not isinstance(v, class_expected):
7028                     raise InvalidValueType((class_expected,))
7029                 values_append(v.encode())
7030             if not self._bound_min <= len(values) <= self._bound_max:
7031                 raise BoundsError(self._bound_min, len(values), self._bound_max)
7032             value = b"".join(values)
7033         else:
7034             value = b"".join(v.encode() for v in self._values_for_encoding())
7035         return b"".join((self.tag, len_encode(len(value)), value))
7036
7037     def _encode1st(self, state):
7038         state = super(SequenceOf, self)._encode1st(state)
7039         if hasattr(self._value, NEXT_ATTR_NAME):
7040             self._value = []
7041         return state
7042
7043     def _encode2nd(self, writer, state_iter):
7044         write_full(writer, self.tag + len_encode(next(state_iter)))
7045         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7046         if iterator:
7047             values_count = 0
7048             class_expected = self.spec.__class__
7049             values_for_encoding = self._values_for_encoding()
7050             self._value = []
7051             for v in values_for_encoding:
7052                 if not isinstance(v, class_expected):
7053                     raise InvalidValueType((class_expected,))
7054                 v.encode2nd(writer, state_iter)
7055                 values_count += 1
7056             if not self._bound_min <= values_count <= self._bound_max:
7057                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7058         else:
7059             for v in self._values_for_encoding():
7060                 v.encode2nd(writer, state_iter)
7061
7062     def _encode_cer(self, writer):
7063         write_full(writer, self.tag + LENINDEF)
7064         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7065         if iterator:
7066             class_expected = self.spec.__class__
7067             values_count = 0
7068             values_for_encoding = self._values_for_encoding()
7069             self._value = []
7070             for v in values_for_encoding:
7071                 if not isinstance(v, class_expected):
7072                     raise InvalidValueType((class_expected,))
7073                 v.encode_cer(writer)
7074                 values_count += 1
7075             if not self._bound_min <= values_count <= self._bound_max:
7076                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7077         else:
7078             for v in self._values_for_encoding():
7079                 v.encode_cer(writer)
7080         write_full(writer, EOC)
7081
7082     def _decode(
7083             self,
7084             tlv,
7085             offset,
7086             decode_path,
7087             ctx,
7088             tag_only,
7089             evgen_mode,
7090             ordering_check=False,
7091     ):
7092         try:
7093             t, tlen, lv = tag_strip(tlv)
7094         except DecodeError as err:
7095             raise err.__class__(
7096                 msg=err.msg,
7097                 klass=self.__class__,
7098                 decode_path=decode_path,
7099                 offset=offset,
7100             )
7101         if t != self.tag:
7102             raise TagMismatch(
7103                 klass=self.__class__,
7104                 decode_path=decode_path,
7105                 offset=offset,
7106             )
7107         if tag_only:
7108             yield None
7109             return
7110         lenindef = False
7111         ctx_bered = ctx.get("bered", False)
7112         try:
7113             l, llen, v = len_decode(lv)
7114         except LenIndefForm as err:
7115             if not ctx_bered:
7116                 raise err.__class__(
7117                     msg=err.msg,
7118                     klass=self.__class__,
7119                     decode_path=decode_path,
7120                     offset=offset,
7121                 )
7122             l, llen, v = 0, 1, lv[1:]
7123             lenindef = True
7124         except DecodeError as err:
7125             raise err.__class__(
7126                 msg=err.msg,
7127                 klass=self.__class__,
7128                 decode_path=decode_path,
7129                 offset=offset,
7130             )
7131         if l > len(v):
7132             raise NotEnoughData(
7133                 "encoded length is longer than data",
7134                 klass=self.__class__,
7135                 decode_path=decode_path,
7136                 offset=offset,
7137             )
7138         if not lenindef:
7139             v, tail = v[:l], v[l:]
7140         vlen = 0
7141         sub_offset = offset + tlen + llen
7142         _value = []
7143         _value_count = 0
7144         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7145         value_prev = memoryview(v[:0])
7146         ber_encoded = False
7147         spec = self.spec
7148         while len(v) > 0:
7149             if lenindef and v[:EOC_LEN].tobytes() == EOC:
7150                 break
7151             sub_decode_path = decode_path + (str(_value_count),)
7152             if evgen_mode:
7153                 for _decode_path, value, v_tail in spec.decode_evgen(
7154                         v,
7155                         sub_offset,
7156                         leavemm=True,
7157                         decode_path=sub_decode_path,
7158                         ctx=ctx,
7159                         _ctx_immutable=False,
7160                 ):
7161                     yield _decode_path, value, v_tail
7162             else:
7163                 _, value, v_tail = next(spec.decode_evgen(
7164                     v,
7165                     sub_offset,
7166                     leavemm=True,
7167                     decode_path=sub_decode_path,
7168                     ctx=ctx,
7169                     _ctx_immutable=False,
7170                     _evgen_mode=False,
7171                 ))
7172             value_len = value.fulllen
7173             if ordering_check:
7174                 if value_prev.tobytes() > v[:value_len].tobytes():
7175                     if ctx_bered or ctx_allow_unordered_set:
7176                         ber_encoded = True
7177                     else:
7178                         raise DecodeError(
7179                             "unordered " + self.asn1_type_name,
7180                             klass=self.__class__,
7181                             decode_path=sub_decode_path,
7182                             offset=sub_offset,
7183                         )
7184                 value_prev = v[:value_len]
7185             _value_count += 1
7186             if not evgen_mode:
7187                 _value.append(value)
7188             sub_offset += value_len
7189             vlen += value_len
7190             v = v_tail
7191         if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7192             raise DecodeError(
7193                 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7194                 klass=self.__class__,
7195                 decode_path=decode_path,
7196                 offset=offset,
7197             )
7198         try:
7199             obj = self.__class__(
7200                 value=None if evgen_mode else _value,
7201                 schema=spec,
7202                 bounds=(self._bound_min, self._bound_max),
7203                 impl=self.tag,
7204                 expl=self._expl,
7205                 default=self.default,
7206                 optional=self.optional,
7207                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7208             )
7209         except BoundsError as err:
7210             raise DecodeError(
7211                 msg=str(err),
7212                 klass=self.__class__,
7213                 decode_path=decode_path,
7214                 offset=offset,
7215             )
7216         if lenindef:
7217             if v[:EOC_LEN].tobytes() != EOC:
7218                 raise DecodeError(
7219                     "no EOC",
7220                     klass=self.__class__,
7221                     decode_path=decode_path,
7222                     offset=offset,
7223                 )
7224             obj.lenindef = True
7225             tail = v[EOC_LEN:]
7226         obj.ber_encoded = ber_encoded
7227         yield decode_path, obj, tail
7228
7229     def __repr__(self):
7230         return "%s[%s]" % (
7231             pp_console_row(next(self.pps())),
7232             ", ".join(repr(v) for v in self._value),
7233         )
7234
7235     def pps(self, decode_path=()):
7236         yield _pp(
7237             obj=self,
7238             asn1_type_name=self.asn1_type_name,
7239             obj_name=self.__class__.__name__,
7240             decode_path=decode_path,
7241             optional=self.optional,
7242             default=self == self.default,
7243             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7244             expl=None if self._expl is None else tag_decode(self._expl),
7245             offset=self.offset,
7246             tlen=self.tlen,
7247             llen=self.llen,
7248             vlen=self.vlen,
7249             expl_offset=self.expl_offset if self.expled else None,
7250             expl_tlen=self.expl_tlen if self.expled else None,
7251             expl_llen=self.expl_llen if self.expled else None,
7252             expl_vlen=self.expl_vlen if self.expled else None,
7253             expl_lenindef=self.expl_lenindef,
7254             lenindef=self.lenindef,
7255             ber_encoded=self.ber_encoded,
7256             bered=self.bered,
7257         )
7258         for i, value in enumerate(self._value):
7259             yield value.pps(decode_path=decode_path + (str(i),))
7260         for pp in self.pps_lenindef(decode_path):
7261             yield pp
7262
7263
7264 class SetOf(SequenceOf):
7265     """``SET OF`` sequence type
7266
7267     Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7268     """
7269     __slots__ = ()
7270     tag_default = tag_encode(form=TagFormConstructed, num=17)
7271     asn1_type_name = "SET OF"
7272
7273     def _value_sanitize(self, value):
7274         value = super(SetOf, self)._value_sanitize(value)
7275         if hasattr(value, NEXT_ATTR_NAME):
7276             raise ValueError(
7277                 "SetOf does not support iterator values, as no sense in them"
7278             )
7279         return value
7280
7281     def _encode(self):
7282         v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7283         return b"".join((self.tag, len_encode(len(v)), v))
7284
7285     def _encode2nd(self, writer, state_iter):
7286         write_full(writer, self.tag + len_encode(next(state_iter)))
7287         values = []
7288         for v in self._values_for_encoding():
7289             buf = BytesIO()
7290             v.encode2nd(buf.write, state_iter)
7291             values.append(buf.getvalue())
7292         values.sort()
7293         for v in values:
7294             write_full(writer, v)
7295
7296     def _encode_cer(self, writer):
7297         write_full(writer, self.tag + LENINDEF)
7298         for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7299             write_full(writer, v)
7300         write_full(writer, EOC)
7301
7302     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7303         return super(SetOf, self)._decode(
7304             tlv,
7305             offset,
7306             decode_path,
7307             ctx,
7308             tag_only,
7309             evgen_mode,
7310             ordering_check=True,
7311         )
7312
7313
7314 def obj_by_path(pypath):  # pragma: no cover
7315     """Import object specified as string Python path
7316
7317     Modules must be separated from classes/functions with ``:``.
7318
7319     >>> obj_by_path("foo.bar:Baz")
7320     <class 'foo.bar.Baz'>
7321     >>> obj_by_path("foo.bar:Baz.boo")
7322     <classmethod 'foo.bar.Baz.boo'>
7323     """
7324     mod, objs = pypath.rsplit(":", 1)
7325     from importlib import import_module
7326     obj = import_module(mod)
7327     for obj_name in objs.split("."):
7328         obj = getattr(obj, obj_name)
7329     return obj
7330
7331
7332 def generic_decoder():  # pragma: no cover
7333     # All of this below is a big hack with self references
7334     choice = PrimitiveTypes()
7335     choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7336     choice.specs["SetOf"] = SetOf(schema=choice)
7337     for i in six_xrange(31):
7338         choice.specs["SequenceOf%d" % i] = SequenceOf(
7339             schema=choice,
7340             expl=tag_ctxc(i),
7341         )
7342     choice.specs["Any"] = Any()
7343
7344     # Class name equals to type name, to omit it from output
7345     class SEQUENCEOF(SequenceOf):
7346         __slots__ = ()
7347         schema = choice
7348
7349     def pprint_any(
7350             obj,
7351             oid_maps=(),
7352             with_colours=False,
7353             with_decode_path=False,
7354             decode_path_only=(),
7355             decode_path=(),
7356     ):
7357         def _pprint_pps(pps):
7358             for pp in pps:
7359                 if hasattr(pp, "_fields"):
7360                     if (
7361                             decode_path_only != () and
7362                             pp.decode_path[:len(decode_path_only)] != decode_path_only
7363                     ):
7364                         continue
7365                     if pp.asn1_type_name == Choice.asn1_type_name:
7366                         continue
7367                     pp_kwargs = pp._asdict()
7368                     pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7369                     pp = _pp(**pp_kwargs)
7370                     yield pp_console_row(
7371                         pp,
7372                         oid_maps=oid_maps,
7373                         with_offsets=True,
7374                         with_blob=False,
7375                         with_colours=with_colours,
7376                         with_decode_path=with_decode_path,
7377                         decode_path_len_decrease=len(decode_path_only),
7378                     )
7379                     for row in pp_console_blob(
7380                             pp,
7381                             decode_path_len_decrease=len(decode_path_only),
7382                     ):
7383                         yield row
7384                 else:
7385                     for row in _pprint_pps(pp):
7386                         yield row
7387         return "\n".join(_pprint_pps(obj.pps(decode_path)))
7388     return SEQUENCEOF(), pprint_any
7389
7390
7391 def main():  # pragma: no cover
7392     import argparse
7393     parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7394     parser.add_argument(
7395         "--skip",
7396         type=int,
7397         default=0,
7398         help="Skip that number of bytes from the beginning",
7399     )
7400     parser.add_argument(
7401         "--oids",
7402         help="Python paths to dictionary with OIDs, comma separated",
7403     )
7404     parser.add_argument(
7405         "--schema",
7406         help="Python path to schema definition to use",
7407     )
7408     parser.add_argument(
7409         "--defines-by-path",
7410         help="Python path to decoder's defines_by_path",
7411     )
7412     parser.add_argument(
7413         "--nobered",
7414         action="store_true",
7415         help="Disallow BER encoding",
7416     )
7417     parser.add_argument(
7418         "--print-decode-path",
7419         action="store_true",
7420         help="Print decode paths",
7421     )
7422     parser.add_argument(
7423         "--decode-path-only",
7424         help="Print only specified decode path",
7425     )
7426     parser.add_argument(
7427         "--allow-expl-oob",
7428         action="store_true",
7429         help="Allow explicit tag out-of-bound",
7430     )
7431     parser.add_argument(
7432         "--evgen",
7433         action="store_true",
7434         help="Turn on event generation mode",
7435     )
7436     parser.add_argument(
7437         "RAWFile",
7438         type=argparse.FileType("rb"),
7439         help="Path to BER/CER/DER file you want to decode",
7440     )
7441     args = parser.parse_args()
7442     if PY2:
7443         args.RAWFile.seek(args.skip)
7444         raw = memoryview(args.RAWFile.read())
7445         args.RAWFile.close()
7446     else:
7447         raw = file_mmaped(args.RAWFile)[args.skip:]
7448     oid_maps = (
7449         [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7450         if args.oids else ()
7451     )
7452     from functools import partial
7453     if args.schema:
7454         schema = obj_by_path(args.schema)
7455         pprinter = partial(pprint, big_blobs=True)
7456     else:
7457         schema, pprinter = generic_decoder()
7458     ctx = {
7459         "bered": not args.nobered,
7460         "allow_expl_oob": args.allow_expl_oob,
7461     }
7462     if args.defines_by_path is not None:
7463         ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7464     from os import environ
7465     pprinter = partial(
7466         pprinter,
7467         oid_maps=oid_maps,
7468         with_colours=environ.get("NO_COLOR") is None,
7469         with_decode_path=args.print_decode_path,
7470         decode_path_only=(
7471             () if args.decode_path_only is None else
7472             tuple(args.decode_path_only.split(":"))
7473         ),
7474     )
7475     if args.evgen:
7476         for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7477             print(pprinter(obj, decode_path=decode_path))
7478     else:
7479         obj, tail = schema().decode(raw, ctx=ctx)
7480         print(pprinter(obj))
7481     if tail != b"":
7482         print("\nTrailing data: %s" % hexenc(tail))
7483
7484
7485 if __name__ == "__main__":
7486     main()