]> Cypherpunks.ru repositories - pyderasn.git/blob - pyderasn.py
Integer.tohex()
[pyderasn.git] / pyderasn.py
1 #!/usr/bin/env python
2 # coding: utf-8
3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU Lesser General Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
21
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal them in BER/CER/DER ones.
24
25     >>> i = Integer(123)
26     >>> raw = i.encode()
27     >>> Integer().decod(raw) == i
28     True
29
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
62
63 Common for most types
64 ---------------------
65
66 Tags
67 ____
68
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
75 tags simultaneously.
76
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
80 number.
81
82 .. note::
83
84    EXPLICIT tags always have **constructed** tag. PyDERASN does not
85    explicitly check correctness of schema input here.
86
87 .. note::
88
89    Implicit tags have **primitive** (``tag_ctxp``) encoding for
90    primitive values.
91
92 ::
93
94     >>> Integer(impl=tag_ctxp(1))
95     [1] INTEGER
96     >>> Integer(expl=tag_ctxc(2))
97     [2] EXPLICIT INTEGER
98
99 Implicit tag is not explicitly shown.
100
101 Two objects of the same type, but with different implicit/explicit tags
102 are **not** equal.
103
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
106 function::
107
108     >>> tag_decode(tag_ctxc(123))
109     (128, 32, 123)
110     >>> klass, form, num = tag_decode(tag_ctxc(123))
111     >>> klass == TagClassContext
112     True
113     >>> form == TagFormConstructed
114     True
115
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
118
119 Default/optional
120 ________________
121
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
125
126     >>> Integer(optional=True, default=123)
127     INTEGER 123 OPTIONAL DEFAULT
128
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
132 ``version`` field::
133
134     class Version(Integer):
135         schema = (
136             ("v1", 0),
137             ("v2", 1),
138             ("v3", 2),
139         )
140     class TBSCertificate(Sequence):
141         schema = (
142             ("version", Version(expl=tag_ctxc(0), default="v1")),
143         [...]
144
145 When default argument is used and value is not specified, then it equals
146 to default one.
147
148 .. _bounds:
149
150 Size constraints
151 ________________
152
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
156
157     class X(...):
158         bounds = (MIN, MAX)
159
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
161
162 For simplicity you can also set bounds the following way::
163
164     bounded_x = X(bounds=(MIN, MAX))
165
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
167 raised.
168
169 Common methods
170 ______________
171
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
175
176 All objects are friendly to ``copy.copy()`` and copied objects can be
177 safely mutated.
178
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
181
182 .. _decoding:
183
184 Decoding
185 --------
186
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
193
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
196
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
199
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
205
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
208
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
211 * ``expl_tlen``,
212 * ``expl_llen``
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
215   ``offset`` otherwise
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
217   ``tlvlen`` otherwise
218
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
220
221 .. _ctx:
222
223 Context
224 _______
225
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
229
230 Currently available context options:
231
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
238
239 .. _pprinting:
240
241 Pretty printing
242 ---------------
243
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
248
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
252
253     >>> from pyderasn import pprint
254     >>> encoded = Integer(-12345).encode()
255     >>> obj, tail = Integer().decode(encoded)
256     >>> print(pprint(obj))
257         0   [1,1,   2] INTEGER -12345
258
259 .. _pprint_example:
260
261 Example certificate::
262
263     >>> print(pprint(crt))
264         0   [1,3,1604] Certificate SEQUENCE
265         4   [1,3,1453]  . tbsCertificate: TBSCertificate SEQUENCE
266        10-2 [1,1,   1]  . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267        13   [1,1,   3]  . . serialNumber: CertificateSerialNumber INTEGER 61595
268        18   [1,1,  13]  . . signature: AlgorithmIdentifier SEQUENCE
269        20   [1,1,   9]  . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270        31   [0,0,   2]  . . . parameters: [UNIV 5] ANY OPTIONAL
271                         . . . . 05:00
272        33   [0,0, 278]  . . issuer: Name CHOICE rdnSequence
273        33   [1,3, 274]  . . . rdnSequence: RDNSequence SEQUENCE OF
274        37   [1,1,  11]  . . . . 0: RelativeDistinguishedName SET OF
275        39   [1,1,   9]  . . . . . 0: AttributeTypeAndValue SEQUENCE
276        41   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277        46   [0,0,   4]  . . . . . . value: [UNIV 19] AttributeValue ANY
278                         . . . . . . . 13:02:45:53
279     [...]
280      1461   [1,1,  13]  . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281      1463   [1,1,   9]  . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282      1474   [0,0,   2]  . . parameters: [UNIV 5] ANY OPTIONAL
283                         . . . 05:00
284      1476   [1,2, 129]  . signatureValue: BIT STRING 1024 bits
285                         . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286                         . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
287      [...]
288
289     Trailing data: 0a
290
291 Let's parse that output, human::
292
293        10-2 [1,1,   1]    . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294        ^  ^  ^ ^    ^     ^   ^        ^            ^       ^       ^  ^
295        0  1  2 3    4     5   6        7            8       9       10 11
296
297 ::
298
299        20   [1,1,   9]    . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
300        ^     ^ ^    ^     ^     ^          ^                 ^
301        0     2 3    4     5     6          9                 10
302
303 ::
304
305        33   [0,0, 278]    . . issuer: Name CHOICE rdnSequence
306        ^     ^ ^    ^     ^   ^       ^    ^      ^
307        0     2 3    4     5   6       8    9      10
308
309 ::
310
311        52-2∞ B [1,1,1054]∞  . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
312              ^           ^                                 ^   ^            ^
313             12          13                                14   9            10
314
315 :0:
316  Offset of the object, where its DER/BER encoding begins.
317  Pay attention that it does **not** include explicit tag.
318 :1:
319  If explicit tag exists, then this is its length (tag + encoded length).
320 :2:
321  Length of object's tag. For example CHOICE does not have its own tag,
322  so it is zero.
323 :3:
324  Length of encoded length.
325 :4:
326  Length of encoded value.
327 :5:
328  Visual indentation to show the depth of object in the hierarchy.
329 :6:
330  Object's name inside SEQUENCE/CHOICE.
331 :7:
332  If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333  here. "IMPLICIT" is omitted.
334 :8:
335  Object's class name, if set. Omitted if it is just an ordinary simple
336  value (like with ``algorithm`` in example above).
337 :9:
338  Object's ASN.1 type.
339 :10:
340  Object's value, if set. Can consist of multiple words (like OCTET/BIT
341  STRINGs above). We see ``v3`` value in Version, because it is named.
342  ``rdnSequence`` is the choice of CHOICE type.
343 :11:
344  Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345  default one, specified in the schema.
346 :12:
347  Shows does object contains any kind of BER encoded data (possibly
348  Sequence holding BER-encoded underlying value).
349 :13:
350  Only applicable to BER encoded data. Indefinite length encoding mark.
351 :14:
352  Only applicable to BER encoded data. If object has BER-specific
353  encoding, then ``BER`` will be shown. It does not depend on indefinite
354  length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355  (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
356  could be BERed.
357
358 .. _definedby:
359
360 DEFINED BY
361 ----------
362
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
367
368 .. _defines:
369
370 defines kwarg
371 _____________
372
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
376 container::
377
378     class ContentInfo(Sequence):
379         schema = (
380             ("contentType", ContentType(defines=((("content",), {
381                 id_digestedData: DigestedData(),
382                 id_signedData: SignedData(),
383             }),))),
384             ("content", Any(expl=tag_ctxc(0))),
385         )
386
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
391 done.
392
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
398
399         (
400             (("parameters",), {
401                 id_ecPublicKey: ECParameters(),
402                 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
403             }),
404             (("..", "subjectPublicKey"), {
405                 id_rsaEncryption: RSAPublicKey(),
406                 id_GostR3410_2001: OctetString(),
407             }),
408         ),
409
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
412 itself.
413
414 Following types can be automatically decoded (DEFINED BY):
415
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420   ``Any``/``BitString``/``OctetString``-s
421
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
426
427 .. _defines_by_path_ctx:
428
429 defines_by_path context option
430 ______________________________
431
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
435
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
438
439     (decode_path, defines)
440
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
445
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
449 of ``PKIResponse``::
450
451     content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
452         (
453             ("contentType",),
454             ((("content",), {id_signedData: SignedData()}),),
455         ),
456         (
457             (
458                 "content",
459                 DecodePathDefBy(id_signedData),
460                 "encapContentInfo",
461                 "eContentType",
462             ),
463             ((("eContent",), {
464                 id_cct_PKIData: PKIData(),
465                 id_cct_PKIResponse: PKIResponse(),
466             })),
467         ),
468         (
469             (
470                 "content",
471                 DecodePathDefBy(id_signedData),
472                 "encapContentInfo",
473                 "eContent",
474                 DecodePathDefBy(id_cct_PKIResponse),
475                 "controlSequence",
476                 any,
477                 "attrType",
478             ),
479             ((("attrValues",), {
480                 id_cmc_recipientNonce: RecipientNonce(),
481                 id_cmc_senderNonce: SenderNonce(),
482                 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483                 id_cmc_transactionId: TransactionId(),
484             })),
485         ),
486     )})
487
488 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 First function is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meet --
491 useful for SEQUENCE/SET OF-s.
492
493 .. _bered_ctx:
494
495 BER encoding
496 ------------
497
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
502
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504   attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505   STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506   ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508   attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509   ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
510   contain it.
511 * If object has an indefinite length encoded explicit tag, then
512   ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514   indefinite length, object's indefinite length, BER-encoding) or any
515   underlying component has that kind of encoding, then ``bered``
516   attribute is set to True. For example SignedData CMS can have
517   ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518   ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
519
520 EOC (end-of-contents) token's length is taken in advance in object's
521 value length.
522
523 .. _allow_expl_oob_ctx:
524
525 Allow explicit tag out-of-bound
526 -------------------------------
527
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
533
534 .. warning::
535
536    This option should be used only for skipping some decode errors, just
537    to see the decoded structure somehow.
538
539 .. _streaming:
540
541 Streaming and dealing with huge structures
542 ------------------------------------------
543
544 .. _evgen_mode:
545
546 evgen mode
547 __________
548
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
553 structure.
554
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
558
559     class TBSCertListFast(Sequence):
560         schema = (
561             [...]
562             ("revokedCertificates", OctetString(
563                 impl=SequenceOf.tag_default,
564                 optional=True,
565             )),
566             [...]
567         )
568
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
571
572 But how can you get all certificate's serial number from it, after you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
576
577     $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578          10     [1,1,   1]   . . version: Version INTEGER v2 (01) OPTIONAL
579          15     [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580          26     [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL
581          13     [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE
582          34     [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583          39     [0,0,   9]   . . . . . . value: [UNIV 19] AttributeValue ANY
584          32     [1,1,  14]   . . . . . 0: AttributeTypeAndValue SEQUENCE
585          30     [1,1,  16]   . . . . 0: RelativeDistinguishedName SET OF
586     [...]
587         188     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589         191     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
590         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591         186     [1,1,  18]   . . . 0: RevokedCertificate SEQUENCE
592         208     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594         211     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
595         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596         206     [1,1,  18]   . . . 1: RevokedCertificate SEQUENCE
597     [...]
598     9144992     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
599     9144992     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600     9144985     [1,1,  20]   . . . 415755: RevokedCertificate SEQUENCE
601       181     [1,4,9144821]   . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602         5     [1,4,9144997]   . tbsCertList: TBSCertList SEQUENCE
603     9145009     [1,1,   9]   . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604     9145020     [0,0,   2]   . . parameters: [UNIV 5] ANY OPTIONAL
605     9145007     [1,1,  13]   . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606     9145022     [1,3, 513]   . signatureValue: BIT STRING 4096 bits
607         0     [1,4,9145534]  CertificateList SEQUENCE
608
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as a successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
618 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
619
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
626
627     for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
628         if (
629             len(decode_path) == 4 and
630             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
631             decode_path[3] == "userCertificate"
632         ):
633             print("serial number:", int(obj))
634
635 Virtually it does not take any memory except at least needed for single
636 object storage. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
639 ``.tlvlen``.
640
641 .. _evgen_mode_upto_ctx:
642
643 evgen_mode_upto
644 _______________
645
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 do not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
655
656     for decode_path, obj, _ in CertificateList().decode_evgen(
657         crl_raw,
658         ctx={"evgen_mode_upto": (
659             (("tbsCertList", "revokedCertificates", any), True),
660         )},
661     ):
662         if (
663             len(decode_path) == 3 and
664             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
665         ):
666             print("serial number:", int(obj["userCertificate"]))
667
668 .. note::
669
670    SEQUENCE/SET values with DEFAULT specified are automatically decoded
671    without evgen mode.
672
673 .. _mmap:
674
675 mmap-ed file
676 ____________
677
678 POSIX compliant systems have ``mmap`` syscall, giving ability to work
679 the memory mapped file. You can deal with the file like it was an
680 ordinary binary string, allowing you not to load it to the memory first.
681 Also you can use them as an input for OCTET STRING, taking no Python
682 memory for their storage.
683
684 There is convenient :py:func:`pyderasn.file_mmaped` function that
685 creates read-only memoryview on the file contents::
686
687     with open("huge", "rb") as fd:
688         raw = file_mmaped(fd)
689         obj = Something.decode(raw)
690
691 .. warning::
692
693    mmap-ed files in Python2.7 does not implement buffer protocol, so
694    memoryview won't work on them.
695
696 .. warning::
697
698    mmap maps the **whole** file. So it plays no role if you seek-ed it
699    before. Take the slice of the resulting memoryview with required
700    offset instead.
701
702 .. note::
703
704    If you use ZFS as underlying storage, then pay attention that
705    currently most platforms does not deal good with ZFS ARC and ordinary
706    page cache used for mmaps. It can take twice the necessary size in
707    the memory: both in page cache and ZFS ARC.
708
709 CER encoding
710 ____________
711
712 We can parse any kind of data now, but how can we produce files
713 streamingly, without storing their encoded representation in memory?
714 SEQUENCE by default encodes in memory all its values, joins them in huge
715 binary string, just to know the exact size of SEQUENCE's value for
716 encoding it in TLV. DER requires you to know all exact sizes of the
717 objects.
718
719 You can use CER encoding mode, that slightly differs from the DER, but
720 does not require exact sizes knowledge, allowing streaming encoding
721 directly to some writer/buffer. Just use
722 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
723 encoded data will flow::
724
725     opener = io.open if PY2 else open
726     with opener("result", "wb") as fd:
727         obj.encode_cer(fd.write)
728
729 ::
730
731     buf = io.BytesIO()
732     obj.encode_cer(buf.write)
733
734 If you do not want to create in-memory buffer every time, then you can
735 use :py:func:`pyderasn.encode_cer` function::
736
737     data = encode_cer(obj)
738
739 Remember that CER is **not valid** DER in most cases, so you **have to**
740 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
741 decoding. Also currently there is **no** validation that provided CER is
742 valid one -- you are sure that it has only valid BER encoding.
743
744 .. warning::
745
746    SET OF values can not be streamingly encoded, because they are
747    required to be sorted byte-by-byte. Big SET OF values still will take
748    much memory. Use neither SET nor SET OF values, as modern ASN.1
749    also recommends too.
750
751 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
752 OCTET STRINGs! They will be streamingly copied from underlying file to
753 the buffer using 1 KB chunks.
754
755 Some structures require that some of the elements have to be forcefully
756 DER encoded. For example ``SignedData`` CMS requires you to encode
757 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
758 encode everything else in BER. You can tell any of the structures to be
759 forcefully encoded in DER during CER encoding, by specifying
760 ``der_forced=True`` attribute::
761
762     class Certificate(Sequence):
763         schema = (...)
764         der_forced = True
765
766     class SignedAttributes(SetOf):
767         schema = Attribute()
768         bounds = (1, 32)
769         der_forced = True
770
771 .. _agg_octet_string:
772
773 agg_octet_string
774 ________________
775
776 In most cases, huge quantity of binary data is stored as OCTET STRING.
777 CER encoding splits it on 1 KB chunks. BER allows splitting on various
778 levels of chunks inclusion::
779
780     SOME STRING[CONSTRUCTED]
781         OCTET STRING[CONSTRUCTED]
782             OCTET STRING[PRIMITIVE]
783                 DATA CHUNK
784             OCTET STRING[PRIMITIVE]
785                 DATA CHUNK
786             OCTET STRING[PRIMITIVE]
787                 DATA CHUNK
788         OCTET STRING[PRIMITIVE]
789             DATA CHUNK
790         OCTET STRING[CONSTRUCTED]
791             OCTET STRING[PRIMITIVE]
792                 DATA CHUNK
793             OCTET STRING[PRIMITIVE]
794                 DATA CHUNK
795         OCTET STRING[CONSTRUCTED]
796             OCTET STRING[CONSTRUCTED]
797                 OCTET STRING[PRIMITIVE]
798                     DATA CHUNK
799
800 You can not just take the offset and some ``.vlen`` of the STRING and
801 treat it as the payload. If you decode it without
802 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
803 and ``bytes()`` will give the whole payload contents.
804
805 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
806 small memory footprint. There is convenient
807 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
808 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
809 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
810 SHA512 digest of its ``eContent``::
811
812     fd = open("data.p7m", "rb")
813     raw = file_mmaped(fd)
814     ctx = {"bered": True}
815     for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
816         if decode_path == ("content",):
817             content = obj
818             break
819     else:
820         raise ValueError("no content found")
821     hasher_state = sha512()
822     def hasher(data):
823         hasher_state.update(data)
824         return len(data)
825     evgens = SignedData().decode_evgen(
826         raw[content.offset:],
827         offset=content.offset,
828         ctx=ctx,
829     )
830     agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
831     fd.close()
832     digest = hasher_state.digest()
833
834 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
835 copy the payload (without BER/CER encoding interleaved overhead) in it.
836 Virtually it won't take memory more than for keeping small structures
837 and 1 KB binary chunks.
838
839 .. _seqof-iterators:
840
841 SEQUENCE OF iterators
842 _____________________
843
844 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
845 classes. The only difference with providing the full list of objects, is
846 that type and bounds checking is done during encoding process. Also
847 sequence's value will be emptied after encoding, forcing you to set its
848 value again.
849
850 This is very useful when you have to create some huge objects, like
851 CRLs, with thousands and millions of entities inside. You can write the
852 generator taking necessary data from the database and giving the
853 ``RevokedCertificate`` objects. Only binary representation of that
854 objects will take memory during DER encoding.
855
856 2-pass DER encoding
857 -------------------
858
859 There is ability to do 2-pass encoding to DER, writing results directly
860 to specified writer (buffer, file, whatever). It could be 1.5+ times
861 slower than ordinary encoding, but it takes little memory for 1st pass
862 state storing. For example, 1st pass state for CACert.org's CRL with
863 ~416K of certificate entries takes nearly 3.5 MB of memory.
864 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
865 nearly 0.5 KB of memory.
866
867 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
868 iterators <seqof-iterators>` and write directly to opened file, then
869 there is very small memory footprint.
870
871 1st pass traverses through all the objects of the structure and returns
872 the size of DER encoded structure, together with 1st pass state object.
873 That state contains precalculated lengths for various objects inside the
874 structure.
875
876 ::
877
878     fulllen, state = obj.encode1st()
879
880 2nd pass takes the writer and 1st pass state. It traverses through all
881 the objects again, but writes their encoded representation to the writer.
882
883 ::
884
885     opener = io.open if PY2 else open
886     with opener("result", "wb") as fd:
887         obj.encode2nd(fd.write, iter(state))
888
889 .. warning::
890
891    You **MUST NOT** use 1st pass state if anything is changed in the
892    objects. It is intended to be used immediately after 1st pass is
893    done!
894
895 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
896 have to reinitialize the values after the 1st pass. And you **have to**
897 be sure that the iterator gives exactly the same values as previously.
898 Yes, you have to run your iterator twice -- because this is two pass
899 encoding mode.
900
901 If you want to encode to the memory, then you can use convenient
902 :py:func:`pyderasn.encode2pass` helper.
903
904 Base Obj
905 --------
906 .. autoclass:: pyderasn.Obj
907    :members:
908
909 Primitive types
910 ---------------
911
912 Boolean
913 _______
914 .. autoclass:: pyderasn.Boolean
915    :members: __init__
916
917 Integer
918 _______
919 .. autoclass:: pyderasn.Integer
920    :members: __init__, named, tohex
921
922 BitString
923 _________
924 .. autoclass:: pyderasn.BitString
925    :members: __init__, bit_len, named
926
927 OctetString
928 ___________
929 .. autoclass:: pyderasn.OctetString
930    :members: __init__
931
932 Null
933 ____
934 .. autoclass:: pyderasn.Null
935    :members: __init__
936
937 ObjectIdentifier
938 ________________
939 .. autoclass:: pyderasn.ObjectIdentifier
940    :members: __init__
941
942 Enumerated
943 __________
944 .. autoclass:: pyderasn.Enumerated
945
946 CommonString
947 ____________
948 .. autoclass:: pyderasn.CommonString
949
950 NumericString
951 _____________
952 .. autoclass:: pyderasn.NumericString
953
954 PrintableString
955 _______________
956 .. autoclass:: pyderasn.PrintableString
957    :members: __init__, allow_asterisk, allow_ampersand
958
959 UTCTime
960 _______
961 .. autoclass:: pyderasn.UTCTime
962    :members: __init__, todatetime
963
964 GeneralizedTime
965 _______________
966 .. autoclass:: pyderasn.GeneralizedTime
967    :members: __init__, todatetime
968
969 Special types
970 -------------
971
972 Choice
973 ______
974 .. autoclass:: pyderasn.Choice
975    :members: __init__, choice, value
976
977 PrimitiveTypes
978 ______________
979 .. autoclass:: PrimitiveTypes
980
981 Any
982 ___
983 .. autoclass:: pyderasn.Any
984    :members: __init__
985
986 Constructed types
987 -----------------
988
989 Sequence
990 ________
991 .. autoclass:: pyderasn.Sequence
992    :members: __init__
993
994 Set
995 ___
996 .. autoclass:: pyderasn.Set
997    :members: __init__
998
999 SequenceOf
1000 __________
1001 .. autoclass:: pyderasn.SequenceOf
1002    :members: __init__
1003
1004 SetOf
1005 _____
1006 .. autoclass:: pyderasn.SetOf
1007    :members: __init__
1008
1009 Various
1010 -------
1011
1012 .. autofunction:: pyderasn.abs_decode_path
1013 .. autofunction:: pyderasn.agg_octet_string
1014 .. autofunction:: pyderasn.colonize_hex
1015 .. autofunction:: pyderasn.encode2pass
1016 .. autofunction:: pyderasn.encode_cer
1017 .. autofunction:: pyderasn.file_mmaped
1018 .. autofunction:: pyderasn.hexenc
1019 .. autofunction:: pyderasn.hexdec
1020 .. autofunction:: pyderasn.tag_encode
1021 .. autofunction:: pyderasn.tag_decode
1022 .. autofunction:: pyderasn.tag_ctxp
1023 .. autofunction:: pyderasn.tag_ctxc
1024 .. autoclass:: pyderasn.DecodeError
1025    :members: __init__
1026 .. autoclass:: pyderasn.NotEnoughData
1027 .. autoclass:: pyderasn.ExceedingData
1028 .. autoclass:: pyderasn.LenIndefForm
1029 .. autoclass:: pyderasn.TagMismatch
1030 .. autoclass:: pyderasn.InvalidLength
1031 .. autoclass:: pyderasn.InvalidOID
1032 .. autoclass:: pyderasn.ObjUnknown
1033 .. autoclass:: pyderasn.ObjNotReady
1034 .. autoclass:: pyderasn.InvalidValueType
1035 .. autoclass:: pyderasn.BoundsError
1036
1037 .. _cmdline:
1038
1039 Command-line usage
1040 ------------------
1041
1042 You can decode DER/BER files using command line abilities::
1043
1044     $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1045
1046 If there is no schema for your file, then you can try parsing it without,
1047 but of course IMPLICIT tags will often make it impossible. But result is
1048 good enough for the certificate above::
1049
1050     $ python -m pyderasn path/to/file
1051         0   [1,3,1604]  . >: SEQUENCE OF
1052         4   [1,3,1453]  . . >: SEQUENCE OF
1053         8   [0,0,   5]  . . . . >: [0] ANY
1054                         . . . . . A0:03:02:01:02
1055        13   [1,1,   3]  . . . . >: INTEGER 61595
1056        18   [1,1,  13]  . . . . >: SEQUENCE OF
1057        20   [1,1,   9]  . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1058        31   [1,1,   0]  . . . . . . >: NULL
1059        33   [1,3, 274]  . . . . >: SEQUENCE OF
1060        37   [1,1,  11]  . . . . . . >: SET OF
1061        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1062        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1063        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1064     [...]
1065      1409   [1,1,  50]  . . . . . . >: SEQUENCE OF
1066      1411   [1,1,   8]  . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1067      1421   [1,1,  38]  . . . . . . . . >: OCTET STRING 38 bytes
1068                         . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1069                         . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1070                         . . . . . . . . . 61:2E:63:6F:6D:2F
1071      1461   [1,1,  13]  . . >: SEQUENCE OF
1072      1463   [1,1,   9]  . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1073      1474   [1,1,   0]  . . . . >: NULL
1074      1476   [1,2, 129]  . . >: BIT STRING 1024 bits
1075                         . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1076                         . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1077     [...]
1078
1079 Human readable OIDs
1080 ___________________
1081
1082 If you have got dictionaries with ObjectIdentifiers, like example one
1083 from ``tests/test_crts.py``::
1084
1085     stroid2name = {
1086         "1.2.840.113549.1.1.1": "id-rsaEncryption",
1087         "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1088         [...]
1089         "2.5.4.10": "id-at-organizationName",
1090         "2.5.4.11": "id-at-organizationalUnitName",
1091     }
1092
1093 then you can pass it to pretty printer to see human readable OIDs::
1094
1095     $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1096     [...]
1097        37   [1,1,  11]  . . . . . . >: SET OF
1098        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1099        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1100        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1101        50   [1,1,  18]  . . . . . . >: SET OF
1102        52   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1103        54   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1104        59   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1105        70   [1,1,  18]  . . . . . . >: SET OF
1106        72   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1107        74   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1108        79   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1109     [...]
1110
1111 Decode paths
1112 ____________
1113
1114 Each decoded element has so-called decode path: sequence of structure
1115 names it is passing during the decode process. Each element has its own
1116 unique path inside the whole ASN.1 tree. You can print it out with
1117 ``--print-decode-path`` option::
1118
1119     $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1120        0    [1,3,1604]  Certificate SEQUENCE []
1121        4    [1,3,1453]   . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1122       10-2  [1,1,   1]   . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1123       13    [1,1,   3]   . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1124       18    [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1125       20    [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1126       31    [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1127                          . . . . 05:00
1128       33    [0,0, 278]   . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1129       33    [1,3, 274]   . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1130       37    [1,1,  11]   . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1131       39    [1,1,   9]   . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1132       41    [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1133       46    [0,0,   4]   . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1134                          . . . . . . . 13:02:45:53
1135       46    [1,1,   2]   . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1136     [...]
1137
1138 Now you can print only the specified tree, for example signature algorithm::
1139
1140     $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1141       18    [1,1,  13]  AlgorithmIdentifier SEQUENCE
1142       20    [1,1,   9]   . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1143       31    [0,0,   2]   . parameters: [UNIV 5] ANY OPTIONAL
1144                          . . 05:00
1145 """
1146
1147 from array import array
1148 from codecs import getdecoder
1149 from codecs import getencoder
1150 from collections import namedtuple
1151 from collections import OrderedDict
1152 from copy import copy
1153 from datetime import datetime
1154 from datetime import timedelta
1155 from io import BytesIO
1156 from math import ceil
1157 from mmap import mmap
1158 from mmap import PROT_READ
1159 from operator import attrgetter
1160 from string import ascii_letters
1161 from string import digits
1162 from sys import maxsize as sys_maxsize
1163 from sys import version_info
1164 from unicodedata import category as unicat
1165
1166 from six import add_metaclass
1167 from six import binary_type
1168 from six import byte2int
1169 from six import indexbytes
1170 from six import int2byte
1171 from six import integer_types
1172 from six import iterbytes
1173 from six import iteritems
1174 from six import itervalues
1175 from six import PY2
1176 from six import string_types
1177 from six import text_type
1178 from six import unichr as six_unichr
1179 from six.moves import xrange as six_xrange
1180
1181
1182 try:
1183     from termcolor import colored
1184 except ImportError:  # pragma: no cover
1185     def colored(what, *args, **kwargs):
1186         return what
1187
1188 __version__ = "7.4"
1189
1190 __all__ = (
1191     "agg_octet_string",
1192     "Any",
1193     "BitString",
1194     "BMPString",
1195     "Boolean",
1196     "BoundsError",
1197     "Choice",
1198     "colonize_hex",
1199     "DecodeError",
1200     "DecodePathDefBy",
1201     "encode2pass",
1202     "encode_cer",
1203     "Enumerated",
1204     "ExceedingData",
1205     "file_mmaped",
1206     "GeneralizedTime",
1207     "GeneralString",
1208     "GraphicString",
1209     "hexdec",
1210     "hexenc",
1211     "IA5String",
1212     "Integer",
1213     "InvalidLength",
1214     "InvalidOID",
1215     "InvalidValueType",
1216     "ISO646String",
1217     "LenIndefForm",
1218     "NotEnoughData",
1219     "Null",
1220     "NumericString",
1221     "obj_by_path",
1222     "ObjectIdentifier",
1223     "ObjNotReady",
1224     "ObjUnknown",
1225     "OctetString",
1226     "PrimitiveTypes",
1227     "PrintableString",
1228     "Sequence",
1229     "SequenceOf",
1230     "Set",
1231     "SetOf",
1232     "T61String",
1233     "tag_ctxc",
1234     "tag_ctxp",
1235     "tag_decode",
1236     "TagClassApplication",
1237     "TagClassContext",
1238     "TagClassPrivate",
1239     "TagClassUniversal",
1240     "TagFormConstructed",
1241     "TagFormPrimitive",
1242     "TagMismatch",
1243     "TeletexString",
1244     "UniversalString",
1245     "UTCTime",
1246     "UTF8String",
1247     "VideotexString",
1248     "VisibleString",
1249 )
1250
1251 TagClassUniversal = 0
1252 TagClassApplication = 1 << 6
1253 TagClassContext = 1 << 7
1254 TagClassPrivate = 1 << 6 | 1 << 7
1255 TagFormPrimitive = 0
1256 TagFormConstructed = 1 << 5
1257 TagClassReprs = {
1258     TagClassContext: "",
1259     TagClassApplication: "APPLICATION ",
1260     TagClassPrivate: "PRIVATE ",
1261     TagClassUniversal: "UNIV ",
1262 }
1263 EOC = b"\x00\x00"
1264 EOC_LEN = len(EOC)
1265 LENINDEF = b"\x80"  # length indefinite mark
1266 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1267 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1268 SET01 = frozenset("01")
1269 DECIMALS = frozenset(digits)
1270 DECIMAL_SIGNS = ".,"
1271 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1272
1273
1274 def file_mmaped(fd):
1275     """Make mmap-ed memoryview for reading from file
1276
1277     :param fd: file object
1278     :returns: memoryview over read-only mmap-ing of the whole file
1279     """
1280     return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
1281
1282 def pureint(value):
1283     if not set(value) <= DECIMALS:
1284         raise ValueError("non-pure integer")
1285     return int(value)
1286
1287 def fractions2float(fractions_raw):
1288     pureint(fractions_raw)
1289     return float("0." + fractions_raw)
1290
1291
1292 def get_def_by_path(defines_by_path, sub_decode_path):
1293     """Get define by decode path
1294     """
1295     for path, define in defines_by_path:
1296         if len(path) != len(sub_decode_path):
1297             continue
1298         for p1, p2 in zip(path, sub_decode_path):
1299             if (not p1 is any) and (p1 != p2):
1300                 break
1301         else:
1302             return define
1303
1304
1305 ########################################################################
1306 # Errors
1307 ########################################################################
1308
1309 class ASN1Error(ValueError):
1310     pass
1311
1312
1313 class DecodeError(ASN1Error):
1314     def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1315         """
1316         :param str msg: reason of decode failing
1317         :param klass: optional exact DecodeError inherited class (like
1318                       :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1319                       :py:exc:`InvalidLength`)
1320         :param decode_path: tuple of strings. It contains human
1321                             readable names of the fields through which
1322                             decoding process has passed
1323         :param int offset: binary offset where failure happened
1324         """
1325         super(DecodeError, self).__init__()
1326         self.msg = msg
1327         self.klass = klass
1328         self.decode_path = decode_path
1329         self.offset = offset
1330
1331     def __str__(self):
1332         return " ".join(
1333             c for c in (
1334                 "" if self.klass is None else self.klass.__name__,
1335                 (
1336                     ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1337                     if len(self.decode_path) > 0 else ""
1338                 ),
1339                 ("(at %d)" % self.offset) if self.offset > 0 else "",
1340                 self.msg,
1341             ) if c != ""
1342         )
1343
1344     def __repr__(self):
1345         return "%s(%s)" % (self.__class__.__name__, self)
1346
1347
1348 class NotEnoughData(DecodeError):
1349     pass
1350
1351
1352 class ExceedingData(ASN1Error):
1353     def __init__(self, nbytes):
1354         super(ExceedingData, self).__init__()
1355         self.nbytes = nbytes
1356
1357     def __str__(self):
1358         return "%d trailing bytes" % self.nbytes
1359
1360     def __repr__(self):
1361         return "%s(%s)" % (self.__class__.__name__, self)
1362
1363
1364 class LenIndefForm(DecodeError):
1365     pass
1366
1367
1368 class TagMismatch(DecodeError):
1369     pass
1370
1371
1372 class InvalidLength(DecodeError):
1373     pass
1374
1375
1376 class InvalidOID(DecodeError):
1377     pass
1378
1379
1380 class ObjUnknown(ASN1Error):
1381     def __init__(self, name):
1382         super(ObjUnknown, self).__init__()
1383         self.name = name
1384
1385     def __str__(self):
1386         return "object is unknown: %s" % self.name
1387
1388     def __repr__(self):
1389         return "%s(%s)" % (self.__class__.__name__, self)
1390
1391
1392 class ObjNotReady(ASN1Error):
1393     def __init__(self, name):
1394         super(ObjNotReady, self).__init__()
1395         self.name = name
1396
1397     def __str__(self):
1398         return "object is not ready: %s" % self.name
1399
1400     def __repr__(self):
1401         return "%s(%s)" % (self.__class__.__name__, self)
1402
1403
1404 class InvalidValueType(ASN1Error):
1405     def __init__(self, expected_types):
1406         super(InvalidValueType, self).__init__()
1407         self.expected_types = expected_types
1408
1409     def __str__(self):
1410         return "invalid value type, expected: %s" % ", ".join(
1411             [repr(t) for t in self.expected_types]
1412         )
1413
1414     def __repr__(self):
1415         return "%s(%s)" % (self.__class__.__name__, self)
1416
1417
1418 class BoundsError(ASN1Error):
1419     def __init__(self, bound_min, value, bound_max):
1420         super(BoundsError, self).__init__()
1421         self.bound_min = bound_min
1422         self.value = value
1423         self.bound_max = bound_max
1424
1425     def __str__(self):
1426         return "unsatisfied bounds: %s <= %s <= %s" % (
1427             self.bound_min,
1428             self.value,
1429             self.bound_max,
1430         )
1431
1432     def __repr__(self):
1433         return "%s(%s)" % (self.__class__.__name__, self)
1434
1435
1436 ########################################################################
1437 # Basic coders
1438 ########################################################################
1439
1440 _hexdecoder = getdecoder("hex")
1441 _hexencoder = getencoder("hex")
1442
1443
1444 def hexdec(data):
1445     """Binary data to hexadecimal string convert
1446     """
1447     return _hexdecoder(data)[0]
1448
1449
1450 def hexenc(data):
1451     """Hexadecimal string to binary data convert
1452     """
1453     return _hexencoder(data)[0].decode("ascii")
1454
1455
1456 def int_bytes_len(num, byte_len=8):
1457     if num == 0:
1458         return 1
1459     return int(ceil(float(num.bit_length()) / byte_len))
1460
1461
1462 def zero_ended_encode(num):
1463     octets = bytearray(int_bytes_len(num, 7))
1464     i = len(octets) - 1
1465     octets[i] = num & 0x7F
1466     num >>= 7
1467     i -= 1
1468     while num > 0:
1469         octets[i] = 0x80 | (num & 0x7F)
1470         num >>= 7
1471         i -= 1
1472     return bytes(octets)
1473
1474
1475 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1476     """Encode tag to binary form
1477
1478     :param int num: tag's number
1479     :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1480                       :py:data:`pyderasn.TagClassContext`,
1481                       :py:data:`pyderasn.TagClassApplication`,
1482                       :py:data:`pyderasn.TagClassPrivate`)
1483     :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1484                      :py:data:`pyderasn.TagFormConstructed`)
1485     """
1486     if num < 31:
1487         # [XX|X|.....]
1488         return int2byte(klass | form | num)
1489     # [XX|X|11111][1.......][1.......] ... [0.......]
1490     return int2byte(klass | form | 31) + zero_ended_encode(num)
1491
1492
1493 def tag_decode(tag):
1494     """Decode tag from binary form
1495
1496     .. warning::
1497
1498        No validation is performed, assuming that it has already passed.
1499
1500     It returns tuple with three integers, as
1501     :py:func:`pyderasn.tag_encode` accepts.
1502     """
1503     first_octet = byte2int(tag)
1504     klass = first_octet & 0xC0
1505     form = first_octet & 0x20
1506     if first_octet & 0x1F < 0x1F:
1507         return (klass, form, first_octet & 0x1F)
1508     num = 0
1509     for octet in iterbytes(tag[1:]):
1510         num <<= 7
1511         num |= octet & 0x7F
1512     return (klass, form, num)
1513
1514
1515 def tag_ctxp(num):
1516     """Create CONTEXT PRIMITIVE tag
1517     """
1518     return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1519
1520
1521 def tag_ctxc(num):
1522     """Create CONTEXT CONSTRUCTED tag
1523     """
1524     return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1525
1526
1527 def tag_strip(data):
1528     """Take off tag from the data
1529
1530     :returns: (encoded tag, tag length, remaining data)
1531     """
1532     if len(data) == 0:
1533         raise NotEnoughData("no data at all")
1534     if byte2int(data) & 0x1F < 31:
1535         return data[:1], 1, data[1:]
1536     i = 0
1537     while True:
1538         i += 1
1539         if i == len(data):
1540             raise DecodeError("unfinished tag")
1541         if indexbytes(data, i) & 0x80 == 0:
1542             break
1543     i += 1
1544     return data[:i], i, data[i:]
1545
1546
1547 def len_encode(l):
1548     if l < 0x80:
1549         return int2byte(l)
1550     octets = bytearray(int_bytes_len(l) + 1)
1551     octets[0] = 0x80 | (len(octets) - 1)
1552     for i in six_xrange(len(octets) - 1, 0, -1):
1553         octets[i] = l & 0xFF
1554         l >>= 8
1555     return bytes(octets)
1556
1557
1558 def len_decode(data):
1559     """Decode length
1560
1561     :returns: (decoded length, length's length, remaining data)
1562     :raises LenIndefForm: if indefinite form encoding is met
1563     """
1564     if len(data) == 0:
1565         raise NotEnoughData("no data at all")
1566     first_octet = byte2int(data)
1567     if first_octet & 0x80 == 0:
1568         return first_octet, 1, data[1:]
1569     octets_num = first_octet & 0x7F
1570     if octets_num + 1 > len(data):
1571         raise NotEnoughData("encoded length is longer than data")
1572     if octets_num == 0:
1573         raise LenIndefForm()
1574     if byte2int(data[1:]) == 0:
1575         raise DecodeError("leading zeros")
1576     l = 0
1577     for v in iterbytes(data[1:1 + octets_num]):
1578         l = (l << 8) | v
1579     if l <= 127:
1580         raise DecodeError("long form instead of short one")
1581     return l, 1 + octets_num, data[1 + octets_num:]
1582
1583
1584 LEN0 = len_encode(0)
1585 LEN1 = len_encode(1)
1586 LEN1K = len_encode(1000)
1587
1588
1589 def len_size(l):
1590     """How many bytes length field will take
1591     """
1592     if l < 128:
1593         return 1
1594     if l < 256:  # 1 << 8
1595         return 2
1596     if l < 65536:  # 1 << 16
1597         return 3
1598     if l < 16777216:  # 1 << 24
1599         return 4
1600     if l < 4294967296:  # 1 << 32
1601         return 5
1602     if l < 1099511627776:  # 1 << 40
1603         return 6
1604     if l < 281474976710656:  # 1 << 48
1605         return 7
1606     if l < 72057594037927936:  # 1 << 56
1607         return 8
1608     raise OverflowError("too big length")
1609
1610
1611 def write_full(writer, data):
1612     """Fully write provided data
1613
1614     :param writer: must comply with ``io.RawIOBase.write`` behaviour
1615
1616     BytesIO does not guarantee that the whole data will be written at
1617     once. That function write everything provided, raising an error if
1618     ``writer`` returns None.
1619     """
1620     data = memoryview(data)
1621     written = 0
1622     while written != len(data):
1623         n = writer(data[written:])
1624         if n is None:
1625             raise ValueError("can not write to buf")
1626         written += n
1627
1628
1629 # If it is 64-bit system, then use compact 64-bit array of unsigned
1630 # longs. Use an ordinary list with universal integers otherwise, that
1631 # is slower.
1632 if sys_maxsize > 2 ** 32:
1633     def state_2pass_new():
1634         return array("L")
1635 else:
1636     def state_2pass_new():
1637         return []
1638
1639
1640 ########################################################################
1641 # Base class
1642 ########################################################################
1643
1644 class AutoAddSlots(type):
1645     def __new__(cls, name, bases, _dict):
1646         _dict["__slots__"] = _dict.get("__slots__", ())
1647         return type.__new__(cls, name, bases, _dict)
1648
1649
1650 BasicState = namedtuple("BasicState", (
1651     "version",
1652     "tag",
1653     "tag_order",
1654     "expl",
1655     "default",
1656     "optional",
1657     "offset",
1658     "llen",
1659     "vlen",
1660     "expl_lenindef",
1661     "lenindef",
1662     "ber_encoded",
1663 ), **NAMEDTUPLE_KWARGS)
1664
1665
1666 @add_metaclass(AutoAddSlots)
1667 class Obj(object):
1668     """Common ASN.1 object class
1669
1670     All ASN.1 types are inherited from it. It has metaclass that
1671     automatically adds ``__slots__`` to all inherited classes.
1672     """
1673     __slots__ = (
1674         "tag",
1675         "_tag_order",
1676         "_value",
1677         "_expl",
1678         "default",
1679         "optional",
1680         "offset",
1681         "llen",
1682         "vlen",
1683         "expl_lenindef",
1684         "lenindef",
1685         "ber_encoded",
1686     )
1687
1688     def __init__(
1689             self,
1690             impl=None,
1691             expl=None,
1692             default=None,
1693             optional=False,
1694             _decoded=(0, 0, 0),
1695     ):
1696         self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1697         self._expl = getattr(self, "expl", None) if expl is None else expl
1698         if self.tag != self.tag_default and self._expl is not None:
1699             raise ValueError("implicit and explicit tags can not be set simultaneously")
1700         if self.tag is None:
1701             self._tag_order = None
1702         else:
1703             tag_class, _, tag_num = tag_decode(
1704                 self.tag if self._expl is None else self._expl
1705             )
1706             self._tag_order = (tag_class, tag_num)
1707         if default is not None:
1708             optional = True
1709         self.optional = optional
1710         self.offset, self.llen, self.vlen = _decoded
1711         self.default = None
1712         self.expl_lenindef = False
1713         self.lenindef = False
1714         self.ber_encoded = False
1715
1716     @property
1717     def ready(self):  # pragma: no cover
1718         """Is object ready to be encoded?
1719         """
1720         raise NotImplementedError()
1721
1722     def _assert_ready(self):
1723         if not self.ready:
1724             raise ObjNotReady(self.__class__.__name__)
1725
1726     @property
1727     def bered(self):
1728         """Is either object or any elements inside is BER encoded?
1729         """
1730         return self.expl_lenindef or self.lenindef or self.ber_encoded
1731
1732     @property
1733     def decoded(self):
1734         """Is object decoded?
1735         """
1736         return (self.llen + self.vlen) > 0
1737
1738     def __getstate__(self):  # pragma: no cover
1739         """Used for making safe to be mutable pickleable copies
1740         """
1741         raise NotImplementedError()
1742
1743     def __setstate__(self, state):
1744         if state.version != __version__:
1745             raise ValueError("data is pickled by different PyDERASN version")
1746         self.tag = state.tag
1747         self._tag_order = state.tag_order
1748         self._expl = state.expl
1749         self.default = state.default
1750         self.optional = state.optional
1751         self.offset = state.offset
1752         self.llen = state.llen
1753         self.vlen = state.vlen
1754         self.expl_lenindef = state.expl_lenindef
1755         self.lenindef = state.lenindef
1756         self.ber_encoded = state.ber_encoded
1757
1758     @property
1759     def tag_order(self):
1760         """Tag's (class, number) used for DER/CER sorting
1761         """
1762         return self._tag_order
1763
1764     @property
1765     def tag_order_cer(self):
1766         return self.tag_order
1767
1768     @property
1769     def tlen(self):
1770         """.. seealso:: :ref:`decoding`
1771         """
1772         return len(self.tag)
1773
1774     @property
1775     def tlvlen(self):
1776         """.. seealso:: :ref:`decoding`
1777         """
1778         return self.tlen + self.llen + self.vlen
1779
1780     def __str__(self):  # pragma: no cover
1781         return self.__bytes__() if PY2 else self.__unicode__()
1782
1783     def __ne__(self, their):
1784         return not(self == their)
1785
1786     def __gt__(self, their):  # pragma: no cover
1787         return not(self < their)
1788
1789     def __le__(self, their):  # pragma: no cover
1790         return (self == their) or (self < their)
1791
1792     def __ge__(self, their):  # pragma: no cover
1793         return (self == their) or (self > their)
1794
1795     def _encode(self):  # pragma: no cover
1796         raise NotImplementedError()
1797
1798     def _encode_cer(self, writer):
1799         write_full(writer, self._encode())
1800
1801     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):  # pragma: no cover
1802         yield NotImplemented
1803
1804     def _encode1st(self, state):
1805         raise NotImplementedError()
1806
1807     def _encode2nd(self, writer, state_iter):
1808         raise NotImplementedError()
1809
1810     def encode(self):
1811         """DER encode the structure
1812
1813         :returns: DER representation
1814         """
1815         raw = self._encode()
1816         if self._expl is None:
1817             return raw
1818         return b"".join((self._expl, len_encode(len(raw)), raw))
1819
1820     def encode1st(self, state=None):
1821         """Do the 1st pass of 2-pass encoding
1822
1823         :rtype: (int, array("L"))
1824         :returns: full length of encoded data and precalculated various
1825                   objects lengths
1826         """
1827         if state is None:
1828             state = state_2pass_new()
1829         if self._expl is None:
1830             return self._encode1st(state)
1831         state.append(0)
1832         idx = len(state) - 1
1833         vlen, _ = self._encode1st(state)
1834         state[idx] = vlen
1835         fulllen = len(self._expl) + len_size(vlen) + vlen
1836         return fulllen, state
1837
1838     def encode2nd(self, writer, state_iter):
1839         """Do the 2nd pass of 2-pass encoding
1840
1841         :param writer: must comply with ``io.RawIOBase.write`` behaviour
1842         :param state_iter: iterator over the 1st pass state (``iter(state)``)
1843         """
1844         if self._expl is None:
1845             self._encode2nd(writer, state_iter)
1846         else:
1847             write_full(writer, self._expl + len_encode(next(state_iter)))
1848             self._encode2nd(writer, state_iter)
1849
1850     def encode_cer(self, writer):
1851         """CER encode the structure to specified writer
1852
1853         :param writer: must comply with ``io.RawIOBase.write``
1854                        behaviour. It takes slice to be written and
1855                        returns number of bytes processed. If it returns
1856                        None, then exception will be raised
1857         """
1858         if self._expl is not None:
1859             write_full(writer, self._expl + LENINDEF)
1860         if getattr(self, "der_forced", False):
1861             write_full(writer, self._encode())
1862         else:
1863             self._encode_cer(writer)
1864         if self._expl is not None:
1865             write_full(writer, EOC)
1866
1867     def hexencode(self):
1868         """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1869         """
1870         return hexenc(self.encode())
1871
1872     def decode(
1873             self,
1874             data,
1875             offset=0,
1876             leavemm=False,
1877             decode_path=(),
1878             ctx=None,
1879             tag_only=False,
1880             _ctx_immutable=True,
1881     ):
1882         """Decode the data
1883
1884         :param data: either binary or memoryview
1885         :param int offset: initial data's offset
1886         :param bool leavemm: do we need to leave memoryview of remaining
1887                     data as is, or convert it to bytes otherwise
1888         :param decode_path: current decode path (tuples of strings,
1889                             possibly with DecodePathDefBy) with will be
1890                             the root for all underlying objects
1891         :param ctx: optional :ref:`context <ctx>` governing decoding process
1892         :param bool tag_only: decode only the tag, without length and
1893                               contents (used only in Choice and Set
1894                               structures, trying to determine if tag satisfies
1895                               the schema)
1896         :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1897                                     before using it?
1898         :returns: (Obj, remaining data)
1899
1900         .. seealso:: :ref:`decoding`
1901         """
1902         result = next(self.decode_evgen(
1903             data,
1904             offset,
1905             leavemm,
1906             decode_path,
1907             ctx,
1908             tag_only,
1909             _ctx_immutable,
1910             _evgen_mode=False,
1911         ))
1912         if result is None:
1913             return None
1914         _, obj, tail = result
1915         return obj, tail
1916
1917     def decode_evgen(
1918             self,
1919             data,
1920             offset=0,
1921             leavemm=False,
1922             decode_path=(),
1923             ctx=None,
1924             tag_only=False,
1925             _ctx_immutable=True,
1926             _evgen_mode=True,
1927     ):
1928         """Decode with evgen mode on
1929
1930         That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1931         it returns the generator producing ``(decode_path, obj, tail)``
1932         values.
1933         .. seealso:: :ref:`evgen mode <evgen_mode>`.
1934         """
1935         if ctx is None:
1936             ctx = {}
1937         elif _ctx_immutable:
1938             ctx = copy(ctx)
1939         tlv = memoryview(data)
1940         if (
1941                 _evgen_mode and
1942                 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1943         ):
1944             _evgen_mode = False
1945         if self._expl is None:
1946             for result in self._decode(
1947                     tlv,
1948                     offset=offset,
1949                     decode_path=decode_path,
1950                     ctx=ctx,
1951                     tag_only=tag_only,
1952                     evgen_mode=_evgen_mode,
1953             ):
1954                 if tag_only:
1955                     yield None
1956                     return
1957                 _decode_path, obj, tail = result
1958                 if not _decode_path is decode_path:
1959                     yield result
1960         else:
1961             try:
1962                 t, tlen, lv = tag_strip(tlv)
1963             except DecodeError as err:
1964                 raise err.__class__(
1965                     msg=err.msg,
1966                     klass=self.__class__,
1967                     decode_path=decode_path,
1968                     offset=offset,
1969                 )
1970             if t != self._expl:
1971                 raise TagMismatch(
1972                     klass=self.__class__,
1973                     decode_path=decode_path,
1974                     offset=offset,
1975                 )
1976             try:
1977                 l, llen, v = len_decode(lv)
1978             except LenIndefForm as err:
1979                 if not ctx.get("bered", False):
1980                     raise err.__class__(
1981                         msg=err.msg,
1982                         klass=self.__class__,
1983                         decode_path=decode_path,
1984                         offset=offset,
1985                     )
1986                 llen, v = 1, lv[1:]
1987                 offset += tlen + llen
1988                 for result in self._decode(
1989                         v,
1990                         offset=offset,
1991                         decode_path=decode_path,
1992                         ctx=ctx,
1993                         tag_only=tag_only,
1994                         evgen_mode=_evgen_mode,
1995                 ):
1996                     if tag_only:  # pragma: no cover
1997                         yield None
1998                         return
1999                     _decode_path, obj, tail = result
2000                     if not _decode_path is decode_path:
2001                         yield result
2002                 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2003                 if eoc_expected.tobytes() != EOC:
2004                     raise DecodeError(
2005                         "no EOC",
2006                         klass=self.__class__,
2007                         decode_path=decode_path,
2008                         offset=offset,
2009                     )
2010                 obj.vlen += EOC_LEN
2011                 obj.expl_lenindef = True
2012             except DecodeError as err:
2013                 raise err.__class__(
2014                     msg=err.msg,
2015                     klass=self.__class__,
2016                     decode_path=decode_path,
2017                     offset=offset,
2018                 )
2019             else:
2020                 if l > len(v):
2021                     raise NotEnoughData(
2022                         "encoded length is longer than data",
2023                         klass=self.__class__,
2024                         decode_path=decode_path,
2025                         offset=offset,
2026                     )
2027                 for result in self._decode(
2028                         v,
2029                         offset=offset + tlen + llen,
2030                         decode_path=decode_path,
2031                         ctx=ctx,
2032                         tag_only=tag_only,
2033                         evgen_mode=_evgen_mode,
2034                 ):
2035                     if tag_only:  # pragma: no cover
2036                         yield None
2037                         return
2038                     _decode_path, obj, tail = result
2039                     if not _decode_path is decode_path:
2040                         yield result
2041                 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2042                     raise DecodeError(
2043                         "explicit tag out-of-bound, longer than data",
2044                         klass=self.__class__,
2045                         decode_path=decode_path,
2046                         offset=offset,
2047                     )
2048         yield decode_path, obj, (tail if leavemm else tail.tobytes())
2049
2050     def decod(self, data, offset=0, decode_path=(), ctx=None):
2051         """Decode the data, check that tail is empty
2052
2053         :raises ExceedingData: if tail is not empty
2054
2055         This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2056         (decode without tail) that also checks that there is no
2057         trailing data left.
2058         """
2059         obj, tail = self.decode(
2060             data,
2061             offset=offset,
2062             decode_path=decode_path,
2063             ctx=ctx,
2064             leavemm=True,
2065         )
2066         if len(tail) > 0:
2067             raise ExceedingData(len(tail))
2068         return obj
2069
2070     def hexdecode(self, data, *args, **kwargs):
2071         """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2072         """
2073         return self.decode(hexdec(data), *args, **kwargs)
2074
2075     def hexdecod(self, data, *args, **kwargs):
2076         """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2077         """
2078         return self.decod(hexdec(data), *args, **kwargs)
2079
2080     @property
2081     def expled(self):
2082         """.. seealso:: :ref:`decoding`
2083         """
2084         return self._expl is not None
2085
2086     @property
2087     def expl_tag(self):
2088         """.. seealso:: :ref:`decoding`
2089         """
2090         return self._expl
2091
2092     @property
2093     def expl_tlen(self):
2094         """.. seealso:: :ref:`decoding`
2095         """
2096         return len(self._expl)
2097
2098     @property
2099     def expl_llen(self):
2100         """.. seealso:: :ref:`decoding`
2101         """
2102         if self.expl_lenindef:
2103             return 1
2104         return len(len_encode(self.tlvlen))
2105
2106     @property
2107     def expl_offset(self):
2108         """.. seealso:: :ref:`decoding`
2109         """
2110         return self.offset - self.expl_tlen - self.expl_llen
2111
2112     @property
2113     def expl_vlen(self):
2114         """.. seealso:: :ref:`decoding`
2115         """
2116         return self.tlvlen
2117
2118     @property
2119     def expl_tlvlen(self):
2120         """.. seealso:: :ref:`decoding`
2121         """
2122         return self.expl_tlen + self.expl_llen + self.expl_vlen
2123
2124     @property
2125     def fulloffset(self):
2126         """.. seealso:: :ref:`decoding`
2127         """
2128         return self.expl_offset if self.expled else self.offset
2129
2130     @property
2131     def fulllen(self):
2132         """.. seealso:: :ref:`decoding`
2133         """
2134         return self.expl_tlvlen if self.expled else self.tlvlen
2135
2136     def pps_lenindef(self, decode_path):
2137         if self.lenindef and not (
2138                 getattr(self, "defined", None) is not None and
2139                 self.defined[1].lenindef
2140         ):
2141             yield _pp(
2142                 asn1_type_name="EOC",
2143                 obj_name="",
2144                 decode_path=decode_path,
2145                 offset=(
2146                     self.offset + self.tlvlen -
2147                     (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2148                 ),
2149                 tlen=1,
2150                 llen=1,
2151                 vlen=0,
2152                 ber_encoded=True,
2153                 bered=True,
2154             )
2155         if self.expl_lenindef:
2156             yield _pp(
2157                 asn1_type_name="EOC",
2158                 obj_name="EXPLICIT",
2159                 decode_path=decode_path,
2160                 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2161                 tlen=1,
2162                 llen=1,
2163                 vlen=0,
2164                 ber_encoded=True,
2165                 bered=True,
2166             )
2167
2168
2169 def encode_cer(obj):
2170     """Encode to CER in memory buffer
2171
2172     :returns bytes: memory buffer contents
2173     """
2174     buf = BytesIO()
2175     obj.encode_cer(buf.write)
2176     return buf.getvalue()
2177
2178
2179 def encode2pass(obj):
2180     """Encode (2-pass mode) to DER in memory buffer
2181
2182     :returns bytes: memory buffer contents
2183     """
2184     buf = BytesIO()
2185     _, state = obj.encode1st()
2186     obj.encode2nd(buf.write, iter(state))
2187     return buf.getvalue()
2188
2189
2190 class DecodePathDefBy(object):
2191     """DEFINED BY representation inside decode path
2192     """
2193     __slots__ = ("defined_by",)
2194
2195     def __init__(self, defined_by):
2196         self.defined_by = defined_by
2197
2198     def __ne__(self, their):
2199         return not(self == their)
2200
2201     def __eq__(self, their):
2202         if not isinstance(their, self.__class__):
2203             return False
2204         return self.defined_by == their.defined_by
2205
2206     def __str__(self):
2207         return "DEFINED BY " + str(self.defined_by)
2208
2209     def __repr__(self):
2210         return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2211
2212
2213 ########################################################################
2214 # Pretty printing
2215 ########################################################################
2216
2217 PP = namedtuple("PP", (
2218     "obj",
2219     "asn1_type_name",
2220     "obj_name",
2221     "decode_path",
2222     "value",
2223     "blob",
2224     "optional",
2225     "default",
2226     "impl",
2227     "expl",
2228     "offset",
2229     "tlen",
2230     "llen",
2231     "vlen",
2232     "expl_offset",
2233     "expl_tlen",
2234     "expl_llen",
2235     "expl_vlen",
2236     "expl_lenindef",
2237     "lenindef",
2238     "ber_encoded",
2239     "bered",
2240 ), **NAMEDTUPLE_KWARGS)
2241
2242
2243 def _pp(
2244         obj=None,
2245         asn1_type_name="unknown",
2246         obj_name="unknown",
2247         decode_path=(),
2248         value=None,
2249         blob=None,
2250         optional=False,
2251         default=False,
2252         impl=None,
2253         expl=None,
2254         offset=0,
2255         tlen=0,
2256         llen=0,
2257         vlen=0,
2258         expl_offset=None,
2259         expl_tlen=None,
2260         expl_llen=None,
2261         expl_vlen=None,
2262         expl_lenindef=False,
2263         lenindef=False,
2264         ber_encoded=False,
2265         bered=False,
2266 ):
2267     return PP(
2268         obj,
2269         asn1_type_name,
2270         obj_name,
2271         decode_path,
2272         value,
2273         blob,
2274         optional,
2275         default,
2276         impl,
2277         expl,
2278         offset,
2279         tlen,
2280         llen,
2281         vlen,
2282         expl_offset,
2283         expl_tlen,
2284         expl_llen,
2285         expl_vlen,
2286         expl_lenindef,
2287         lenindef,
2288         ber_encoded,
2289         bered,
2290     )
2291
2292
2293 def _colourize(what, colour, with_colours, attrs=("bold",)):
2294     return colored(what, colour, attrs=attrs) if with_colours else what
2295
2296
2297 def colonize_hex(hexed):
2298     """Separate hexadecimal string with colons
2299     """
2300     return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2301
2302
2303 def pp_console_row(
2304         pp,
2305         oid_maps=(),
2306         with_offsets=False,
2307         with_blob=True,
2308         with_colours=False,
2309         with_decode_path=False,
2310         decode_path_len_decrease=0,
2311 ):
2312     cols = []
2313     if with_offsets:
2314         col = "%5d%s%s" % (
2315             pp.offset,
2316             (
2317                 "  " if pp.expl_offset is None else
2318                 ("-%d" % (pp.offset - pp.expl_offset))
2319             ),
2320             LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2321         )
2322         col = _colourize(col, "red", with_colours, ())
2323         col += _colourize("B", "red", with_colours) if pp.bered else " "
2324         cols.append(col)
2325         col = "[%d,%d,%4d]%s" % (
2326             pp.tlen,
2327             pp.llen,
2328             pp.vlen,
2329             LENINDEF_PP_CHAR if pp.lenindef else " "
2330         )
2331         col = _colourize(col, "green", with_colours, ())
2332         cols.append(col)
2333     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2334     if decode_path_len > 0:
2335         cols.append(" ." * decode_path_len)
2336         ent = pp.decode_path[-1]
2337         if isinstance(ent, DecodePathDefBy):
2338             cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2339             value = str(ent.defined_by)
2340             oid_name = None
2341             if (
2342                     len(oid_maps) > 0 and
2343                     ent.defined_by.asn1_type_name ==
2344                     ObjectIdentifier.asn1_type_name
2345             ):
2346                 for oid_map in oid_maps:
2347                     oid_name = oid_map.get(value)
2348                     if oid_name is not None:
2349                         cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2350                         break
2351             if oid_name is None:
2352                 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2353         else:
2354             cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2355     if pp.expl is not None:
2356         klass, _, num = pp.expl
2357         col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2358         cols.append(_colourize(col, "blue", with_colours))
2359     if pp.impl is not None:
2360         klass, _, num = pp.impl
2361         col = "[%s%d]" % (TagClassReprs[klass], num)
2362         cols.append(_colourize(col, "blue", with_colours))
2363     if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2364         cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2365     if pp.ber_encoded:
2366         cols.append(_colourize("BER", "red", with_colours))
2367     cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2368     if pp.value is not None:
2369         value = pp.value
2370         cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2371         if (
2372                 len(oid_maps) > 0 and
2373                 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
2374         ):
2375             for oid_map in oid_maps:
2376                 oid_name = oid_map.get(value)
2377                 if oid_name is not None:
2378                     cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2379                     break
2380         if pp.asn1_type_name == Integer.asn1_type_name:
2381             cols.append(_colourize(
2382                 "(%s)" % colonize_hex(pp.obj.tohex()), "green", with_colours,
2383             ))
2384     if with_blob:
2385         if pp.blob.__class__ == binary_type:
2386             cols.append(hexenc(pp.blob))
2387         elif pp.blob.__class__ == tuple:
2388             cols.append(", ".join(pp.blob))
2389     if pp.optional:
2390         cols.append(_colourize("OPTIONAL", "red", with_colours))
2391     if pp.default:
2392         cols.append(_colourize("DEFAULT", "red", with_colours))
2393     if with_decode_path:
2394         cols.append(_colourize(
2395             "[%s]" % ":".join(str(p) for p in pp.decode_path),
2396             "grey",
2397             with_colours,
2398         ))
2399     return " ".join(cols)
2400
2401
2402 def pp_console_blob(pp, decode_path_len_decrease=0):
2403     cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2404     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2405     if decode_path_len > 0:
2406         cols.append(" ." * (decode_path_len + 1))
2407     if pp.blob.__class__ == binary_type:
2408         blob = hexenc(pp.blob).upper()
2409         for i in six_xrange(0, len(blob), 32):
2410             chunk = blob[i:i + 32]
2411             yield " ".join(cols + [colonize_hex(chunk)])
2412     elif pp.blob.__class__ == tuple:
2413         yield " ".join(cols + [", ".join(pp.blob)])
2414
2415
2416 def pprint(
2417         obj,
2418         oid_maps=(),
2419         big_blobs=False,
2420         with_colours=False,
2421         with_decode_path=False,
2422         decode_path_only=(),
2423         decode_path=(),
2424 ):
2425     """Pretty print object
2426
2427     :param Obj obj: object you want to pretty print
2428     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionary.
2429                      Its human readable form is printed when OID is met
2430     :param big_blobs: if large binary objects are met (like OctetString
2431                       values), do we need to print them too, on separate
2432                       lines
2433     :param with_colours: colourize output, if ``termcolor`` library
2434                          is available
2435     :param with_decode_path: print decode path
2436     :param decode_path_only: print only that specified decode path
2437     """
2438     def _pprint_pps(pps):
2439         for pp in pps:
2440             if hasattr(pp, "_fields"):
2441                 if (
2442                         decode_path_only != () and
2443                         tuple(
2444                             str(p) for p in pp.decode_path[:len(decode_path_only)]
2445                         ) != decode_path_only
2446                 ):
2447                     continue
2448                 if big_blobs:
2449                     yield pp_console_row(
2450                         pp,
2451                         oid_maps=oid_maps,
2452                         with_offsets=True,
2453                         with_blob=False,
2454                         with_colours=with_colours,
2455                         with_decode_path=with_decode_path,
2456                         decode_path_len_decrease=len(decode_path_only),
2457                     )
2458                     for row in pp_console_blob(
2459                             pp,
2460                             decode_path_len_decrease=len(decode_path_only),
2461                     ):
2462                         yield row
2463                 else:
2464                     yield pp_console_row(
2465                         pp,
2466                         oid_maps=oid_maps,
2467                         with_offsets=True,
2468                         with_blob=True,
2469                         with_colours=with_colours,
2470                         with_decode_path=with_decode_path,
2471                         decode_path_len_decrease=len(decode_path_only),
2472                     )
2473             else:
2474                 for row in _pprint_pps(pp):
2475                     yield row
2476     return "\n".join(_pprint_pps(obj.pps(decode_path)))
2477
2478
2479 ########################################################################
2480 # ASN.1 primitive types
2481 ########################################################################
2482
2483 BooleanState = namedtuple(
2484     "BooleanState",
2485     BasicState._fields + ("value",),
2486     **NAMEDTUPLE_KWARGS
2487 )
2488
2489
2490 class Boolean(Obj):
2491     """``BOOLEAN`` boolean type
2492
2493     >>> b = Boolean(True)
2494     BOOLEAN True
2495     >>> b == Boolean(True)
2496     True
2497     >>> bool(b)
2498     True
2499     """
2500     __slots__ = ()
2501     tag_default = tag_encode(1)
2502     asn1_type_name = "BOOLEAN"
2503
2504     def __init__(
2505             self,
2506             value=None,
2507             impl=None,
2508             expl=None,
2509             default=None,
2510             optional=False,
2511             _decoded=(0, 0, 0),
2512     ):
2513         """
2514         :param value: set the value. Either boolean type, or
2515                       :py:class:`pyderasn.Boolean` object
2516         :param bytes impl: override default tag with ``IMPLICIT`` one
2517         :param bytes expl: override default tag with ``EXPLICIT`` one
2518         :param default: set default value. Type same as in ``value``
2519         :param bool optional: is object ``OPTIONAL`` in sequence
2520         """
2521         super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2522         self._value = None if value is None else self._value_sanitize(value)
2523         if default is not None:
2524             default = self._value_sanitize(default)
2525             self.default = self.__class__(
2526                 value=default,
2527                 impl=self.tag,
2528                 expl=self._expl,
2529             )
2530             if value is None:
2531                 self._value = default
2532
2533     def _value_sanitize(self, value):
2534         if value.__class__ == bool:
2535             return value
2536         if issubclass(value.__class__, Boolean):
2537             return value._value
2538         raise InvalidValueType((self.__class__, bool))
2539
2540     @property
2541     def ready(self):
2542         return self._value is not None
2543
2544     def __getstate__(self):
2545         return BooleanState(
2546             __version__,
2547             self.tag,
2548             self._tag_order,
2549             self._expl,
2550             self.default,
2551             self.optional,
2552             self.offset,
2553             self.llen,
2554             self.vlen,
2555             self.expl_lenindef,
2556             self.lenindef,
2557             self.ber_encoded,
2558             self._value,
2559         )
2560
2561     def __setstate__(self, state):
2562         super(Boolean, self).__setstate__(state)
2563         self._value = state.value
2564
2565     def __nonzero__(self):
2566         self._assert_ready()
2567         return self._value
2568
2569     def __bool__(self):
2570         self._assert_ready()
2571         return self._value
2572
2573     def __eq__(self, their):
2574         if their.__class__ == bool:
2575             return self._value == their
2576         if not issubclass(their.__class__, Boolean):
2577             return False
2578         return (
2579             self._value == their._value and
2580             self.tag == their.tag and
2581             self._expl == their._expl
2582         )
2583
2584     def __call__(
2585             self,
2586             value=None,
2587             impl=None,
2588             expl=None,
2589             default=None,
2590             optional=None,
2591     ):
2592         return self.__class__(
2593             value=value,
2594             impl=self.tag if impl is None else impl,
2595             expl=self._expl if expl is None else expl,
2596             default=self.default if default is None else default,
2597             optional=self.optional if optional is None else optional,
2598         )
2599
2600     def _encode(self):
2601         self._assert_ready()
2602         return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2603
2604     def _encode1st(self, state):
2605         return len(self.tag) + 2, state
2606
2607     def _encode2nd(self, writer, state_iter):
2608         self._assert_ready()
2609         write_full(writer, self._encode())
2610
2611     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2612         try:
2613             t, _, lv = tag_strip(tlv)
2614         except DecodeError as err:
2615             raise err.__class__(
2616                 msg=err.msg,
2617                 klass=self.__class__,
2618                 decode_path=decode_path,
2619                 offset=offset,
2620             )
2621         if t != self.tag:
2622             raise TagMismatch(
2623                 klass=self.__class__,
2624                 decode_path=decode_path,
2625                 offset=offset,
2626             )
2627         if tag_only:
2628             yield None
2629             return
2630         try:
2631             l, _, v = len_decode(lv)
2632         except DecodeError as err:
2633             raise err.__class__(
2634                 msg=err.msg,
2635                 klass=self.__class__,
2636                 decode_path=decode_path,
2637                 offset=offset,
2638             )
2639         if l != 1:
2640             raise InvalidLength(
2641                 "Boolean's length must be equal to 1",
2642                 klass=self.__class__,
2643                 decode_path=decode_path,
2644                 offset=offset,
2645             )
2646         if l > len(v):
2647             raise NotEnoughData(
2648                 "encoded length is longer than data",
2649                 klass=self.__class__,
2650                 decode_path=decode_path,
2651                 offset=offset,
2652             )
2653         first_octet = byte2int(v)
2654         ber_encoded = False
2655         if first_octet == 0:
2656             value = False
2657         elif first_octet == 0xFF:
2658             value = True
2659         elif ctx.get("bered", False):
2660             value = True
2661             ber_encoded = True
2662         else:
2663             raise DecodeError(
2664                 "unacceptable Boolean value",
2665                 klass=self.__class__,
2666                 decode_path=decode_path,
2667                 offset=offset,
2668             )
2669         obj = self.__class__(
2670             value=value,
2671             impl=self.tag,
2672             expl=self._expl,
2673             default=self.default,
2674             optional=self.optional,
2675             _decoded=(offset, 1, 1),
2676         )
2677         obj.ber_encoded = ber_encoded
2678         yield decode_path, obj, v[1:]
2679
2680     def __repr__(self):
2681         return pp_console_row(next(self.pps()))
2682
2683     def pps(self, decode_path=()):
2684         yield _pp(
2685             obj=self,
2686             asn1_type_name=self.asn1_type_name,
2687             obj_name=self.__class__.__name__,
2688             decode_path=decode_path,
2689             value=str(self._value) if self.ready else None,
2690             optional=self.optional,
2691             default=self == self.default,
2692             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2693             expl=None if self._expl is None else tag_decode(self._expl),
2694             offset=self.offset,
2695             tlen=self.tlen,
2696             llen=self.llen,
2697             vlen=self.vlen,
2698             expl_offset=self.expl_offset if self.expled else None,
2699             expl_tlen=self.expl_tlen if self.expled else None,
2700             expl_llen=self.expl_llen if self.expled else None,
2701             expl_vlen=self.expl_vlen if self.expled else None,
2702             expl_lenindef=self.expl_lenindef,
2703             ber_encoded=self.ber_encoded,
2704             bered=self.bered,
2705         )
2706         for pp in self.pps_lenindef(decode_path):
2707             yield pp
2708
2709
2710 IntegerState = namedtuple(
2711     "IntegerState",
2712     BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2713     **NAMEDTUPLE_KWARGS
2714 )
2715
2716
2717 class Integer(Obj):
2718     """``INTEGER`` integer type
2719
2720     >>> b = Integer(-123)
2721     INTEGER -123
2722     >>> b == Integer(-123)
2723     True
2724     >>> int(b)
2725     -123
2726
2727     >>> Integer(2, bounds=(1, 3))
2728     INTEGER 2
2729     >>> Integer(5, bounds=(1, 3))
2730     Traceback (most recent call last):
2731     pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2732
2733     ::
2734
2735         class Version(Integer):
2736             schema = (
2737                 ("v1", 0),
2738                 ("v2", 1),
2739                 ("v3", 2),
2740             )
2741
2742     >>> v = Version("v1")
2743     Version INTEGER v1
2744     >>> int(v)
2745     0
2746     >>> v.named
2747     'v1'
2748     >>> v.specs
2749     {'v3': 2, 'v1': 0, 'v2': 1}
2750     """
2751     __slots__ = ("specs", "_bound_min", "_bound_max")
2752     tag_default = tag_encode(2)
2753     asn1_type_name = "INTEGER"
2754
2755     def __init__(
2756             self,
2757             value=None,
2758             bounds=None,
2759             impl=None,
2760             expl=None,
2761             default=None,
2762             optional=False,
2763             _specs=None,
2764             _decoded=(0, 0, 0),
2765     ):
2766         """
2767         :param value: set the value. Either integer type, named value
2768                       (if ``schema`` is specified in the class), or
2769                       :py:class:`pyderasn.Integer` object
2770         :param bounds: set ``(MIN, MAX)`` value constraint.
2771                        (-inf, +inf) by default
2772         :param bytes impl: override default tag with ``IMPLICIT`` one
2773         :param bytes expl: override default tag with ``EXPLICIT`` one
2774         :param default: set default value. Type same as in ``value``
2775         :param bool optional: is object ``OPTIONAL`` in sequence
2776         """
2777         super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2778         self._value = value
2779         specs = getattr(self, "schema", {}) if _specs is None else _specs
2780         self.specs = specs if specs.__class__ == dict else dict(specs)
2781         self._bound_min, self._bound_max = getattr(
2782             self,
2783             "bounds",
2784             (float("-inf"), float("+inf")),
2785         ) if bounds is None else bounds
2786         if value is not None:
2787             self._value = self._value_sanitize(value)
2788         if default is not None:
2789             default = self._value_sanitize(default)
2790             self.default = self.__class__(
2791                 value=default,
2792                 impl=self.tag,
2793                 expl=self._expl,
2794                 _specs=self.specs,
2795             )
2796             if self._value is None:
2797                 self._value = default
2798
2799     def _value_sanitize(self, value):
2800         if isinstance(value, integer_types):
2801             pass
2802         elif issubclass(value.__class__, Integer):
2803             value = value._value
2804         elif value.__class__ == str:
2805             value = self.specs.get(value)
2806             if value is None:
2807                 raise ObjUnknown("integer value: %s" % value)
2808         else:
2809             raise InvalidValueType((self.__class__, int, str))
2810         if not self._bound_min <= value <= self._bound_max:
2811             raise BoundsError(self._bound_min, value, self._bound_max)
2812         return value
2813
2814     @property
2815     def ready(self):
2816         return self._value is not None
2817
2818     def __getstate__(self):
2819         return IntegerState(
2820             __version__,
2821             self.tag,
2822             self._tag_order,
2823             self._expl,
2824             self.default,
2825             self.optional,
2826             self.offset,
2827             self.llen,
2828             self.vlen,
2829             self.expl_lenindef,
2830             self.lenindef,
2831             self.ber_encoded,
2832             self.specs,
2833             self._value,
2834             self._bound_min,
2835             self._bound_max,
2836         )
2837
2838     def __setstate__(self, state):
2839         super(Integer, self).__setstate__(state)
2840         self.specs = state.specs
2841         self._value = state.value
2842         self._bound_min = state.bound_min
2843         self._bound_max = state.bound_max
2844
2845     def __int__(self):
2846         self._assert_ready()
2847         return int(self._value)
2848
2849     def tohex(self):
2850         """Hexadecimal representation
2851
2852         Use :py:func:`pyderasn.colonize_hex` for colonizing it.
2853         """
2854         hex_repr = hex(int(self))[2:].upper()
2855         if len(hex_repr) % 2 != 0:
2856             hex_repr = "0" + hex_repr
2857         return hex_repr
2858
2859     def __hash__(self):
2860         self._assert_ready()
2861         return hash(b"".join((
2862             self.tag,
2863             bytes(self._expl or b""),
2864             str(self._value).encode("ascii"),
2865         )))
2866
2867     def __eq__(self, their):
2868         if isinstance(their, integer_types):
2869             return self._value == their
2870         if not issubclass(their.__class__, Integer):
2871             return False
2872         return (
2873             self._value == their._value and
2874             self.tag == their.tag and
2875             self._expl == their._expl
2876         )
2877
2878     def __lt__(self, their):
2879         return self._value < their._value
2880
2881     @property
2882     def named(self):
2883         """Return named representation (if exists) of the value
2884         """
2885         for name, value in iteritems(self.specs):
2886             if value == self._value:
2887                 return name
2888         return None
2889
2890     def __call__(
2891             self,
2892             value=None,
2893             bounds=None,
2894             impl=None,
2895             expl=None,
2896             default=None,
2897             optional=None,
2898     ):
2899         return self.__class__(
2900             value=value,
2901             bounds=(
2902                 (self._bound_min, self._bound_max)
2903                 if bounds is None else bounds
2904             ),
2905             impl=self.tag if impl is None else impl,
2906             expl=self._expl if expl is None else expl,
2907             default=self.default if default is None else default,
2908             optional=self.optional if optional is None else optional,
2909             _specs=self.specs,
2910         )
2911
2912     def _encode_payload(self):
2913         self._assert_ready()
2914         value = self._value
2915         if PY2:
2916             if value == 0:
2917                 octets = bytearray([0])
2918             elif value < 0:
2919                 value = -value
2920                 value -= 1
2921                 octets = bytearray()
2922                 while value > 0:
2923                     octets.append((value & 0xFF) ^ 0xFF)
2924                     value >>= 8
2925                 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2926                     octets.append(0xFF)
2927             else:
2928                 octets = bytearray()
2929                 while value > 0:
2930                     octets.append(value & 0xFF)
2931                     value >>= 8
2932                 if octets[-1] & 0x80 > 0:
2933                     octets.append(0x00)
2934             octets.reverse()
2935             octets = bytes(octets)
2936         else:
2937             bytes_len = ceil(value.bit_length() / 8) or 1
2938             while True:
2939                 try:
2940                     octets = value.to_bytes(
2941                         bytes_len,
2942                         byteorder="big",
2943                         signed=True,
2944                     )
2945                 except OverflowError:
2946                     bytes_len += 1
2947                 else:
2948                     break
2949         return octets
2950
2951     def _encode(self):
2952         octets = self._encode_payload()
2953         return b"".join((self.tag, len_encode(len(octets)), octets))
2954
2955     def _encode1st(self, state):
2956         l = len(self._encode_payload())
2957         return len(self.tag) + len_size(l) + l, state
2958
2959     def _encode2nd(self, writer, state_iter):
2960         write_full(writer, self._encode())
2961
2962     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2963         try:
2964             t, _, lv = tag_strip(tlv)
2965         except DecodeError as err:
2966             raise err.__class__(
2967                 msg=err.msg,
2968                 klass=self.__class__,
2969                 decode_path=decode_path,
2970                 offset=offset,
2971             )
2972         if t != self.tag:
2973             raise TagMismatch(
2974                 klass=self.__class__,
2975                 decode_path=decode_path,
2976                 offset=offset,
2977             )
2978         if tag_only:
2979             yield None
2980             return
2981         try:
2982             l, llen, v = len_decode(lv)
2983         except DecodeError as err:
2984             raise err.__class__(
2985                 msg=err.msg,
2986                 klass=self.__class__,
2987                 decode_path=decode_path,
2988                 offset=offset,
2989             )
2990         if l > len(v):
2991             raise NotEnoughData(
2992                 "encoded length is longer than data",
2993                 klass=self.__class__,
2994                 decode_path=decode_path,
2995                 offset=offset,
2996             )
2997         if l == 0:
2998             raise NotEnoughData(
2999                 "zero length",
3000                 klass=self.__class__,
3001                 decode_path=decode_path,
3002                 offset=offset,
3003             )
3004         v, tail = v[:l], v[l:]
3005         first_octet = byte2int(v)
3006         if l > 1:
3007             second_octet = byte2int(v[1:])
3008             if (
3009                     ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3010                     ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3011             ):
3012                 raise DecodeError(
3013                     "non normalized integer",
3014                     klass=self.__class__,
3015                     decode_path=decode_path,
3016                     offset=offset,
3017                 )
3018         if PY2:
3019             value = 0
3020             if first_octet & 0x80 > 0:
3021                 octets = bytearray()
3022                 for octet in bytearray(v):
3023                     octets.append(octet ^ 0xFF)
3024                 for octet in octets:
3025                     value = (value << 8) | octet
3026                 value += 1
3027                 value = -value
3028             else:
3029                 for octet in bytearray(v):
3030                     value = (value << 8) | octet
3031         else:
3032             value = int.from_bytes(v, byteorder="big", signed=True)
3033         try:
3034             obj = self.__class__(
3035                 value=value,
3036                 bounds=(self._bound_min, self._bound_max),
3037                 impl=self.tag,
3038                 expl=self._expl,
3039                 default=self.default,
3040                 optional=self.optional,
3041                 _specs=self.specs,
3042                 _decoded=(offset, llen, l),
3043             )
3044         except BoundsError as err:
3045             raise DecodeError(
3046                 msg=str(err),
3047                 klass=self.__class__,
3048                 decode_path=decode_path,
3049                 offset=offset,
3050             )
3051         yield decode_path, obj, tail
3052
3053     def __repr__(self):
3054         return pp_console_row(next(self.pps()))
3055
3056     def pps(self, decode_path=()):
3057         yield _pp(
3058             obj=self,
3059             asn1_type_name=self.asn1_type_name,
3060             obj_name=self.__class__.__name__,
3061             decode_path=decode_path,
3062             value=(self.named or str(self._value)) if self.ready else None,
3063             optional=self.optional,
3064             default=self == self.default,
3065             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3066             expl=None if self._expl is None else tag_decode(self._expl),
3067             offset=self.offset,
3068             tlen=self.tlen,
3069             llen=self.llen,
3070             vlen=self.vlen,
3071             expl_offset=self.expl_offset if self.expled else None,
3072             expl_tlen=self.expl_tlen if self.expled else None,
3073             expl_llen=self.expl_llen if self.expled else None,
3074             expl_vlen=self.expl_vlen if self.expled else None,
3075             expl_lenindef=self.expl_lenindef,
3076             bered=self.bered,
3077         )
3078         for pp in self.pps_lenindef(decode_path):
3079             yield pp
3080
3081
3082 BitStringState = namedtuple(
3083     "BitStringState",
3084     BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3085     **NAMEDTUPLE_KWARGS
3086 )
3087
3088
3089 class BitString(Obj):
3090     """``BIT STRING`` bit string type
3091
3092     >>> BitString(b"hello world")
3093     BIT STRING 88 bits 68656c6c6f20776f726c64
3094     >>> bytes(b)
3095     b'hello world'
3096     >>> b == b"hello world"
3097     True
3098     >>> b.bit_len
3099     88
3100
3101     >>> BitString("'0A3B5F291CD'H")
3102     BIT STRING 44 bits 0a3b5f291cd0
3103     >>> b = BitString("'010110000000'B")
3104     BIT STRING 12 bits 5800
3105     >>> b.bit_len
3106     12
3107     >>> b[0], b[1], b[2], b[3]
3108     (False, True, False, True)
3109     >>> b[1000]
3110     False
3111     >>> [v for v in b]
3112     [False, True, False, True, True, False, False, False, False, False, False, False]
3113
3114     ::
3115
3116         class KeyUsage(BitString):
3117             schema = (
3118                 ("digitalSignature", 0),
3119                 ("nonRepudiation", 1),
3120                 ("keyEncipherment", 2),
3121             )
3122
3123     >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3124     KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3125     >>> b.named
3126     ['nonRepudiation', 'keyEncipherment']
3127     >>> b.specs
3128     {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3129
3130     .. note::
3131
3132        Pay attention that BIT STRING can be encoded both in primitive
3133        and constructed forms. Decoder always checks constructed form tag
3134        additionally to specified primitive one. If BER decoding is
3135        :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3136        of DER restrictions.
3137     """
3138     __slots__ = ("tag_constructed", "specs", "defined")
3139     tag_default = tag_encode(3)
3140     asn1_type_name = "BIT STRING"
3141
3142     def __init__(
3143             self,
3144             value=None,
3145             impl=None,
3146             expl=None,
3147             default=None,
3148             optional=False,
3149             _specs=None,
3150             _decoded=(0, 0, 0),
3151     ):
3152         """
3153         :param value: set the value. Either binary type, tuple of named
3154                       values (if ``schema`` is specified in the class),
3155                       string in ``'XXX...'B`` form, or
3156                       :py:class:`pyderasn.BitString` object
3157         :param bytes impl: override default tag with ``IMPLICIT`` one
3158         :param bytes expl: override default tag with ``EXPLICIT`` one
3159         :param default: set default value. Type same as in ``value``
3160         :param bool optional: is object ``OPTIONAL`` in sequence
3161         """
3162         super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3163         specs = getattr(self, "schema", {}) if _specs is None else _specs
3164         self.specs = specs if specs.__class__ == dict else dict(specs)
3165         self._value = None if value is None else self._value_sanitize(value)
3166         if default is not None:
3167             default = self._value_sanitize(default)
3168             self.default = self.__class__(
3169                 value=default,
3170                 impl=self.tag,
3171                 expl=self._expl,
3172             )
3173             if value is None:
3174                 self._value = default
3175         self.defined = None
3176         tag_klass, _, tag_num = tag_decode(self.tag)
3177         self.tag_constructed = tag_encode(
3178             klass=tag_klass,
3179             form=TagFormConstructed,
3180             num=tag_num,
3181         )
3182
3183     def _bits2octets(self, bits):
3184         if len(self.specs) > 0:
3185             bits = bits.rstrip("0")
3186         bit_len = len(bits)
3187         bits += "0" * ((8 - (bit_len % 8)) % 8)
3188         octets = bytearray(len(bits) // 8)
3189         for i in six_xrange(len(octets)):
3190             octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3191         return bit_len, bytes(octets)
3192
3193     def _value_sanitize(self, value):
3194         if isinstance(value, (string_types, binary_type)):
3195             if (
3196                     isinstance(value, string_types) and
3197                     value.startswith("'")
3198             ):
3199                 if value.endswith("'B"):
3200                     value = value[1:-2]
3201                     if not frozenset(value) <= SET01:
3202                         raise ValueError("B's coding contains unacceptable chars")
3203                     return self._bits2octets(value)
3204                 if value.endswith("'H"):
3205                     value = value[1:-2]
3206                     return (
3207                         len(value) * 4,
3208                         hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3209                     )
3210             if value.__class__ == binary_type:
3211                 return (len(value) * 8, value)
3212             raise InvalidValueType((self.__class__, string_types, binary_type))
3213         if value.__class__ == tuple:
3214             if (
3215                     len(value) == 2 and
3216                     isinstance(value[0], integer_types) and
3217                     value[1].__class__ == binary_type
3218             ):
3219                 return value
3220             bits = []
3221             for name in value:
3222                 bit = self.specs.get(name)
3223                 if bit is None:
3224                     raise ObjUnknown("BitString value: %s" % name)
3225                 bits.append(bit)
3226             if len(bits) == 0:
3227                 return self._bits2octets("")
3228             bits = frozenset(bits)
3229             return self._bits2octets("".join(
3230                 ("1" if bit in bits else "0")
3231                 for bit in six_xrange(max(bits) + 1)
3232             ))
3233         if issubclass(value.__class__, BitString):
3234             return value._value
3235         raise InvalidValueType((self.__class__, binary_type, string_types))
3236
3237     @property
3238     def ready(self):
3239         return self._value is not None
3240
3241     def __getstate__(self):
3242         return BitStringState(
3243             __version__,
3244             self.tag,
3245             self._tag_order,
3246             self._expl,
3247             self.default,
3248             self.optional,
3249             self.offset,
3250             self.llen,
3251             self.vlen,
3252             self.expl_lenindef,
3253             self.lenindef,
3254             self.ber_encoded,
3255             self.specs,
3256             self._value,
3257             self.tag_constructed,
3258             self.defined,
3259         )
3260
3261     def __setstate__(self, state):
3262         super(BitString, self).__setstate__(state)
3263         self.specs = state.specs
3264         self._value = state.value
3265         self.tag_constructed = state.tag_constructed
3266         self.defined = state.defined
3267
3268     def __iter__(self):
3269         self._assert_ready()
3270         for i in six_xrange(self._value[0]):
3271             yield self[i]
3272
3273     @property
3274     def bit_len(self):
3275         """Returns number of bits in the string
3276         """
3277         self._assert_ready()
3278         return self._value[0]
3279
3280     def __bytes__(self):
3281         self._assert_ready()
3282         return self._value[1]
3283
3284     def __eq__(self, their):
3285         if their.__class__ == bytes:
3286             return self._value[1] == their
3287         if not issubclass(their.__class__, BitString):
3288             return False
3289         return (
3290             self._value == their._value and
3291             self.tag == their.tag and
3292             self._expl == their._expl
3293         )
3294
3295     @property
3296     def named(self):
3297         """Named representation (if exists) of the bits
3298
3299         :returns: [str(name), ...]
3300         """
3301         return [name for name, bit in iteritems(self.specs) if self[bit]]
3302
3303     def __call__(
3304             self,
3305             value=None,
3306             impl=None,
3307             expl=None,
3308             default=None,
3309             optional=None,
3310     ):
3311         return self.__class__(
3312             value=value,
3313             impl=self.tag if impl is None else impl,
3314             expl=self._expl if expl is None else expl,
3315             default=self.default if default is None else default,
3316             optional=self.optional if optional is None else optional,
3317             _specs=self.specs,
3318         )
3319
3320     def __getitem__(self, key):
3321         if key.__class__ == int:
3322             bit_len, octets = self._value
3323             if key >= bit_len:
3324                 return False
3325             return (
3326                 byte2int(memoryview(octets)[key // 8:]) >>
3327                 (7 - (key % 8))
3328             ) & 1 == 1
3329         if isinstance(key, string_types):
3330             value = self.specs.get(key)
3331             if value is None:
3332                 raise ObjUnknown("BitString value: %s" % key)
3333             return self[value]
3334         raise InvalidValueType((int, str))
3335
3336     def _encode(self):
3337         self._assert_ready()
3338         bit_len, octets = self._value
3339         return b"".join((
3340             self.tag,
3341             len_encode(len(octets) + 1),
3342             int2byte((8 - bit_len % 8) % 8),
3343             octets,
3344         ))
3345
3346     def _encode1st(self, state):
3347         self._assert_ready()
3348         _, octets = self._value
3349         l = len(octets) + 1
3350         return len(self.tag) + len_size(l) + l, state
3351
3352     def _encode2nd(self, writer, state_iter):
3353         bit_len, octets = self._value
3354         write_full(writer, b"".join((
3355             self.tag,
3356             len_encode(len(octets) + 1),
3357             int2byte((8 - bit_len % 8) % 8),
3358         )))
3359         write_full(writer, octets)
3360
3361     def _encode_cer(self, writer):
3362         bit_len, octets = self._value
3363         if len(octets) + 1 <= 1000:
3364             write_full(writer, self._encode())
3365             return
3366         write_full(writer, self.tag_constructed)
3367         write_full(writer, LENINDEF)
3368         for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3369             write_full(writer, b"".join((
3370                 BitString.tag_default,
3371                 LEN1K,
3372                 int2byte(0),
3373                 octets[offset:offset + 999],
3374             )))
3375         tail = octets[offset+999:]
3376         if len(tail) > 0:
3377             tail = int2byte((8 - bit_len % 8) % 8) + tail
3378             write_full(writer, b"".join((
3379                 BitString.tag_default,
3380                 len_encode(len(tail)),
3381                 tail,
3382             )))
3383         write_full(writer, EOC)
3384
3385     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3386         try:
3387             t, tlen, lv = tag_strip(tlv)
3388         except DecodeError as err:
3389             raise err.__class__(
3390                 msg=err.msg,
3391                 klass=self.__class__,
3392                 decode_path=decode_path,
3393                 offset=offset,
3394             )
3395         if t == self.tag:
3396             if tag_only:  # pragma: no cover
3397                 yield None
3398                 return
3399             try:
3400                 l, llen, v = len_decode(lv)
3401             except DecodeError as err:
3402                 raise err.__class__(
3403                     msg=err.msg,
3404                     klass=self.__class__,
3405                     decode_path=decode_path,
3406                     offset=offset,
3407                 )
3408             if l > len(v):
3409                 raise NotEnoughData(
3410                     "encoded length is longer than data",
3411                     klass=self.__class__,
3412                     decode_path=decode_path,
3413                     offset=offset,
3414                 )
3415             if l == 0:
3416                 raise NotEnoughData(
3417                     "zero length",
3418                     klass=self.__class__,
3419                     decode_path=decode_path,
3420                     offset=offset,
3421                 )
3422             pad_size = byte2int(v)
3423             if l == 1 and pad_size != 0:
3424                 raise DecodeError(
3425                     "invalid empty value",
3426                     klass=self.__class__,
3427                     decode_path=decode_path,
3428                     offset=offset,
3429                 )
3430             if pad_size > 7:
3431                 raise DecodeError(
3432                     "too big pad",
3433                     klass=self.__class__,
3434                     decode_path=decode_path,
3435                     offset=offset,
3436                 )
3437             if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3438                 raise DecodeError(
3439                     "invalid pad",
3440                     klass=self.__class__,
3441                     decode_path=decode_path,
3442                     offset=offset,
3443                 )
3444             v, tail = v[:l], v[l:]
3445             bit_len = (len(v) - 1) * 8 - pad_size
3446             obj = self.__class__(
3447                 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3448                 impl=self.tag,
3449                 expl=self._expl,
3450                 default=self.default,
3451                 optional=self.optional,
3452                 _specs=self.specs,
3453                 _decoded=(offset, llen, l),
3454             )
3455             if evgen_mode:
3456                 obj._value = (bit_len, None)
3457             yield decode_path, obj, tail
3458             return
3459         if t != self.tag_constructed:
3460             raise TagMismatch(
3461                 klass=self.__class__,
3462                 decode_path=decode_path,
3463                 offset=offset,
3464             )
3465         if not ctx.get("bered", False):
3466             raise DecodeError(
3467                 "unallowed BER constructed encoding",
3468                 klass=self.__class__,
3469                 decode_path=decode_path,
3470                 offset=offset,
3471             )
3472         if tag_only:  # pragma: no cover
3473             yield None
3474             return
3475         lenindef = False
3476         try:
3477             l, llen, v = len_decode(lv)
3478         except LenIndefForm:
3479             llen, l, v = 1, 0, lv[1:]
3480             lenindef = True
3481         except DecodeError as err:
3482             raise err.__class__(
3483                 msg=err.msg,
3484                 klass=self.__class__,
3485                 decode_path=decode_path,
3486                 offset=offset,
3487             )
3488         if l > len(v):
3489             raise NotEnoughData(
3490                 "encoded length is longer than data",
3491                 klass=self.__class__,
3492                 decode_path=decode_path,
3493                 offset=offset,
3494             )
3495         if not lenindef and l == 0:
3496             raise NotEnoughData(
3497                 "zero length",
3498                 klass=self.__class__,
3499                 decode_path=decode_path,
3500                 offset=offset,
3501             )
3502         chunks = []
3503         sub_offset = offset + tlen + llen
3504         vlen = 0
3505         while True:
3506             if lenindef:
3507                 if v[:EOC_LEN].tobytes() == EOC:
3508                     break
3509             else:
3510                 if vlen == l:
3511                     break
3512                 if vlen > l:
3513                     raise DecodeError(
3514                         "chunk out of bounds",
3515                         klass=self.__class__,
3516                         decode_path=decode_path + (str(len(chunks) - 1),),
3517                         offset=chunks[-1].offset,
3518                     )
3519             sub_decode_path = decode_path + (str(len(chunks)),)
3520             try:
3521                 if evgen_mode:
3522                     for _decode_path, chunk, v_tail in BitString().decode_evgen(
3523                             v,
3524                             offset=sub_offset,
3525                             decode_path=sub_decode_path,
3526                             leavemm=True,
3527                             ctx=ctx,
3528                             _ctx_immutable=False,
3529                     ):
3530                         yield _decode_path, chunk, v_tail
3531                 else:
3532                     _, chunk, v_tail = next(BitString().decode_evgen(
3533                         v,
3534                         offset=sub_offset,
3535                         decode_path=sub_decode_path,
3536                         leavemm=True,
3537                         ctx=ctx,
3538                         _ctx_immutable=False,
3539                         _evgen_mode=False,
3540                     ))
3541             except TagMismatch:
3542                 raise DecodeError(
3543                     "expected BitString encoded chunk",
3544                     klass=self.__class__,
3545                     decode_path=sub_decode_path,
3546                     offset=sub_offset,
3547                 )
3548             chunks.append(chunk)
3549             sub_offset += chunk.tlvlen
3550             vlen += chunk.tlvlen
3551             v = v_tail
3552         if len(chunks) == 0:
3553             raise DecodeError(
3554                 "no chunks",
3555                 klass=self.__class__,
3556                 decode_path=decode_path,
3557                 offset=offset,
3558             )
3559         values = []
3560         bit_len = 0
3561         for chunk_i, chunk in enumerate(chunks[:-1]):
3562             if chunk.bit_len % 8 != 0:
3563                 raise DecodeError(
3564                     "BitString chunk is not multiple of 8 bits",
3565                     klass=self.__class__,
3566                     decode_path=decode_path + (str(chunk_i),),
3567                     offset=chunk.offset,
3568                 )
3569             if not evgen_mode:
3570                 values.append(bytes(chunk))
3571             bit_len += chunk.bit_len
3572         chunk_last = chunks[-1]
3573         if not evgen_mode:
3574             values.append(bytes(chunk_last))
3575         bit_len += chunk_last.bit_len
3576         obj = self.__class__(
3577             value=None if evgen_mode else (bit_len, b"".join(values)),
3578             impl=self.tag,
3579             expl=self._expl,
3580             default=self.default,
3581             optional=self.optional,
3582             _specs=self.specs,
3583             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3584         )
3585         if evgen_mode:
3586             obj._value = (bit_len, None)
3587         obj.lenindef = lenindef
3588         obj.ber_encoded = True
3589         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3590
3591     def __repr__(self):
3592         return pp_console_row(next(self.pps()))
3593
3594     def pps(self, decode_path=()):
3595         value = None
3596         blob = None
3597         if self.ready:
3598             bit_len, blob = self._value
3599             value = "%d bits" % bit_len
3600             if len(self.specs) > 0 and blob is not None:
3601                 blob = tuple(self.named)
3602         yield _pp(
3603             obj=self,
3604             asn1_type_name=self.asn1_type_name,
3605             obj_name=self.__class__.__name__,
3606             decode_path=decode_path,
3607             value=value,
3608             blob=blob,
3609             optional=self.optional,
3610             default=self == self.default,
3611             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3612             expl=None if self._expl is None else tag_decode(self._expl),
3613             offset=self.offset,
3614             tlen=self.tlen,
3615             llen=self.llen,
3616             vlen=self.vlen,
3617             expl_offset=self.expl_offset if self.expled else None,
3618             expl_tlen=self.expl_tlen if self.expled else None,
3619             expl_llen=self.expl_llen if self.expled else None,
3620             expl_vlen=self.expl_vlen if self.expled else None,
3621             expl_lenindef=self.expl_lenindef,
3622             lenindef=self.lenindef,
3623             ber_encoded=self.ber_encoded,
3624             bered=self.bered,
3625         )
3626         defined_by, defined = self.defined or (None, None)
3627         if defined_by is not None:
3628             yield defined.pps(
3629                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3630             )
3631         for pp in self.pps_lenindef(decode_path):
3632             yield pp
3633
3634
3635 OctetStringState = namedtuple(
3636     "OctetStringState",
3637     BasicState._fields + (
3638         "value",
3639         "bound_min",
3640         "bound_max",
3641         "tag_constructed",
3642         "defined",
3643     ),
3644     **NAMEDTUPLE_KWARGS
3645 )
3646
3647
3648 class OctetString(Obj):
3649     """``OCTET STRING`` binary string type
3650
3651     >>> s = OctetString(b"hello world")
3652     OCTET STRING 11 bytes 68656c6c6f20776f726c64
3653     >>> s == OctetString(b"hello world")
3654     True
3655     >>> bytes(s)
3656     b'hello world'
3657
3658     >>> OctetString(b"hello", bounds=(4, 4))
3659     Traceback (most recent call last):
3660     pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3661     >>> OctetString(b"hell", bounds=(4, 4))
3662     OCTET STRING 4 bytes 68656c6c
3663
3664     Memoryviews can be used as a values. If memoryview is made on
3665     mmap-ed file, then it does not take storage inside OctetString
3666     itself. In CER encoding mode it will be streamed to the specified
3667     writer, copying 1 KB chunks.
3668     """
3669     __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3670     tag_default = tag_encode(4)
3671     asn1_type_name = "OCTET STRING"
3672     evgen_mode_skip_value = True
3673
3674     def __init__(
3675             self,
3676             value=None,
3677             bounds=None,
3678             impl=None,
3679             expl=None,
3680             default=None,
3681             optional=False,
3682             _decoded=(0, 0, 0),
3683             ctx=None,
3684     ):
3685         """
3686         :param value: set the value. Either binary type, or
3687                       :py:class:`pyderasn.OctetString` object
3688         :param bounds: set ``(MIN, MAX)`` value size constraint.
3689                        (-inf, +inf) by default
3690         :param bytes impl: override default tag with ``IMPLICIT`` one
3691         :param bytes expl: override default tag with ``EXPLICIT`` one
3692         :param default: set default value. Type same as in ``value``
3693         :param bool optional: is object ``OPTIONAL`` in sequence
3694         """
3695         super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3696         self._value = value
3697         self._bound_min, self._bound_max = getattr(
3698             self,
3699             "bounds",
3700             (0, float("+inf")),
3701         ) if bounds is None else bounds
3702         if value is not None:
3703             self._value = self._value_sanitize(value)
3704         if default is not None:
3705             default = self._value_sanitize(default)
3706             self.default = self.__class__(
3707                 value=default,
3708                 impl=self.tag,
3709                 expl=self._expl,
3710             )
3711             if self._value is None:
3712                 self._value = default
3713         self.defined = None
3714         tag_klass, _, tag_num = tag_decode(self.tag)
3715         self.tag_constructed = tag_encode(
3716             klass=tag_klass,
3717             form=TagFormConstructed,
3718             num=tag_num,
3719         )
3720
3721     def _value_sanitize(self, value):
3722         if value.__class__ == binary_type or value.__class__ == memoryview:
3723             pass
3724         elif issubclass(value.__class__, OctetString):
3725             value = value._value
3726         else:
3727             raise InvalidValueType((self.__class__, bytes, memoryview))
3728         if not self._bound_min <= len(value) <= self._bound_max:
3729             raise BoundsError(self._bound_min, len(value), self._bound_max)
3730         return value
3731
3732     @property
3733     def ready(self):
3734         return self._value is not None
3735
3736     def __getstate__(self):
3737         return OctetStringState(
3738             __version__,
3739             self.tag,
3740             self._tag_order,
3741             self._expl,
3742             self.default,
3743             self.optional,
3744             self.offset,
3745             self.llen,
3746             self.vlen,
3747             self.expl_lenindef,
3748             self.lenindef,
3749             self.ber_encoded,
3750             self._value,
3751             self._bound_min,
3752             self._bound_max,
3753             self.tag_constructed,
3754             self.defined,
3755         )
3756
3757     def __setstate__(self, state):
3758         super(OctetString, self).__setstate__(state)
3759         self._value = state.value
3760         self._bound_min = state.bound_min
3761         self._bound_max = state.bound_max
3762         self.tag_constructed = state.tag_constructed
3763         self.defined = state.defined
3764
3765     def __bytes__(self):
3766         self._assert_ready()
3767         return bytes(self._value)
3768
3769     def __eq__(self, their):
3770         if their.__class__ == binary_type:
3771             return self._value == their
3772         if not issubclass(their.__class__, OctetString):
3773             return False
3774         return (
3775             self._value == their._value and
3776             self.tag == their.tag and
3777             self._expl == their._expl
3778         )
3779
3780     def __lt__(self, their):
3781         return self._value < their._value
3782
3783     def __call__(
3784             self,
3785             value=None,
3786             bounds=None,
3787             impl=None,
3788             expl=None,
3789             default=None,
3790             optional=None,
3791     ):
3792         return self.__class__(
3793             value=value,
3794             bounds=(
3795                 (self._bound_min, self._bound_max)
3796                 if bounds is None else bounds
3797             ),
3798             impl=self.tag if impl is None else impl,
3799             expl=self._expl if expl is None else expl,
3800             default=self.default if default is None else default,
3801             optional=self.optional if optional is None else optional,
3802         )
3803
3804     def _encode(self):
3805         self._assert_ready()
3806         return b"".join((
3807             self.tag,
3808             len_encode(len(self._value)),
3809             self._value,
3810         ))
3811
3812     def _encode1st(self, state):
3813         self._assert_ready()
3814         l = len(self._value)
3815         return len(self.tag) + len_size(l) + l, state
3816
3817     def _encode2nd(self, writer, state_iter):
3818         value = self._value
3819         write_full(writer, self.tag + len_encode(len(value)))
3820         write_full(writer, value)
3821
3822     def _encode_cer(self, writer):
3823         octets = self._value
3824         if len(octets) <= 1000:
3825             write_full(writer, self._encode())
3826             return
3827         write_full(writer, self.tag_constructed)
3828         write_full(writer, LENINDEF)
3829         for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3830             write_full(writer, b"".join((
3831                 OctetString.tag_default,
3832                 LEN1K,
3833                 octets[offset:offset + 1000],
3834             )))
3835         tail = octets[offset+1000:]
3836         if len(tail) > 0:
3837             write_full(writer, b"".join((
3838                 OctetString.tag_default,
3839                 len_encode(len(tail)),
3840                 tail,
3841             )))
3842         write_full(writer, EOC)
3843
3844     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3845         try:
3846             t, tlen, lv = tag_strip(tlv)
3847         except DecodeError as err:
3848             raise err.__class__(
3849                 msg=err.msg,
3850                 klass=self.__class__,
3851                 decode_path=decode_path,
3852                 offset=offset,
3853             )
3854         if t == self.tag:
3855             if tag_only:
3856                 yield None
3857                 return
3858             try:
3859                 l, llen, v = len_decode(lv)
3860             except DecodeError as err:
3861                 raise err.__class__(
3862                     msg=err.msg,
3863                     klass=self.__class__,
3864                     decode_path=decode_path,
3865                     offset=offset,
3866                 )
3867             if l > len(v):
3868                 raise NotEnoughData(
3869                     "encoded length is longer than data",
3870                     klass=self.__class__,
3871                     decode_path=decode_path,
3872                     offset=offset,
3873                 )
3874             v, tail = v[:l], v[l:]
3875             if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3876                 raise DecodeError(
3877                     msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3878                     klass=self.__class__,
3879                     decode_path=decode_path,
3880                     offset=offset,
3881                 )
3882             try:
3883                 obj = self.__class__(
3884                     value=(
3885                         None if (evgen_mode and self.evgen_mode_skip_value)
3886                         else v.tobytes()
3887                     ),
3888                     bounds=(self._bound_min, self._bound_max),
3889                     impl=self.tag,
3890                     expl=self._expl,
3891                     default=self.default,
3892                     optional=self.optional,
3893                     _decoded=(offset, llen, l),
3894                     ctx=ctx,
3895                 )
3896             except DecodeError as err:
3897                 raise DecodeError(
3898                     msg=err.msg,
3899                     klass=self.__class__,
3900                     decode_path=decode_path,
3901                     offset=offset,
3902                 )
3903             except BoundsError as err:
3904                 raise DecodeError(
3905                     msg=str(err),
3906                     klass=self.__class__,
3907                     decode_path=decode_path,
3908                     offset=offset,
3909                 )
3910             yield decode_path, obj, tail
3911             return
3912         if t != self.tag_constructed:
3913             raise TagMismatch(
3914                 klass=self.__class__,
3915                 decode_path=decode_path,
3916                 offset=offset,
3917             )
3918         if not ctx.get("bered", False):
3919             raise DecodeError(
3920                 "unallowed BER constructed encoding",
3921                 klass=self.__class__,
3922                 decode_path=decode_path,
3923                 offset=offset,
3924             )
3925         if tag_only:
3926             yield None
3927             return
3928         lenindef = False
3929         try:
3930             l, llen, v = len_decode(lv)
3931         except LenIndefForm:
3932             llen, l, v = 1, 0, lv[1:]
3933             lenindef = True
3934         except DecodeError as err:
3935             raise err.__class__(
3936                 msg=err.msg,
3937                 klass=self.__class__,
3938                 decode_path=decode_path,
3939                 offset=offset,
3940             )
3941         if l > len(v):
3942             raise NotEnoughData(
3943                 "encoded length is longer than data",
3944                 klass=self.__class__,
3945                 decode_path=decode_path,
3946                 offset=offset,
3947             )
3948         chunks = []
3949         chunks_count = 0
3950         sub_offset = offset + tlen + llen
3951         vlen = 0
3952         payload_len = 0
3953         while True:
3954             if lenindef:
3955                 if v[:EOC_LEN].tobytes() == EOC:
3956                     break
3957             else:
3958                 if vlen == l:
3959                     break
3960                 if vlen > l:
3961                     raise DecodeError(
3962                         "chunk out of bounds",
3963                         klass=self.__class__,
3964                         decode_path=decode_path + (str(len(chunks) - 1),),
3965                         offset=chunks[-1].offset,
3966                     )
3967             try:
3968                 if evgen_mode:
3969                     sub_decode_path = decode_path + (str(chunks_count),)
3970                     for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3971                             v,
3972                             offset=sub_offset,
3973                             decode_path=sub_decode_path,
3974                             leavemm=True,
3975                             ctx=ctx,
3976                             _ctx_immutable=False,
3977                     ):
3978                         yield _decode_path, chunk, v_tail
3979                         if not chunk.ber_encoded:
3980                             payload_len += chunk.vlen
3981                     chunks_count += 1
3982                 else:
3983                     sub_decode_path = decode_path + (str(len(chunks)),)
3984                     _, chunk, v_tail = next(OctetString().decode_evgen(
3985                         v,
3986                         offset=sub_offset,
3987                         decode_path=sub_decode_path,
3988                         leavemm=True,
3989                         ctx=ctx,
3990                         _ctx_immutable=False,
3991                         _evgen_mode=False,
3992                     ))
3993                     chunks.append(chunk)
3994             except TagMismatch:
3995                 raise DecodeError(
3996                     "expected OctetString encoded chunk",
3997                     klass=self.__class__,
3998                     decode_path=sub_decode_path,
3999                     offset=sub_offset,
4000                 )
4001             sub_offset += chunk.tlvlen
4002             vlen += chunk.tlvlen
4003             v = v_tail
4004         if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
4005             raise DecodeError(
4006                 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
4007                 klass=self.__class__,
4008                 decode_path=decode_path,
4009                 offset=offset,
4010             )
4011         try:
4012             obj = self.__class__(
4013                 value=(
4014                     None if evgen_mode else
4015                     b"".join(bytes(chunk) for chunk in chunks)
4016                 ),
4017                 bounds=(self._bound_min, self._bound_max),
4018                 impl=self.tag,
4019                 expl=self._expl,
4020                 default=self.default,
4021                 optional=self.optional,
4022                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4023                 ctx=ctx,
4024             )
4025         except DecodeError as err:
4026             raise DecodeError(
4027                 msg=err.msg,
4028                 klass=self.__class__,
4029                 decode_path=decode_path,
4030                 offset=offset,
4031             )
4032         except BoundsError as err:
4033             raise DecodeError(
4034                 msg=str(err),
4035                 klass=self.__class__,
4036                 decode_path=decode_path,
4037                 offset=offset,
4038             )
4039         obj.lenindef = lenindef
4040         obj.ber_encoded = True
4041         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4042
4043     def __repr__(self):
4044         return pp_console_row(next(self.pps()))
4045
4046     def pps(self, decode_path=()):
4047         yield _pp(
4048             obj=self,
4049             asn1_type_name=self.asn1_type_name,
4050             obj_name=self.__class__.__name__,
4051             decode_path=decode_path,
4052             value=("%d bytes" % len(self._value)) if self.ready else None,
4053             blob=self._value if self.ready else None,
4054             optional=self.optional,
4055             default=self == self.default,
4056             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4057             expl=None if self._expl is None else tag_decode(self._expl),
4058             offset=self.offset,
4059             tlen=self.tlen,
4060             llen=self.llen,
4061             vlen=self.vlen,
4062             expl_offset=self.expl_offset if self.expled else None,
4063             expl_tlen=self.expl_tlen if self.expled else None,
4064             expl_llen=self.expl_llen if self.expled else None,
4065             expl_vlen=self.expl_vlen if self.expled else None,
4066             expl_lenindef=self.expl_lenindef,
4067             lenindef=self.lenindef,
4068             ber_encoded=self.ber_encoded,
4069             bered=self.bered,
4070         )
4071         defined_by, defined = self.defined or (None, None)
4072         if defined_by is not None:
4073             yield defined.pps(
4074                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4075             )
4076         for pp in self.pps_lenindef(decode_path):
4077             yield pp
4078
4079
4080 def agg_octet_string(evgens, decode_path, raw, writer):
4081     """Aggregate constructed string (OctetString and its derivatives)
4082
4083     :param evgens: iterator of generated events
4084     :param decode_path: points to the string we want to decode
4085     :param raw: slicebable (memoryview, bytearray, etc) with
4086                 the data evgens are generated on
4087     :param writer: buffer.write where string is going to be saved
4088     :param writer: where string is going to be saved. Must comply
4089                    with ``io.RawIOBase.write`` behaviour
4090
4091     .. seealso:: :ref:`agg_octet_string`
4092     """
4093     decode_path_len = len(decode_path)
4094     for dp, obj, _ in evgens:
4095         if dp[:decode_path_len] != decode_path:
4096             continue
4097         if not obj.ber_encoded:
4098             write_full(writer, raw[
4099                 obj.offset + obj.tlen + obj.llen:
4100                 obj.offset + obj.tlen + obj.llen + obj.vlen -
4101                 (EOC_LEN if obj.expl_lenindef else 0)
4102             ])
4103         if len(dp) == decode_path_len:
4104             break
4105
4106
4107 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4108
4109
4110 class Null(Obj):
4111     """``NULL`` null object
4112
4113     >>> n = Null()
4114     NULL
4115     >>> n.ready
4116     True
4117     """
4118     __slots__ = ()
4119     tag_default = tag_encode(5)
4120     asn1_type_name = "NULL"
4121
4122     def __init__(
4123             self,
4124             value=None,  # unused, but Sequence passes it
4125             impl=None,
4126             expl=None,
4127             optional=False,
4128             _decoded=(0, 0, 0),
4129     ):
4130         """
4131         :param bytes impl: override default tag with ``IMPLICIT`` one
4132         :param bytes expl: override default tag with ``EXPLICIT`` one
4133         :param bool optional: is object ``OPTIONAL`` in sequence
4134         """
4135         super(Null, self).__init__(impl, expl, None, optional, _decoded)
4136         self.default = None
4137
4138     @property
4139     def ready(self):
4140         return True
4141
4142     def __getstate__(self):
4143         return NullState(
4144             __version__,
4145             self.tag,
4146             self._tag_order,
4147             self._expl,
4148             self.default,
4149             self.optional,
4150             self.offset,
4151             self.llen,
4152             self.vlen,
4153             self.expl_lenindef,
4154             self.lenindef,
4155             self.ber_encoded,
4156         )
4157
4158     def __eq__(self, their):
4159         if not issubclass(their.__class__, Null):
4160             return False
4161         return (
4162             self.tag == their.tag and
4163             self._expl == their._expl
4164         )
4165
4166     def __call__(
4167             self,
4168             value=None,
4169             impl=None,
4170             expl=None,
4171             optional=None,
4172     ):
4173         return self.__class__(
4174             impl=self.tag if impl is None else impl,
4175             expl=self._expl if expl is None else expl,
4176             optional=self.optional if optional is None else optional,
4177         )
4178
4179     def _encode(self):
4180         return self.tag + LEN0
4181
4182     def _encode1st(self, state):
4183         return len(self.tag) + 1, state
4184
4185     def _encode2nd(self, writer, state_iter):
4186         write_full(writer, self.tag + LEN0)
4187
4188     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4189         try:
4190             t, _, lv = tag_strip(tlv)
4191         except DecodeError as err:
4192             raise err.__class__(
4193                 msg=err.msg,
4194                 klass=self.__class__,
4195                 decode_path=decode_path,
4196                 offset=offset,
4197             )
4198         if t != self.tag:
4199             raise TagMismatch(
4200                 klass=self.__class__,
4201                 decode_path=decode_path,
4202                 offset=offset,
4203             )
4204         if tag_only:  # pragma: no cover
4205             yield None
4206             return
4207         try:
4208             l, _, v = len_decode(lv)
4209         except DecodeError as err:
4210             raise err.__class__(
4211                 msg=err.msg,
4212                 klass=self.__class__,
4213                 decode_path=decode_path,
4214                 offset=offset,
4215             )
4216         if l != 0:
4217             raise InvalidLength(
4218                 "Null must have zero length",
4219                 klass=self.__class__,
4220                 decode_path=decode_path,
4221                 offset=offset,
4222             )
4223         obj = self.__class__(
4224             impl=self.tag,
4225             expl=self._expl,
4226             optional=self.optional,
4227             _decoded=(offset, 1, 0),
4228         )
4229         yield decode_path, obj, v
4230
4231     def __repr__(self):
4232         return pp_console_row(next(self.pps()))
4233
4234     def pps(self, decode_path=()):
4235         yield _pp(
4236             obj=self,
4237             asn1_type_name=self.asn1_type_name,
4238             obj_name=self.__class__.__name__,
4239             decode_path=decode_path,
4240             optional=self.optional,
4241             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4242             expl=None if self._expl is None else tag_decode(self._expl),
4243             offset=self.offset,
4244             tlen=self.tlen,
4245             llen=self.llen,
4246             vlen=self.vlen,
4247             expl_offset=self.expl_offset if self.expled else None,
4248             expl_tlen=self.expl_tlen if self.expled else None,
4249             expl_llen=self.expl_llen if self.expled else None,
4250             expl_vlen=self.expl_vlen if self.expled else None,
4251             expl_lenindef=self.expl_lenindef,
4252             bered=self.bered,
4253         )
4254         for pp in self.pps_lenindef(decode_path):
4255             yield pp
4256
4257
4258 ObjectIdentifierState = namedtuple(
4259     "ObjectIdentifierState",
4260     BasicState._fields + ("value", "defines"),
4261     **NAMEDTUPLE_KWARGS
4262 )
4263
4264
4265 class ObjectIdentifier(Obj):
4266     """``OBJECT IDENTIFIER`` OID type
4267
4268     >>> oid = ObjectIdentifier((1, 2, 3))
4269     OBJECT IDENTIFIER 1.2.3
4270     >>> oid == ObjectIdentifier("1.2.3")
4271     True
4272     >>> tuple(oid)
4273     (1, 2, 3)
4274     >>> str(oid)
4275     '1.2.3'
4276     >>> oid + (4, 5) + ObjectIdentifier("1.7")
4277     OBJECT IDENTIFIER 1.2.3.4.5.1.7
4278
4279     >>> str(ObjectIdentifier((3, 1)))
4280     Traceback (most recent call last):
4281     pyderasn.InvalidOID: unacceptable first arc value
4282     """
4283     __slots__ = ("defines",)
4284     tag_default = tag_encode(6)
4285     asn1_type_name = "OBJECT IDENTIFIER"
4286
4287     def __init__(
4288             self,
4289             value=None,
4290             defines=(),
4291             impl=None,
4292             expl=None,
4293             default=None,
4294             optional=False,
4295             _decoded=(0, 0, 0),
4296     ):
4297         """
4298         :param value: set the value. Either tuples of integers,
4299                       string of "."-concatenated integers, or
4300                       :py:class:`pyderasn.ObjectIdentifier` object
4301         :param defines: sequence of tuples. Each tuple has two elements.
4302                         First one is relative to current one decode
4303                         path, aiming to the field defined by that OID.
4304                         Read about relative path in
4305                         :py:func:`pyderasn.abs_decode_path`. Second
4306                         tuple element is ``{OID: pyderasn.Obj()}``
4307                         dictionary, mapping between current OID value
4308                         and structure applied to defined field.
4309
4310                         .. seealso:: :ref:`definedby`
4311
4312         :param bytes impl: override default tag with ``IMPLICIT`` one
4313         :param bytes expl: override default tag with ``EXPLICIT`` one
4314         :param default: set default value. Type same as in ``value``
4315         :param bool optional: is object ``OPTIONAL`` in sequence
4316         """
4317         super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4318         self._value = value
4319         if value is not None:
4320             self._value = self._value_sanitize(value)
4321         if default is not None:
4322             default = self._value_sanitize(default)
4323             self.default = self.__class__(
4324                 value=default,
4325                 impl=self.tag,
4326                 expl=self._expl,
4327             )
4328             if self._value is None:
4329                 self._value = default
4330         self.defines = defines
4331
4332     def __add__(self, their):
4333         if their.__class__ == tuple:
4334             return self.__class__(self._value + array("L", their))
4335         if isinstance(their, self.__class__):
4336             return self.__class__(self._value + their._value)
4337         raise InvalidValueType((self.__class__, tuple))
4338
4339     def _value_sanitize(self, value):
4340         if issubclass(value.__class__, ObjectIdentifier):
4341             return value._value
4342         if isinstance(value, string_types):
4343             try:
4344                 value = array("L", (pureint(arc) for arc in value.split(".")))
4345             except ValueError:
4346                 raise InvalidOID("unacceptable arcs values")
4347         if value.__class__ == tuple:
4348             try:
4349                 value = array("L", value)
4350             except OverflowError as err:
4351                 raise InvalidOID(repr(err))
4352         if value.__class__ is array:
4353             if len(value) < 2:
4354                 raise InvalidOID("less than 2 arcs")
4355             first_arc = value[0]
4356             if first_arc in (0, 1):
4357                 if not (0 <= value[1] <= 39):
4358                     raise InvalidOID("second arc is too wide")
4359             elif first_arc == 2:
4360                 pass
4361             else:
4362                 raise InvalidOID("unacceptable first arc value")
4363             if not all(arc >= 0 for arc in value):
4364                 raise InvalidOID("negative arc value")
4365             return value
4366         raise InvalidValueType((self.__class__, str, tuple))
4367
4368     @property
4369     def ready(self):
4370         return self._value is not None
4371
4372     def __getstate__(self):
4373         return ObjectIdentifierState(
4374             __version__,
4375             self.tag,
4376             self._tag_order,
4377             self._expl,
4378             self.default,
4379             self.optional,
4380             self.offset,
4381             self.llen,
4382             self.vlen,
4383             self.expl_lenindef,
4384             self.lenindef,
4385             self.ber_encoded,
4386             self._value,
4387             self.defines,
4388         )
4389
4390     def __setstate__(self, state):
4391         super(ObjectIdentifier, self).__setstate__(state)
4392         self._value = state.value
4393         self.defines = state.defines
4394
4395     def __iter__(self):
4396         self._assert_ready()
4397         return iter(self._value)
4398
4399     def __str__(self):
4400         return ".".join(str(arc) for arc in self._value or ())
4401
4402     def __hash__(self):
4403         self._assert_ready()
4404         return hash(b"".join((
4405             self.tag,
4406             bytes(self._expl or b""),
4407             str(self._value).encode("ascii"),
4408         )))
4409
4410     def __eq__(self, their):
4411         if their.__class__ == tuple:
4412             return self._value == array("L", their)
4413         if not issubclass(their.__class__, ObjectIdentifier):
4414             return False
4415         return (
4416             self.tag == their.tag and
4417             self._expl == their._expl and
4418             self._value == their._value
4419         )
4420
4421     def __lt__(self, their):
4422         return self._value < their._value
4423
4424     def __call__(
4425             self,
4426             value=None,
4427             defines=None,
4428             impl=None,
4429             expl=None,
4430             default=None,
4431             optional=None,
4432     ):
4433         return self.__class__(
4434             value=value,
4435             defines=self.defines if defines is None else defines,
4436             impl=self.tag if impl is None else impl,
4437             expl=self._expl if expl is None else expl,
4438             default=self.default if default is None else default,
4439             optional=self.optional if optional is None else optional,
4440         )
4441
4442     def _encode_octets(self):
4443         self._assert_ready()
4444         value = self._value
4445         first_value = value[1]
4446         first_arc = value[0]
4447         if first_arc == 0:
4448             pass
4449         elif first_arc == 1:
4450             first_value += 40
4451         elif first_arc == 2:
4452             first_value += 80
4453         else:  # pragma: no cover
4454             raise RuntimeError("invalid arc is stored")
4455         octets = [zero_ended_encode(first_value)]
4456         for arc in value[2:]:
4457             octets.append(zero_ended_encode(arc))
4458         return b"".join(octets)
4459
4460     def _encode(self):
4461         v = self._encode_octets()
4462         return b"".join((self.tag, len_encode(len(v)), v))
4463
4464     def _encode1st(self, state):
4465         l = len(self._encode_octets())
4466         return len(self.tag) + len_size(l) + l, state
4467
4468     def _encode2nd(self, writer, state_iter):
4469         write_full(writer, self._encode())
4470
4471     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4472         try:
4473             t, _, lv = tag_strip(tlv)
4474         except DecodeError as err:
4475             raise err.__class__(
4476                 msg=err.msg,
4477                 klass=self.__class__,
4478                 decode_path=decode_path,
4479                 offset=offset,
4480             )
4481         if t != self.tag:
4482             raise TagMismatch(
4483                 klass=self.__class__,
4484                 decode_path=decode_path,
4485                 offset=offset,
4486             )
4487         if tag_only:  # pragma: no cover
4488             yield None
4489             return
4490         try:
4491             l, llen, v = len_decode(lv)
4492         except DecodeError as err:
4493             raise err.__class__(
4494                 msg=err.msg,
4495                 klass=self.__class__,
4496                 decode_path=decode_path,
4497                 offset=offset,
4498             )
4499         if l > len(v):
4500             raise NotEnoughData(
4501                 "encoded length is longer than data",
4502                 klass=self.__class__,
4503                 decode_path=decode_path,
4504                 offset=offset,
4505             )
4506         if l == 0:
4507             raise NotEnoughData(
4508                 "zero length",
4509                 klass=self.__class__,
4510                 decode_path=decode_path,
4511                 offset=offset,
4512             )
4513         v, tail = v[:l], v[l:]
4514         arcs = array("L")
4515         ber_encoded = False
4516         while len(v) > 0:
4517             i = 0
4518             arc = 0
4519             while True:
4520                 octet = indexbytes(v, i)
4521                 if i == 0 and octet == 0x80:
4522                     if ctx.get("bered", False):
4523                         ber_encoded = True
4524                     else:
4525                         raise DecodeError(
4526                             "non normalized arc encoding",
4527                             klass=self.__class__,
4528                             decode_path=decode_path,
4529                             offset=offset,
4530                         )
4531                 arc = (arc << 7) | (octet & 0x7F)
4532                 if octet & 0x80 == 0:
4533                     try:
4534                         arcs.append(arc)
4535                     except OverflowError:
4536                         raise DecodeError(
4537                             "too huge value for local unsigned long",
4538                             klass=self.__class__,
4539                             decode_path=decode_path,
4540                             offset=offset,
4541                         )
4542                     v = v[i + 1:]
4543                     break
4544                 i += 1
4545                 if i == len(v):
4546                     raise DecodeError(
4547                         "unfinished OID",
4548                         klass=self.__class__,
4549                         decode_path=decode_path,
4550                         offset=offset,
4551                     )
4552         first_arc = 0
4553         second_arc = arcs[0]
4554         if 0 <= second_arc <= 39:
4555             first_arc = 0
4556         elif 40 <= second_arc <= 79:
4557             first_arc = 1
4558             second_arc -= 40
4559         else:
4560             first_arc = 2
4561             second_arc -= 80
4562         obj = self.__class__(
4563             value=array("L", (first_arc, second_arc)) + arcs[1:],
4564             impl=self.tag,
4565             expl=self._expl,
4566             default=self.default,
4567             optional=self.optional,
4568             _decoded=(offset, llen, l),
4569         )
4570         if ber_encoded:
4571             obj.ber_encoded = True
4572         yield decode_path, obj, tail
4573
4574     def __repr__(self):
4575         return pp_console_row(next(self.pps()))
4576
4577     def pps(self, decode_path=()):
4578         yield _pp(
4579             obj=self,
4580             asn1_type_name=self.asn1_type_name,
4581             obj_name=self.__class__.__name__,
4582             decode_path=decode_path,
4583             value=str(self) if self.ready else None,
4584             optional=self.optional,
4585             default=self == self.default,
4586             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4587             expl=None if self._expl is None else tag_decode(self._expl),
4588             offset=self.offset,
4589             tlen=self.tlen,
4590             llen=self.llen,
4591             vlen=self.vlen,
4592             expl_offset=self.expl_offset if self.expled else None,
4593             expl_tlen=self.expl_tlen if self.expled else None,
4594             expl_llen=self.expl_llen if self.expled else None,
4595             expl_vlen=self.expl_vlen if self.expled else None,
4596             expl_lenindef=self.expl_lenindef,
4597             ber_encoded=self.ber_encoded,
4598             bered=self.bered,
4599         )
4600         for pp in self.pps_lenindef(decode_path):
4601             yield pp
4602
4603
4604 class Enumerated(Integer):
4605     """``ENUMERATED`` integer type
4606
4607     This type is identical to :py:class:`pyderasn.Integer`, but requires
4608     schema to be specified and does not accept values missing from it.
4609     """
4610     __slots__ = ()
4611     tag_default = tag_encode(10)
4612     asn1_type_name = "ENUMERATED"
4613
4614     def __init__(
4615             self,
4616             value=None,
4617             impl=None,
4618             expl=None,
4619             default=None,
4620             optional=False,
4621             _specs=None,
4622             _decoded=(0, 0, 0),
4623             bounds=None,  # dummy argument, workability for Integer.decode
4624     ):
4625         super(Enumerated, self).__init__(
4626             value, bounds, impl, expl, default, optional, _specs, _decoded,
4627         )
4628         if len(self.specs) == 0:
4629             raise ValueError("schema must be specified")
4630
4631     def _value_sanitize(self, value):
4632         if isinstance(value, self.__class__):
4633             value = value._value
4634         elif isinstance(value, integer_types):
4635             for _value in itervalues(self.specs):
4636                 if _value == value:
4637                     break
4638             else:
4639                 raise DecodeError(
4640                     "unknown integer value: %s" % value,
4641                     klass=self.__class__,
4642                 )
4643         elif isinstance(value, string_types):
4644             value = self.specs.get(value)
4645             if value is None:
4646                 raise ObjUnknown("integer value: %s" % value)
4647         else:
4648             raise InvalidValueType((self.__class__, int, str))
4649         return value
4650
4651     def __call__(
4652             self,
4653             value=None,
4654             impl=None,
4655             expl=None,
4656             default=None,
4657             optional=None,
4658             _specs=None,
4659     ):
4660         return self.__class__(
4661             value=value,
4662             impl=self.tag if impl is None else impl,
4663             expl=self._expl if expl is None else expl,
4664             default=self.default if default is None else default,
4665             optional=self.optional if optional is None else optional,
4666             _specs=self.specs,
4667         )
4668
4669
4670 def escape_control_unicode(c):
4671     if unicat(c)[0] == "C":
4672         c = repr(c).lstrip("u").strip("'")
4673     return c
4674
4675
4676 class CommonString(OctetString):
4677     """Common class for all strings
4678
4679     Everything resembles :py:class:`pyderasn.OctetString`, except
4680     ability to deal with unicode text strings.
4681
4682     >>> hexenc("привет Ð¼Ð¸Ñ€".encode("utf-8"))
4683     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4684     >>> UTF8String("привет Ð¼Ð¸Ñ€") == UTF8String(hexdec("d0...80"))
4685     True
4686     >>> s = UTF8String("привет Ð¼Ð¸Ñ€")
4687     UTF8String UTF8String Ð¿Ñ€Ð¸Ð²ÐµÑ‚ Ð¼Ð¸Ñ€
4688     >>> str(s)
4689     'привет Ð¼Ð¸Ñ€'
4690     >>> hexenc(bytes(s))
4691     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4692
4693     >>> PrintableString("привет Ð¼Ð¸Ñ€")
4694     Traceback (most recent call last):
4695     pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4696
4697     >>> BMPString("ада", bounds=(2, 2))
4698     Traceback (most recent call last):
4699     pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4700     >>> s = BMPString("ад", bounds=(2, 2))
4701     >>> s.encoding
4702     'utf-16-be'
4703     >>> hexenc(bytes(s))
4704     '04300434'
4705
4706     .. list-table::
4707        :header-rows: 1
4708
4709        * - Class
4710          - Text Encoding
4711        * - :py:class:`pyderasn.UTF8String`
4712          - utf-8
4713        * - :py:class:`pyderasn.NumericString`
4714          - ascii
4715        * - :py:class:`pyderasn.PrintableString`
4716          - ascii
4717        * - :py:class:`pyderasn.TeletexString`
4718          - ascii
4719        * - :py:class:`pyderasn.T61String`
4720          - ascii
4721        * - :py:class:`pyderasn.VideotexString`
4722          - iso-8859-1
4723        * - :py:class:`pyderasn.IA5String`
4724          - ascii
4725        * - :py:class:`pyderasn.GraphicString`
4726          - iso-8859-1
4727        * - :py:class:`pyderasn.VisibleString`
4728          - ascii
4729        * - :py:class:`pyderasn.ISO646String`
4730          - ascii
4731        * - :py:class:`pyderasn.GeneralString`
4732          - iso-8859-1
4733        * - :py:class:`pyderasn.UniversalString`
4734          - utf-32-be
4735        * - :py:class:`pyderasn.BMPString`
4736          - utf-16-be
4737     """
4738     __slots__ = ()
4739
4740     def _value_sanitize(self, value):
4741         value_raw = None
4742         value_decoded = None
4743         if isinstance(value, self.__class__):
4744             value_raw = value._value
4745         elif value.__class__ == text_type:
4746             value_decoded = value
4747         elif value.__class__ == binary_type:
4748             value_raw = value
4749         else:
4750             raise InvalidValueType((self.__class__, text_type, binary_type))
4751         try:
4752             value_raw = (
4753                 value_decoded.encode(self.encoding)
4754                 if value_raw is None else value_raw
4755             )
4756             value_decoded = (
4757                 value_raw.decode(self.encoding)
4758                 if value_decoded is None else value_decoded
4759             )
4760         except (UnicodeEncodeError, UnicodeDecodeError) as err:
4761             raise DecodeError(str(err))
4762         if not self._bound_min <= len(value_decoded) <= self._bound_max:
4763             raise BoundsError(
4764                 self._bound_min,
4765                 len(value_decoded),
4766                 self._bound_max,
4767             )
4768         return value_raw
4769
4770     def __eq__(self, their):
4771         if their.__class__ == binary_type:
4772             return self._value == their
4773         if their.__class__ == text_type:
4774             return self._value == their.encode(self.encoding)
4775         if not isinstance(their, self.__class__):
4776             return False
4777         return (
4778             self._value == their._value and
4779             self.tag == their.tag and
4780             self._expl == their._expl
4781         )
4782
4783     def __unicode__(self):
4784         if self.ready:
4785             return self._value.decode(self.encoding)
4786         return text_type(self._value)
4787
4788     def __repr__(self):
4789         return pp_console_row(next(self.pps(no_unicode=PY2)))
4790
4791     def pps(self, decode_path=(), no_unicode=False):
4792         value = None
4793         if self.ready:
4794             value = (
4795                 hexenc(bytes(self)) if no_unicode else
4796                 "".join(escape_control_unicode(c) for c in self.__unicode__())
4797             )
4798         yield _pp(
4799             obj=self,
4800             asn1_type_name=self.asn1_type_name,
4801             obj_name=self.__class__.__name__,
4802             decode_path=decode_path,
4803             value=value,
4804             optional=self.optional,
4805             default=self == self.default,
4806             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4807             expl=None if self._expl is None else tag_decode(self._expl),
4808             offset=self.offset,
4809             tlen=self.tlen,
4810             llen=self.llen,
4811             vlen=self.vlen,
4812             expl_offset=self.expl_offset if self.expled else None,
4813             expl_tlen=self.expl_tlen if self.expled else None,
4814             expl_llen=self.expl_llen if self.expled else None,
4815             expl_vlen=self.expl_vlen if self.expled else None,
4816             expl_lenindef=self.expl_lenindef,
4817             ber_encoded=self.ber_encoded,
4818             bered=self.bered,
4819         )
4820         for pp in self.pps_lenindef(decode_path):
4821             yield pp
4822
4823
4824 class UTF8String(CommonString):
4825     __slots__ = ()
4826     tag_default = tag_encode(12)
4827     encoding = "utf-8"
4828     asn1_type_name = "UTF8String"
4829
4830
4831 class AllowableCharsMixin(object):
4832     @property
4833     def allowable_chars(self):
4834         if PY2:
4835             return self._allowable_chars
4836         return frozenset(six_unichr(c) for c in self._allowable_chars)
4837
4838
4839 class NumericString(AllowableCharsMixin, CommonString):
4840     """Numeric string
4841
4842     Its value is properly sanitized: only ASCII digits with spaces can
4843     be stored.
4844
4845     >>> NumericString().allowable_chars
4846     frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4847     """
4848     __slots__ = ()
4849     tag_default = tag_encode(18)
4850     encoding = "ascii"
4851     asn1_type_name = "NumericString"
4852     _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4853
4854     def _value_sanitize(self, value):
4855         value = super(NumericString, self)._value_sanitize(value)
4856         if not frozenset(value) <= self._allowable_chars:
4857             raise DecodeError("non-numeric value")
4858         return value
4859
4860
4861 PrintableStringState = namedtuple(
4862     "PrintableStringState",
4863     OctetStringState._fields + ("allowable_chars",),
4864     **NAMEDTUPLE_KWARGS
4865 )
4866
4867
4868 class PrintableString(AllowableCharsMixin, CommonString):
4869     """Printable string
4870
4871     Its value is properly sanitized: see X.680 41.4 table 10.
4872
4873     >>> PrintableString().allowable_chars
4874     frozenset([' ', "'", ..., 'z'])
4875     >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4876     PrintableString PrintableString foo*bar
4877     >>> obj.allow_asterisk, obj.allow_ampersand
4878     (True, False)
4879     """
4880     __slots__ = ()
4881     tag_default = tag_encode(19)
4882     encoding = "ascii"
4883     asn1_type_name = "PrintableString"
4884     _allowable_chars = frozenset(
4885         (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4886     )
4887     _asterisk = frozenset("*".encode("ascii"))
4888     _ampersand = frozenset("&".encode("ascii"))
4889
4890     def __init__(
4891             self,
4892             value=None,
4893             bounds=None,
4894             impl=None,
4895             expl=None,
4896             default=None,
4897             optional=False,
4898             _decoded=(0, 0, 0),
4899             ctx=None,
4900             allow_asterisk=False,
4901             allow_ampersand=False,
4902     ):
4903         """
4904         :param allow_asterisk: allow asterisk character
4905         :param allow_ampersand: allow ampersand character
4906         """
4907         if allow_asterisk:
4908             self._allowable_chars |= self._asterisk
4909         if allow_ampersand:
4910             self._allowable_chars |= self._ampersand
4911         super(PrintableString, self).__init__(
4912             value, bounds, impl, expl, default, optional, _decoded, ctx,
4913         )
4914
4915     @property
4916     def allow_asterisk(self):
4917         """Is asterisk character allowed?
4918         """
4919         return self._asterisk <= self._allowable_chars
4920
4921     @property
4922     def allow_ampersand(self):
4923         """Is ampersand character allowed?
4924         """
4925         return self._ampersand <= self._allowable_chars
4926
4927     def _value_sanitize(self, value):
4928         value = super(PrintableString, self)._value_sanitize(value)
4929         if not frozenset(value) <= self._allowable_chars:
4930             raise DecodeError("non-printable value")
4931         return value
4932
4933     def __getstate__(self):
4934         return PrintableStringState(
4935             *super(PrintableString, self).__getstate__(),
4936             **{"allowable_chars": self._allowable_chars}
4937         )
4938
4939     def __setstate__(self, state):
4940         super(PrintableString, self).__setstate__(state)
4941         self._allowable_chars = state.allowable_chars
4942
4943     def __call__(
4944             self,
4945             value=None,
4946             bounds=None,
4947             impl=None,
4948             expl=None,
4949             default=None,
4950             optional=None,
4951     ):
4952         return self.__class__(
4953             value=value,
4954             bounds=(
4955                 (self._bound_min, self._bound_max)
4956                 if bounds is None else bounds
4957             ),
4958             impl=self.tag if impl is None else impl,
4959             expl=self._expl if expl is None else expl,
4960             default=self.default if default is None else default,
4961             optional=self.optional if optional is None else optional,
4962             allow_asterisk=self.allow_asterisk,
4963             allow_ampersand=self.allow_ampersand,
4964         )
4965
4966
4967 class TeletexString(CommonString):
4968     __slots__ = ()
4969     tag_default = tag_encode(20)
4970     encoding = "ascii"
4971     asn1_type_name = "TeletexString"
4972
4973
4974 class T61String(TeletexString):
4975     __slots__ = ()
4976     asn1_type_name = "T61String"
4977
4978
4979 class VideotexString(CommonString):
4980     __slots__ = ()
4981     tag_default = tag_encode(21)
4982     encoding = "iso-8859-1"
4983     asn1_type_name = "VideotexString"
4984
4985
4986 class IA5String(CommonString):
4987     __slots__ = ()
4988     tag_default = tag_encode(22)
4989     encoding = "ascii"
4990     asn1_type_name = "IA5"
4991
4992
4993 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
4994 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
4995 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
4996 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
4997 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
4998 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
4999
5000
5001 class VisibleString(CommonString):
5002     __slots__ = ()
5003     tag_default = tag_encode(26)
5004     encoding = "ascii"
5005     asn1_type_name = "VisibleString"
5006
5007
5008 UTCTimeState = namedtuple(
5009     "UTCTimeState",
5010     OctetStringState._fields + ("ber_raw",),
5011     **NAMEDTUPLE_KWARGS
5012 )
5013
5014
5015 def str_to_time_fractions(value):
5016     v = pureint(value)
5017     year, v = (v // 10**10), (v % 10**10)
5018     month, v = (v // 10**8), (v % 10**8)
5019     day, v = (v // 10**6), (v % 10**6)
5020     hour, v = (v // 10**4), (v % 10**4)
5021     minute, second = (v // 100), (v % 100)
5022     return year, month, day, hour, minute, second
5023
5024
5025 class UTCTime(VisibleString):
5026     """``UTCTime`` datetime type
5027
5028     >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5029     UTCTime UTCTime 2017-09-30T22:07:50
5030     >>> str(t)
5031     '170930220750Z'
5032     >>> bytes(t)
5033     b'170930220750Z'
5034     >>> t.todatetime()
5035     datetime.datetime(2017, 9, 30, 22, 7, 50)
5036     >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5037     datetime.datetime(1957, 9, 30, 22, 7, 50)
5038
5039     If BER encoded value was met, then ``ber_raw`` attribute will hold
5040     its raw representation.
5041
5042     .. warning::
5043
5044        Pay attention that UTCTime can not hold full year, so all years
5045        having < 50 years are treated as 20xx, 19xx otherwise, according
5046        to X.509 recommendation.
5047
5048     .. warning::
5049
5050        No strict validation of UTC offsets are made, but very crude:
5051
5052        * minutes are not exceeding 60
5053        * offset value is not exceeding 14 hours
5054     """
5055     __slots__ = ("ber_raw",)
5056     tag_default = tag_encode(23)
5057     encoding = "ascii"
5058     asn1_type_name = "UTCTime"
5059     evgen_mode_skip_value = False
5060
5061     def __init__(
5062             self,
5063             value=None,
5064             impl=None,
5065             expl=None,
5066             default=None,
5067             optional=False,
5068             _decoded=(0, 0, 0),
5069             bounds=None,  # dummy argument, workability for OctetString.decode
5070             ctx=None,
5071     ):
5072         """
5073         :param value: set the value. Either datetime type, or
5074                       :py:class:`pyderasn.UTCTime` object
5075         :param bytes impl: override default tag with ``IMPLICIT`` one
5076         :param bytes expl: override default tag with ``EXPLICIT`` one
5077         :param default: set default value. Type same as in ``value``
5078         :param bool optional: is object ``OPTIONAL`` in sequence
5079         """
5080         super(UTCTime, self).__init__(
5081             None, None, impl, expl, None, optional, _decoded, ctx,
5082         )
5083         self._value = value
5084         self.ber_raw = None
5085         if value is not None:
5086             self._value, self.ber_raw = self._value_sanitize(value, ctx)
5087             self.ber_encoded = self.ber_raw is not None
5088         if default is not None:
5089             default, _ = self._value_sanitize(default)
5090             self.default = self.__class__(
5091                 value=default,
5092                 impl=self.tag,
5093                 expl=self._expl,
5094             )
5095             if self._value is None:
5096                 self._value = default
5097             optional = True
5098         self.optional = optional
5099
5100     def _strptime_bered(self, value):
5101         year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5102         value = value[10:]
5103         if len(value) == 0:
5104             raise ValueError("no timezone")
5105         year += 2000 if year < 50 else 1900
5106         decoded = datetime(year, month, day, hour, minute)
5107         offset = 0
5108         if value[-1] == "Z":
5109             value = value[:-1]
5110         else:
5111             if len(value) < 5:
5112                 raise ValueError("invalid UTC offset")
5113             if value[-5] == "-":
5114                 sign = -1
5115             elif value[-5] == "+":
5116                 sign = 1
5117             else:
5118                 raise ValueError("invalid UTC offset")
5119             v = pureint(value[-4:])
5120             offset, v = (60 * (v % 100)), v // 100
5121             if offset >= 3600:
5122                 raise ValueError("invalid UTC offset minutes")
5123             offset += 3600 * v
5124             if offset > 14 * 3600:
5125                 raise ValueError("too big UTC offset")
5126             offset *= sign
5127             value = value[:-5]
5128         if len(value) == 0:
5129             return offset, decoded
5130         if len(value) != 2:
5131             raise ValueError("invalid UTC offset seconds")
5132         seconds = pureint(value)
5133         if seconds >= 60:
5134             raise ValueError("invalid seconds value")
5135         return offset, decoded + timedelta(seconds=seconds)
5136
5137     def _strptime(self, value):
5138         # datetime.strptime's format: %y%m%d%H%M%SZ
5139         if len(value) != LEN_YYMMDDHHMMSSZ:
5140             raise ValueError("invalid UTCTime length")
5141         if value[-1] != "Z":
5142             raise ValueError("non UTC timezone")
5143         year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
5144         year += 2000 if year < 50 else 1900
5145         return datetime(year, month, day, hour, minute, second)
5146
5147     def _dt_sanitize(self, value):
5148         if value.year < 1950 or value.year > 2049:
5149             raise ValueError("UTCTime can hold only 1950-2049 years")
5150         return value.replace(microsecond=0)
5151
5152     def _value_sanitize(self, value, ctx=None):
5153         if value.__class__ == binary_type:
5154             try:
5155                 value_decoded = value.decode("ascii")
5156             except (UnicodeEncodeError, UnicodeDecodeError) as err:
5157                 raise DecodeError("invalid UTCTime encoding: %r" % err)
5158             err = None
5159             try:
5160                 return self._strptime(value_decoded), None
5161             except (TypeError, ValueError) as _err:
5162                 err = _err
5163                 if (ctx is not None) and ctx.get("bered", False):
5164                     try:
5165                         offset, _value = self._strptime_bered(value_decoded)
5166                         _value = _value - timedelta(seconds=offset)
5167                         return self._dt_sanitize(_value), value
5168                     except (TypeError, ValueError, OverflowError) as _err:
5169                         err = _err
5170             raise DecodeError(
5171                 "invalid %s format: %r" % (self.asn1_type_name, err),
5172                 klass=self.__class__,
5173             )
5174         if isinstance(value, self.__class__):
5175             return value._value, None
5176         if value.__class__ == datetime:
5177             return self._dt_sanitize(value), None
5178         raise InvalidValueType((self.__class__, datetime))
5179
5180     def _pp_value(self):
5181         if self.ready:
5182             value = self._value.isoformat()
5183             if self.ber_encoded:
5184                 value += " (%s)" % self.ber_raw
5185             return value
5186         return None
5187
5188     def __unicode__(self):
5189         if self.ready:
5190             value = self._value.isoformat()
5191             if self.ber_encoded:
5192                 value += " (%s)" % self.ber_raw
5193             return value
5194         return text_type(self._pp_value())
5195
5196     def __getstate__(self):
5197         return UTCTimeState(
5198             *super(UTCTime, self).__getstate__(),
5199             **{"ber_raw": self.ber_raw}
5200         )
5201
5202     def __setstate__(self, state):
5203         super(UTCTime, self).__setstate__(state)
5204         self.ber_raw = state.ber_raw
5205
5206     def __bytes__(self):
5207         self._assert_ready()
5208         return self._encode_time()
5209
5210     def __eq__(self, their):
5211         if their.__class__ == binary_type:
5212             return self._encode_time() == their
5213         if their.__class__ == datetime:
5214             return self.todatetime() == their
5215         if not isinstance(their, self.__class__):
5216             return False
5217         return (
5218             self._value == their._value and
5219             self.tag == their.tag and
5220             self._expl == their._expl
5221         )
5222
5223     def _encode_time(self):
5224         return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5225
5226     def _encode(self):
5227         self._assert_ready()
5228         return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5229
5230     def _encode1st(self, state):
5231         return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
5232
5233     def _encode2nd(self, writer, state_iter):
5234         self._assert_ready()
5235         write_full(writer, self._encode())
5236
5237     def _encode_cer(self, writer):
5238         write_full(writer, self._encode())
5239
5240     def todatetime(self):
5241         return self._value
5242
5243     def __repr__(self):
5244         return pp_console_row(next(self.pps()))
5245
5246     def pps(self, decode_path=()):
5247         yield _pp(
5248             obj=self,
5249             asn1_type_name=self.asn1_type_name,
5250             obj_name=self.__class__.__name__,
5251             decode_path=decode_path,
5252             value=self._pp_value(),
5253             optional=self.optional,
5254             default=self == self.default,
5255             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5256             expl=None if self._expl is None else tag_decode(self._expl),
5257             offset=self.offset,
5258             tlen=self.tlen,
5259             llen=self.llen,
5260             vlen=self.vlen,
5261             expl_offset=self.expl_offset if self.expled else None,
5262             expl_tlen=self.expl_tlen if self.expled else None,
5263             expl_llen=self.expl_llen if self.expled else None,
5264             expl_vlen=self.expl_vlen if self.expled else None,
5265             expl_lenindef=self.expl_lenindef,
5266             ber_encoded=self.ber_encoded,
5267             bered=self.bered,
5268         )
5269         for pp in self.pps_lenindef(decode_path):
5270             yield pp
5271
5272
5273 class GeneralizedTime(UTCTime):
5274     """``GeneralizedTime`` datetime type
5275
5276     This type is similar to :py:class:`pyderasn.UTCTime`.
5277
5278     >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5279     GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5280     >>> str(t)
5281     '20170930220750.000123Z'
5282     >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5283     GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5284
5285     .. warning::
5286
5287        Only microsecond fractions are supported in DER encoding.
5288        :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5289        higher precision values.
5290
5291     .. warning::
5292
5293        BER encoded data can loss information (accuracy) during decoding
5294        because of float transformations.
5295
5296     .. warning::
5297
5298        Local times (without explicit timezone specification) are treated
5299        as UTC one, no transformations are made.
5300
5301     .. warning::
5302
5303        Zero year is unsupported.
5304     """
5305     __slots__ = ()
5306     tag_default = tag_encode(24)
5307     asn1_type_name = "GeneralizedTime"
5308
5309     def _dt_sanitize(self, value):
5310         return value
5311
5312     def _strptime_bered(self, value):
5313         if len(value) < 4 + 3 * 2:
5314             raise ValueError("invalid GeneralizedTime")
5315         year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5316         decoded = datetime(year, month, day, hour)
5317         offset, value = 0, value[10:]
5318         if len(value) == 0:
5319             return offset, decoded
5320         if value[-1] == "Z":
5321             value = value[:-1]
5322         else:
5323             for char, sign in (("-", -1), ("+", 1)):
5324                 idx = value.rfind(char)
5325                 if idx == -1:
5326                     continue
5327                 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5328                 v = pureint(offset_raw)
5329                 if len(offset_raw) == 4:
5330                     offset, v = (60 * (v % 100)), v // 100
5331                     if offset >= 3600:
5332                         raise ValueError("invalid UTC offset minutes")
5333                 elif len(offset_raw) == 2:
5334                     pass
5335                 else:
5336                     raise ValueError("invalid UTC offset")
5337                 offset += 3600 * v
5338                 if offset > 14 * 3600:
5339                     raise ValueError("too big UTC offset")
5340                 offset *= sign
5341                 break
5342         if len(value) == 0:
5343             return offset, decoded
5344         if value[0] in DECIMAL_SIGNS:
5345             return offset, (
5346                 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5347             )
5348         if len(value) < 2:
5349             raise ValueError("stripped minutes")
5350         decoded += timedelta(seconds=60 * pureint(value[:2]))
5351         value = value[2:]
5352         if len(value) == 0:
5353             return offset, decoded
5354         if value[0] in DECIMAL_SIGNS:
5355             return offset, (
5356                 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5357             )
5358         if len(value) < 2:
5359             raise ValueError("stripped seconds")
5360         decoded += timedelta(seconds=pureint(value[:2]))
5361         value = value[2:]
5362         if len(value) == 0:
5363             return offset, decoded
5364         if value[0] not in DECIMAL_SIGNS:
5365             raise ValueError("invalid format after seconds")
5366         return offset, (
5367             decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5368         )
5369
5370     def _strptime(self, value):
5371         l = len(value)
5372         if l == LEN_YYYYMMDDHHMMSSZ:
5373             # datetime.strptime's format: %Y%m%d%H%M%SZ
5374             if value[-1] != "Z":
5375                 raise ValueError("non UTC timezone")
5376             return datetime(*str_to_time_fractions(value[:-1]))
5377         if l >= LEN_YYYYMMDDHHMMSSDMZ:
5378             # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5379             if value[-1] != "Z":
5380                 raise ValueError("non UTC timezone")
5381             if value[14] != ".":
5382                 raise ValueError("no fractions separator")
5383             us = value[15:-1]
5384             if us[-1] == "0":
5385                 raise ValueError("trailing zero")
5386             us_len = len(us)
5387             if us_len > 6:
5388                 raise ValueError("only microsecond fractions are supported")
5389             us = pureint(us + ("0" * (6 - us_len)))
5390             year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5391             return datetime(year, month, day, hour, minute, second, us)
5392         raise ValueError("invalid GeneralizedTime length")
5393
5394     def _encode_time(self):
5395         value = self._value
5396         encoded = value.strftime("%Y%m%d%H%M%S")
5397         if value.microsecond > 0:
5398             encoded += (".%06d" % value.microsecond).rstrip("0")
5399         return (encoded + "Z").encode("ascii")
5400
5401     def _encode(self):
5402         self._assert_ready()
5403         value = self._value
5404         if value.microsecond > 0:
5405             encoded = self._encode_time()
5406             return b"".join((self.tag, len_encode(len(encoded)), encoded))
5407         return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5408
5409     def _encode1st(self, state):
5410         self._assert_ready()
5411         vlen = len(self._encode_time())
5412         return len(self.tag) + len_size(vlen) + vlen, state
5413
5414     def _encode2nd(self, writer, state_iter):
5415         write_full(writer, self._encode())
5416
5417
5418 class GraphicString(CommonString):
5419     __slots__ = ()
5420     tag_default = tag_encode(25)
5421     encoding = "iso-8859-1"
5422     asn1_type_name = "GraphicString"
5423
5424
5425 class ISO646String(VisibleString):
5426     __slots__ = ()
5427     asn1_type_name = "ISO646String"
5428
5429
5430 class GeneralString(CommonString):
5431     __slots__ = ()
5432     tag_default = tag_encode(27)
5433     encoding = "iso-8859-1"
5434     asn1_type_name = "GeneralString"
5435
5436
5437 class UniversalString(CommonString):
5438     __slots__ = ()
5439     tag_default = tag_encode(28)
5440     encoding = "utf-32-be"
5441     asn1_type_name = "UniversalString"
5442
5443
5444 class BMPString(CommonString):
5445     __slots__ = ()
5446     tag_default = tag_encode(30)
5447     encoding = "utf-16-be"
5448     asn1_type_name = "BMPString"
5449
5450
5451 ChoiceState = namedtuple(
5452     "ChoiceState",
5453     BasicState._fields + ("specs", "value",),
5454     **NAMEDTUPLE_KWARGS
5455 )
5456
5457
5458 class Choice(Obj):
5459     """``CHOICE`` special type
5460
5461     ::
5462
5463         class GeneralName(Choice):
5464             schema = (
5465                 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5466                 ("dNSName", IA5String(impl=tag_ctxp(2))),
5467             )
5468
5469     >>> gn = GeneralName()
5470     GeneralName CHOICE
5471     >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5472     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5473     >>> gn["dNSName"] = IA5String("bar.baz")
5474     GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5475     >>> gn["rfc822Name"]
5476     None
5477     >>> gn["dNSName"]
5478     [2] IA5String IA5 bar.baz
5479     >>> gn.choice
5480     'dNSName'
5481     >>> gn.value == gn["dNSName"]
5482     True
5483     >>> gn.specs
5484     OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5485
5486     >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5487     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5488     """
5489     __slots__ = ("specs",)
5490     tag_default = None
5491     asn1_type_name = "CHOICE"
5492
5493     def __init__(
5494             self,
5495             value=None,
5496             schema=None,
5497             impl=None,
5498             expl=None,
5499             default=None,
5500             optional=False,
5501             _decoded=(0, 0, 0),
5502     ):
5503         """
5504         :param value: set the value. Either ``(choice, value)`` tuple, or
5505                       :py:class:`pyderasn.Choice` object
5506         :param bytes impl: can not be set, do **not** use it
5507         :param bytes expl: override default tag with ``EXPLICIT`` one
5508         :param default: set default value. Type same as in ``value``
5509         :param bool optional: is object ``OPTIONAL`` in sequence
5510         """
5511         if impl is not None:
5512             raise ValueError("no implicit tag allowed for CHOICE")
5513         super(Choice, self).__init__(None, expl, default, optional, _decoded)
5514         if schema is None:
5515             schema = getattr(self, "schema", ())
5516         if len(schema) == 0:
5517             raise ValueError("schema must be specified")
5518         self.specs = (
5519             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5520         )
5521         self._value = None
5522         if value is not None:
5523             self._value = self._value_sanitize(value)
5524         if default is not None:
5525             default_value = self._value_sanitize(default)
5526             default_obj = self.__class__(impl=self.tag, expl=self._expl)
5527             default_obj.specs = self.specs
5528             default_obj._value = default_value
5529             self.default = default_obj
5530             if value is None:
5531                 self._value = copy(default_obj._value)
5532         if self._expl is not None:
5533             tag_class, _, tag_num = tag_decode(self._expl)
5534             self._tag_order = (tag_class, tag_num)
5535
5536     def _value_sanitize(self, value):
5537         if (value.__class__ == tuple) and len(value) == 2:
5538             choice, obj = value
5539             spec = self.specs.get(choice)
5540             if spec is None:
5541                 raise ObjUnknown(choice)
5542             if not isinstance(obj, spec.__class__):
5543                 raise InvalidValueType((spec,))
5544             return (choice, spec(obj))
5545         if isinstance(value, self.__class__):
5546             return value._value
5547         raise InvalidValueType((self.__class__, tuple))
5548
5549     @property
5550     def ready(self):
5551         return self._value is not None and self._value[1].ready
5552
5553     @property
5554     def bered(self):
5555         return self.expl_lenindef or (
5556             (self._value is not None) and
5557             self._value[1].bered
5558         )
5559
5560     def __getstate__(self):
5561         return ChoiceState(
5562             __version__,
5563             self.tag,
5564             self._tag_order,
5565             self._expl,
5566             self.default,
5567             self.optional,
5568             self.offset,
5569             self.llen,
5570             self.vlen,
5571             self.expl_lenindef,
5572             self.lenindef,
5573             self.ber_encoded,
5574             self.specs,
5575             copy(self._value),
5576         )
5577
5578     def __setstate__(self, state):
5579         super(Choice, self).__setstate__(state)
5580         self.specs = state.specs
5581         self._value = state.value
5582
5583     def __eq__(self, their):
5584         if (their.__class__ == tuple) and len(their) == 2:
5585             return self._value == their
5586         if not isinstance(their, self.__class__):
5587             return False
5588         return (
5589             self.specs == their.specs and
5590             self._value == their._value
5591         )
5592
5593     def __call__(
5594             self,
5595             value=None,
5596             expl=None,
5597             default=None,
5598             optional=None,
5599     ):
5600         return self.__class__(
5601             value=value,
5602             schema=self.specs,
5603             expl=self._expl if expl is None else expl,
5604             default=self.default if default is None else default,
5605             optional=self.optional if optional is None else optional,
5606         )
5607
5608     @property
5609     def choice(self):
5610         """Name of the choice
5611         """
5612         self._assert_ready()
5613         return self._value[0]
5614
5615     @property
5616     def value(self):
5617         """Value of underlying choice
5618         """
5619         self._assert_ready()
5620         return self._value[1]
5621
5622     @property
5623     def tag_order(self):
5624         self._assert_ready()
5625         return self._value[1].tag_order if self._tag_order is None else self._tag_order
5626
5627     @property
5628     def tag_order_cer(self):
5629         return min(v.tag_order_cer for v in itervalues(self.specs))
5630
5631     def __getitem__(self, key):
5632         if key not in self.specs:
5633             raise ObjUnknown(key)
5634         if self._value is None:
5635             return None
5636         choice, value = self._value
5637         if choice != key:
5638             return None
5639         return value
5640
5641     def __setitem__(self, key, value):
5642         spec = self.specs.get(key)
5643         if spec is None:
5644             raise ObjUnknown(key)
5645         if not isinstance(value, spec.__class__):
5646             raise InvalidValueType((spec.__class__,))
5647         self._value = (key, spec(value))
5648
5649     @property
5650     def tlen(self):
5651         return 0
5652
5653     @property
5654     def decoded(self):
5655         return self._value[1].decoded if self.ready else False
5656
5657     def _encode(self):
5658         self._assert_ready()
5659         return self._value[1].encode()
5660
5661     def _encode1st(self, state):
5662         self._assert_ready()
5663         return self._value[1].encode1st(state)
5664
5665     def _encode2nd(self, writer, state_iter):
5666         self._value[1].encode2nd(writer, state_iter)
5667
5668     def _encode_cer(self, writer):
5669         self._assert_ready()
5670         self._value[1].encode_cer(writer)
5671
5672     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5673         for choice, spec in iteritems(self.specs):
5674             sub_decode_path = decode_path + (choice,)
5675             try:
5676                 spec.decode(
5677                     tlv,
5678                     offset=offset,
5679                     leavemm=True,
5680                     decode_path=sub_decode_path,
5681                     ctx=ctx,
5682                     tag_only=True,
5683                     _ctx_immutable=False,
5684                 )
5685             except TagMismatch:
5686                 continue
5687             break
5688         else:
5689             raise TagMismatch(
5690                 klass=self.__class__,
5691                 decode_path=decode_path,
5692                 offset=offset,
5693             )
5694         if tag_only:  # pragma: no cover
5695             yield None
5696             return
5697         if evgen_mode:
5698             for _decode_path, value, tail in spec.decode_evgen(
5699                     tlv,
5700                     offset=offset,
5701                     leavemm=True,
5702                     decode_path=sub_decode_path,
5703                     ctx=ctx,
5704                     _ctx_immutable=False,
5705             ):
5706                 yield _decode_path, value, tail
5707         else:
5708             _, value, tail = next(spec.decode_evgen(
5709                 tlv,
5710                 offset=offset,
5711                 leavemm=True,
5712                 decode_path=sub_decode_path,
5713                 ctx=ctx,
5714                 _ctx_immutable=False,
5715                 _evgen_mode=False,
5716             ))
5717         obj = self.__class__(
5718             schema=self.specs,
5719             expl=self._expl,
5720             default=self.default,
5721             optional=self.optional,
5722             _decoded=(offset, 0, value.fulllen),
5723         )
5724         obj._value = (choice, value)
5725         yield decode_path, obj, tail
5726
5727     def __repr__(self):
5728         value = pp_console_row(next(self.pps()))
5729         if self.ready:
5730             value = "%s[%r]" % (value, self.value)
5731         return value
5732
5733     def pps(self, decode_path=()):
5734         yield _pp(
5735             obj=self,
5736             asn1_type_name=self.asn1_type_name,
5737             obj_name=self.__class__.__name__,
5738             decode_path=decode_path,
5739             value=self.choice if self.ready else None,
5740             optional=self.optional,
5741             default=self == self.default,
5742             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5743             expl=None if self._expl is None else tag_decode(self._expl),
5744             offset=self.offset,
5745             tlen=self.tlen,
5746             llen=self.llen,
5747             vlen=self.vlen,
5748             expl_lenindef=self.expl_lenindef,
5749             bered=self.bered,
5750         )
5751         if self.ready:
5752             yield self.value.pps(decode_path=decode_path + (self.choice,))
5753         for pp in self.pps_lenindef(decode_path):
5754             yield pp
5755
5756
5757 class PrimitiveTypes(Choice):
5758     """Predefined ``CHOICE`` for all generic primitive types
5759
5760     It could be useful for general decoding of some unspecified values:
5761
5762     >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5763     OCTET STRING 3 bytes 666f6f
5764     >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5765     INTEGER 1193046
5766     """
5767     __slots__ = ()
5768     schema = tuple((klass.__name__, klass()) for klass in (
5769         Boolean,
5770         Integer,
5771         BitString,
5772         OctetString,
5773         Null,
5774         ObjectIdentifier,
5775         UTF8String,
5776         NumericString,
5777         PrintableString,
5778         TeletexString,
5779         VideotexString,
5780         IA5String,
5781         UTCTime,
5782         GeneralizedTime,
5783         GraphicString,
5784         VisibleString,
5785         ISO646String,
5786         GeneralString,
5787         UniversalString,
5788         BMPString,
5789     ))
5790
5791
5792 AnyState = namedtuple(
5793     "AnyState",
5794     BasicState._fields + ("value", "defined"),
5795     **NAMEDTUPLE_KWARGS
5796 )
5797
5798
5799 class Any(Obj):
5800     """``ANY`` special type
5801
5802     >>> Any(Integer(-123))
5803     ANY INTEGER -123 (0X:7B)
5804     >>> a = Any(OctetString(b"hello world").encode())
5805     ANY 040b68656c6c6f20776f726c64
5806     >>> hexenc(bytes(a))
5807     b'0x040x0bhello world'
5808     """
5809     __slots__ = ("defined",)
5810     tag_default = tag_encode(0)
5811     asn1_type_name = "ANY"
5812
5813     def __init__(
5814             self,
5815             value=None,
5816             expl=None,
5817             optional=False,
5818             _decoded=(0, 0, 0),
5819     ):
5820         """
5821         :param value: set the value. Either any kind of pyderasn's
5822                       **ready** object, or bytes. Pay attention that
5823                       **no** validation is performed if raw binary value
5824                       is valid TLV, except just tag decoding
5825         :param bytes expl: override default tag with ``EXPLICIT`` one
5826         :param bool optional: is object ``OPTIONAL`` in sequence
5827         """
5828         super(Any, self).__init__(None, expl, None, optional, _decoded)
5829         if value is None:
5830             self._value = None
5831         else:
5832             value = self._value_sanitize(value)
5833             self._value = value
5834             if self._expl is None:
5835                 if value.__class__ == binary_type:
5836                     tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5837                 else:
5838                     tag_class, tag_num = value.tag_order
5839             else:
5840                 tag_class, _, tag_num = tag_decode(self._expl)
5841             self._tag_order = (tag_class, tag_num)
5842         self.defined = None
5843
5844     def _value_sanitize(self, value):
5845         if value.__class__ == binary_type:
5846             if len(value) == 0:
5847                 raise ValueError("Any value can not be empty")
5848             return value
5849         if isinstance(value, self.__class__):
5850             return value._value
5851         if not isinstance(value, Obj):
5852             raise InvalidValueType((self.__class__, Obj, binary_type))
5853         return value
5854
5855     @property
5856     def ready(self):
5857         return self._value is not None
5858
5859     @property
5860     def tag_order(self):
5861         self._assert_ready()
5862         return self._tag_order
5863
5864     @property
5865     def bered(self):
5866         if self.expl_lenindef or self.lenindef:
5867             return True
5868         if self.defined is None:
5869             return False
5870         return self.defined[1].bered
5871
5872     def __getstate__(self):
5873         return AnyState(
5874             __version__,
5875             self.tag,
5876             self._tag_order,
5877             self._expl,
5878             None,
5879             self.optional,
5880             self.offset,
5881             self.llen,
5882             self.vlen,
5883             self.expl_lenindef,
5884             self.lenindef,
5885             self.ber_encoded,
5886             self._value,
5887             self.defined,
5888         )
5889
5890     def __setstate__(self, state):
5891         super(Any, self).__setstate__(state)
5892         self._value = state.value
5893         self.defined = state.defined
5894
5895     def __eq__(self, their):
5896         if their.__class__ == binary_type:
5897             if self._value.__class__ == binary_type:
5898                 return self._value == their
5899             return self._value.encode() == their
5900         if issubclass(their.__class__, Any):
5901             if self.ready and their.ready:
5902                 return bytes(self) == bytes(their)
5903             return self.ready == their.ready
5904         return False
5905
5906     def __call__(
5907             self,
5908             value=None,
5909             expl=None,
5910             optional=None,
5911     ):
5912         return self.__class__(
5913             value=value,
5914             expl=self._expl if expl is None else expl,
5915             optional=self.optional if optional is None else optional,
5916         )
5917
5918     def __bytes__(self):
5919         self._assert_ready()
5920         value = self._value
5921         if value.__class__ == binary_type:
5922             return value
5923         return self._value.encode()
5924
5925     @property
5926     def tlen(self):
5927         return 0
5928
5929     def _encode(self):
5930         self._assert_ready()
5931         value = self._value
5932         if value.__class__ == binary_type:
5933             return value
5934         return value.encode()
5935
5936     def _encode1st(self, state):
5937         self._assert_ready()
5938         value = self._value
5939         if value.__class__ == binary_type:
5940             return len(value), state
5941         return value.encode1st(state)
5942
5943     def _encode2nd(self, writer, state_iter):
5944         value = self._value
5945         if value.__class__ == binary_type:
5946             write_full(writer, value)
5947         else:
5948             value.encode2nd(writer, state_iter)
5949
5950     def _encode_cer(self, writer):
5951         self._assert_ready()
5952         value = self._value
5953         if value.__class__ == binary_type:
5954             write_full(writer, value)
5955         else:
5956             value.encode_cer(writer)
5957
5958     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5959         try:
5960             t, tlen, lv = tag_strip(tlv)
5961         except DecodeError as err:
5962             raise err.__class__(
5963                 msg=err.msg,
5964                 klass=self.__class__,
5965                 decode_path=decode_path,
5966                 offset=offset,
5967             )
5968         try:
5969             l, llen, v = len_decode(lv)
5970         except LenIndefForm as err:
5971             if not ctx.get("bered", False):
5972                 raise err.__class__(
5973                     msg=err.msg,
5974                     klass=self.__class__,
5975                     decode_path=decode_path,
5976                     offset=offset,
5977                 )
5978             llen, vlen, v = 1, 0, lv[1:]
5979             sub_offset = offset + tlen + llen
5980             chunk_i = 0
5981             while v[:EOC_LEN].tobytes() != EOC:
5982                 chunk, v = Any().decode(
5983                     v,
5984                     offset=sub_offset,
5985                     decode_path=decode_path + (str(chunk_i),),
5986                     leavemm=True,
5987                     ctx=ctx,
5988                     _ctx_immutable=False,
5989                 )
5990                 vlen += chunk.tlvlen
5991                 sub_offset += chunk.tlvlen
5992                 chunk_i += 1
5993             tlvlen = tlen + llen + vlen + EOC_LEN
5994             obj = self.__class__(
5995                 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
5996                 expl=self._expl,
5997                 optional=self.optional,
5998                 _decoded=(offset, 0, tlvlen),
5999             )
6000             obj.lenindef = True
6001             obj.tag = t.tobytes()
6002             yield decode_path, obj, v[EOC_LEN:]
6003             return
6004         except DecodeError as err:
6005             raise err.__class__(
6006                 msg=err.msg,
6007                 klass=self.__class__,
6008                 decode_path=decode_path,
6009                 offset=offset,
6010             )
6011         if l > len(v):
6012             raise NotEnoughData(
6013                 "encoded length is longer than data",
6014                 klass=self.__class__,
6015                 decode_path=decode_path,
6016                 offset=offset,
6017             )
6018         tlvlen = tlen + llen + l
6019         v, tail = tlv[:tlvlen], v[l:]
6020         obj = self.__class__(
6021             value=None if evgen_mode else v.tobytes(),
6022             expl=self._expl,
6023             optional=self.optional,
6024             _decoded=(offset, 0, tlvlen),
6025         )
6026         obj.tag = t.tobytes()
6027         yield decode_path, obj, tail
6028
6029     def __repr__(self):
6030         return pp_console_row(next(self.pps()))
6031
6032     def pps(self, decode_path=()):
6033         value = self._value
6034         if value is None:
6035             pass
6036         elif value.__class__ == binary_type:
6037             value = None
6038         else:
6039             value = repr(value)
6040         yield _pp(
6041             obj=self,
6042             asn1_type_name=self.asn1_type_name,
6043             obj_name=self.__class__.__name__,
6044             decode_path=decode_path,
6045             value=value,
6046             blob=self._value if self._value.__class__ == binary_type else None,
6047             optional=self.optional,
6048             default=self == self.default,
6049             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6050             expl=None if self._expl is None else tag_decode(self._expl),
6051             offset=self.offset,
6052             tlen=self.tlen,
6053             llen=self.llen,
6054             vlen=self.vlen,
6055             expl_offset=self.expl_offset if self.expled else None,
6056             expl_tlen=self.expl_tlen if self.expled else None,
6057             expl_llen=self.expl_llen if self.expled else None,
6058             expl_vlen=self.expl_vlen if self.expled else None,
6059             expl_lenindef=self.expl_lenindef,
6060             lenindef=self.lenindef,
6061             bered=self.bered,
6062         )
6063         defined_by, defined = self.defined or (None, None)
6064         if defined_by is not None:
6065             yield defined.pps(
6066                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6067             )
6068         for pp in self.pps_lenindef(decode_path):
6069             yield pp
6070
6071
6072 ########################################################################
6073 # ASN.1 constructed types
6074 ########################################################################
6075
6076 def abs_decode_path(decode_path, rel_path):
6077     """Create an absolute decode path from current and relative ones
6078
6079     :param decode_path: current decode path, starting point. Tuple of strings
6080     :param rel_path: relative path to ``decode_path``. Tuple of strings.
6081                      If first tuple's element is "/", then treat it as
6082                      an absolute path, ignoring ``decode_path`` as
6083                      starting point. Also this tuple can contain ".."
6084                      elements, stripping the leading element from
6085                      ``decode_path``
6086
6087     >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
6088     ("foo", "bar", "baz", "whatever")
6089     >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
6090     ("foo", "whatever")
6091     >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
6092     ("baz", "whatever")
6093     """
6094     if rel_path[0] == "/":
6095         return rel_path[1:]
6096     if rel_path[0] == "..":
6097         return abs_decode_path(decode_path[:-1], rel_path[1:])
6098     return decode_path + rel_path
6099
6100
6101 SequenceState = namedtuple(
6102     "SequenceState",
6103     BasicState._fields + ("specs", "value",),
6104     **NAMEDTUPLE_KWARGS
6105 )
6106
6107
6108 class SequenceEncode1stMixing(object):
6109     def _encode1st(self, state):
6110         state.append(0)
6111         idx = len(state) - 1
6112         vlen = 0
6113         for v in self._values_for_encoding():
6114             l, _ = v.encode1st(state)
6115             vlen += l
6116         state[idx] = vlen
6117         return len(self.tag) + len_size(vlen) + vlen, state
6118
6119
6120 class Sequence(SequenceEncode1stMixing, Obj):
6121     """``SEQUENCE`` structure type
6122
6123     You have to make specification of sequence::
6124
6125         class Extension(Sequence):
6126             schema = (
6127                 ("extnID", ObjectIdentifier()),
6128                 ("critical", Boolean(default=False)),
6129                 ("extnValue", OctetString()),
6130             )
6131
6132     Then, you can work with it as with dictionary.
6133
6134     >>> ext = Extension()
6135     >>> Extension().specs
6136     OrderedDict([
6137         ('extnID', OBJECT IDENTIFIER),
6138         ('critical', BOOLEAN False OPTIONAL DEFAULT),
6139         ('extnValue', OCTET STRING),
6140     ])
6141     >>> ext["extnID"] = "1.2.3"
6142     Traceback (most recent call last):
6143     pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6144     >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6145
6146     You can determine if sequence is ready to be encoded:
6147
6148     >>> ext.ready
6149     False
6150     >>> ext.encode()
6151     Traceback (most recent call last):
6152     pyderasn.ObjNotReady: object is not ready: extnValue
6153     >>> ext["extnValue"] = OctetString(b"foobar")
6154     >>> ext.ready
6155     True
6156
6157     Value you want to assign, must have the same **type** as in
6158     corresponding specification, but it can have different tags,
6159     optional/default attributes -- they will be taken from specification
6160     automatically::
6161
6162         class TBSCertificate(Sequence):
6163             schema = (
6164                 ("version", Version(expl=tag_ctxc(0), default="v1")),
6165             [...]
6166
6167     >>> tbs = TBSCertificate()
6168     >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6169
6170     Assign ``None`` to remove value from sequence.
6171
6172     You can set values in Sequence during its initialization:
6173
6174     >>> AlgorithmIdentifier((
6175         ("algorithm", ObjectIdentifier("1.2.3")),
6176         ("parameters", Any(Null()))
6177     ))
6178     AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6179
6180     You can determine if value exists/set in the sequence and take its value:
6181
6182     >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6183     (True, True, False)
6184     >>> ext["extnID"]
6185     OBJECT IDENTIFIER 1.2.3
6186
6187     But pay attention that if value has default, then it won't be (not
6188     in) in the sequence (because ``DEFAULT`` must not be encoded in
6189     DER), but you can read its value:
6190
6191     >>> "critical" in ext, ext["critical"]
6192     (False, BOOLEAN False)
6193     >>> ext["critical"] = Boolean(True)
6194     >>> "critical" in ext, ext["critical"]
6195     (True, BOOLEAN True)
6196
6197     All defaulted values are always optional.
6198
6199     .. _allow_default_values_ctx:
6200
6201     DER prohibits default value encoding and will raise an error if
6202     default value is unexpectedly met during decode.
6203     If :ref:`bered <bered_ctx>` context option is set, then no error
6204     will be raised, but ``bered`` attribute set. You can disable strict
6205     defaulted values existence validation by setting
6206     ``"allow_default_values": True`` :ref:`context <ctx>` option.
6207
6208     All values with DEFAULT specified are decoded atomically in
6209     :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6210     SEQUENCE, then it will be yielded as a single element, not
6211     disassembled. That is required for DEFAULT existence check.
6212
6213     Two sequences are equal if they have equal specification (schema),
6214     implicit/explicit tagging and the same values.
6215     """
6216     __slots__ = ("specs",)
6217     tag_default = tag_encode(form=TagFormConstructed, num=16)
6218     asn1_type_name = "SEQUENCE"
6219
6220     def __init__(
6221             self,
6222             value=None,
6223             schema=None,
6224             impl=None,
6225             expl=None,
6226             default=None,
6227             optional=False,
6228             _decoded=(0, 0, 0),
6229     ):
6230         super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6231         if schema is None:
6232             schema = getattr(self, "schema", ())
6233         self.specs = (
6234             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6235         )
6236         self._value = {}
6237         if value is not None:
6238             if issubclass(value.__class__, Sequence):
6239                 self._value = value._value
6240             elif hasattr(value, "__iter__"):
6241                 for seq_key, seq_value in value:
6242                     self[seq_key] = seq_value
6243             else:
6244                 raise InvalidValueType((Sequence,))
6245         if default is not None:
6246             if not issubclass(default.__class__, Sequence):
6247                 raise InvalidValueType((Sequence,))
6248             default_value = default._value
6249             default_obj = self.__class__(impl=self.tag, expl=self._expl)
6250             default_obj.specs = self.specs
6251             default_obj._value = default_value
6252             self.default = default_obj
6253             if value is None:
6254                 self._value = copy(default_obj._value)
6255
6256     @property
6257     def ready(self):
6258         for name, spec in iteritems(self.specs):
6259             value = self._value.get(name)
6260             if value is None:
6261                 if spec.optional:
6262                     continue
6263                 return False
6264             if not value.ready:
6265                 return False
6266         return True
6267
6268     @property
6269     def bered(self):
6270         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6271             return True
6272         return any(value.bered for value in itervalues(self._value))
6273
6274     def __getstate__(self):
6275         return SequenceState(
6276             __version__,
6277             self.tag,
6278             self._tag_order,
6279             self._expl,
6280             self.default,
6281             self.optional,
6282             self.offset,
6283             self.llen,
6284             self.vlen,
6285             self.expl_lenindef,
6286             self.lenindef,
6287             self.ber_encoded,
6288             self.specs,
6289             {k: copy(v) for k, v in iteritems(self._value)},
6290         )
6291
6292     def __setstate__(self, state):
6293         super(Sequence, self).__setstate__(state)
6294         self.specs = state.specs
6295         self._value = state.value
6296
6297     def __eq__(self, their):
6298         if not isinstance(their, self.__class__):
6299             return False
6300         return (
6301             self.specs == their.specs and
6302             self.tag == their.tag and
6303             self._expl == their._expl and
6304             self._value == their._value
6305         )
6306
6307     def __call__(
6308             self,
6309             value=None,
6310             impl=None,
6311             expl=None,
6312             default=None,
6313             optional=None,
6314     ):
6315         return self.__class__(
6316             value=value,
6317             schema=self.specs,
6318             impl=self.tag if impl is None else impl,
6319             expl=self._expl if expl is None else expl,
6320             default=self.default if default is None else default,
6321             optional=self.optional if optional is None else optional,
6322         )
6323
6324     def __contains__(self, key):
6325         return key in self._value
6326
6327     def __setitem__(self, key, value):
6328         spec = self.specs.get(key)
6329         if spec is None:
6330             raise ObjUnknown(key)
6331         if value is None:
6332             self._value.pop(key, None)
6333             return
6334         if not isinstance(value, spec.__class__):
6335             raise InvalidValueType((spec.__class__,))
6336         value = spec(value=value)
6337         if spec.default is not None and value == spec.default:
6338             self._value.pop(key, None)
6339             return
6340         self._value[key] = value
6341
6342     def __getitem__(self, key):
6343         value = self._value.get(key)
6344         if value is not None:
6345             return value
6346         spec = self.specs.get(key)
6347         if spec is None:
6348             raise ObjUnknown(key)
6349         if spec.default is not None:
6350             return spec.default
6351         return None
6352
6353     def _values_for_encoding(self):
6354         for name, spec in iteritems(self.specs):
6355             value = self._value.get(name)
6356             if value is None:
6357                 if spec.optional:
6358                     continue
6359                 raise ObjNotReady(name)
6360             yield value
6361
6362     def _encode(self):
6363         v = b"".join(v.encode() for v in self._values_for_encoding())
6364         return b"".join((self.tag, len_encode(len(v)), v))
6365
6366     def _encode2nd(self, writer, state_iter):
6367         write_full(writer, self.tag + len_encode(next(state_iter)))
6368         for v in self._values_for_encoding():
6369             v.encode2nd(writer, state_iter)
6370
6371     def _encode_cer(self, writer):
6372         write_full(writer, self.tag + LENINDEF)
6373         for v in self._values_for_encoding():
6374             v.encode_cer(writer)
6375         write_full(writer, EOC)
6376
6377     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6378         try:
6379             t, tlen, lv = tag_strip(tlv)
6380         except DecodeError as err:
6381             raise err.__class__(
6382                 msg=err.msg,
6383                 klass=self.__class__,
6384                 decode_path=decode_path,
6385                 offset=offset,
6386             )
6387         if t != self.tag:
6388             raise TagMismatch(
6389                 klass=self.__class__,
6390                 decode_path=decode_path,
6391                 offset=offset,
6392             )
6393         if tag_only:  # pragma: no cover
6394             yield None
6395             return
6396         lenindef = False
6397         ctx_bered = ctx.get("bered", False)
6398         try:
6399             l, llen, v = len_decode(lv)
6400         except LenIndefForm as err:
6401             if not ctx_bered:
6402                 raise err.__class__(
6403                     msg=err.msg,
6404                     klass=self.__class__,
6405                     decode_path=decode_path,
6406                     offset=offset,
6407                 )
6408             l, llen, v = 0, 1, lv[1:]
6409             lenindef = True
6410         except DecodeError as err:
6411             raise err.__class__(
6412                 msg=err.msg,
6413                 klass=self.__class__,
6414                 decode_path=decode_path,
6415                 offset=offset,
6416             )
6417         if l > len(v):
6418             raise NotEnoughData(
6419                 "encoded length is longer than data",
6420                 klass=self.__class__,
6421                 decode_path=decode_path,
6422                 offset=offset,
6423             )
6424         if not lenindef:
6425             v, tail = v[:l], v[l:]
6426         vlen = 0
6427         sub_offset = offset + tlen + llen
6428         values = {}
6429         ber_encoded = False
6430         ctx_allow_default_values = ctx.get("allow_default_values", False)
6431         for name, spec in iteritems(self.specs):
6432             if spec.optional and (
6433                     (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6434                     len(v) == 0
6435             ):
6436                 continue
6437             spec_defaulted = spec.default is not None
6438             sub_decode_path = decode_path + (name,)
6439             try:
6440                 if evgen_mode and not spec_defaulted:
6441                     for _decode_path, value, v_tail in spec.decode_evgen(
6442                             v,
6443                             sub_offset,
6444                             leavemm=True,
6445                             decode_path=sub_decode_path,
6446                             ctx=ctx,
6447                             _ctx_immutable=False,
6448                     ):
6449                         yield _decode_path, value, v_tail
6450                 else:
6451                     _, value, v_tail = next(spec.decode_evgen(
6452                         v,
6453                         sub_offset,
6454                         leavemm=True,
6455                         decode_path=sub_decode_path,
6456                         ctx=ctx,
6457                         _ctx_immutable=False,
6458                         _evgen_mode=False,
6459                     ))
6460             except TagMismatch as err:
6461                 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6462                     continue
6463                 raise
6464
6465             defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6466             if not evgen_mode and defined is not None:
6467                 defined_by, defined_spec = defined
6468                 if issubclass(value.__class__, SequenceOf):
6469                     for i, _value in enumerate(value):
6470                         sub_sub_decode_path = sub_decode_path + (
6471                             str(i),
6472                             DecodePathDefBy(defined_by),
6473                         )
6474                         defined_value, defined_tail = defined_spec.decode(
6475                             memoryview(bytes(_value)),
6476                             sub_offset + (
6477                                 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6478                                 if value.expled else (value.tlen + value.llen)
6479                             ),
6480                             leavemm=True,
6481                             decode_path=sub_sub_decode_path,
6482                             ctx=ctx,
6483                             _ctx_immutable=False,
6484                         )
6485                         if len(defined_tail) > 0:
6486                             raise DecodeError(
6487                                 "remaining data",
6488                                 klass=self.__class__,
6489                                 decode_path=sub_sub_decode_path,
6490                                 offset=offset,
6491                             )
6492                         _value.defined = (defined_by, defined_value)
6493                 else:
6494                     defined_value, defined_tail = defined_spec.decode(
6495                         memoryview(bytes(value)),
6496                         sub_offset + (
6497                             (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6498                             if value.expled else (value.tlen + value.llen)
6499                         ),
6500                         leavemm=True,
6501                         decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6502                         ctx=ctx,
6503                         _ctx_immutable=False,
6504                     )
6505                     if len(defined_tail) > 0:
6506                         raise DecodeError(
6507                             "remaining data",
6508                             klass=self.__class__,
6509                             decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6510                             offset=offset,
6511                         )
6512                     value.defined = (defined_by, defined_value)
6513
6514             value_len = value.fulllen
6515             vlen += value_len
6516             sub_offset += value_len
6517             v = v_tail
6518             if spec_defaulted:
6519                 if evgen_mode:
6520                     yield sub_decode_path, value, v_tail
6521                 if value == spec.default:
6522                     if ctx_bered or ctx_allow_default_values:
6523                         ber_encoded = True
6524                     else:
6525                         raise DecodeError(
6526                             "DEFAULT value met",
6527                             klass=self.__class__,
6528                             decode_path=sub_decode_path,
6529                             offset=sub_offset,
6530                         )
6531             if not evgen_mode:
6532                 values[name] = value
6533                 spec_defines = getattr(spec, "defines", ())
6534                 if len(spec_defines) == 0:
6535                     defines_by_path = ctx.get("defines_by_path", ())
6536                     if len(defines_by_path) > 0:
6537                         spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6538                 if spec_defines is not None and len(spec_defines) > 0:
6539                     for rel_path, schema in spec_defines:
6540                         defined = schema.get(value, None)
6541                         if defined is not None:
6542                             ctx.setdefault("_defines", []).append((
6543                                 abs_decode_path(sub_decode_path[:-1], rel_path),
6544                                 (value, defined),
6545                             ))
6546         if lenindef:
6547             if v[:EOC_LEN].tobytes() != EOC:
6548                 raise DecodeError(
6549                     "no EOC",
6550                     klass=self.__class__,
6551                     decode_path=decode_path,
6552                     offset=offset,
6553                 )
6554             tail = v[EOC_LEN:]
6555             vlen += EOC_LEN
6556         elif len(v) > 0:
6557             raise DecodeError(
6558                 "remaining data",
6559                 klass=self.__class__,
6560                 decode_path=decode_path,
6561                 offset=offset,
6562             )
6563         obj = self.__class__(
6564             schema=self.specs,
6565             impl=self.tag,
6566             expl=self._expl,
6567             default=self.default,
6568             optional=self.optional,
6569             _decoded=(offset, llen, vlen),
6570         )
6571         obj._value = values
6572         obj.lenindef = lenindef
6573         obj.ber_encoded = ber_encoded
6574         yield decode_path, obj, tail
6575
6576     def __repr__(self):
6577         value = pp_console_row(next(self.pps()))
6578         cols = []
6579         for name in self.specs:
6580             _value = self._value.get(name)
6581             if _value is None:
6582                 continue
6583             cols.append("%s: %s" % (name, repr(_value)))
6584         return "%s[%s]" % (value, "; ".join(cols))
6585
6586     def pps(self, decode_path=()):
6587         yield _pp(
6588             obj=self,
6589             asn1_type_name=self.asn1_type_name,
6590             obj_name=self.__class__.__name__,
6591             decode_path=decode_path,
6592             optional=self.optional,
6593             default=self == self.default,
6594             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6595             expl=None if self._expl is None else tag_decode(self._expl),
6596             offset=self.offset,
6597             tlen=self.tlen,
6598             llen=self.llen,
6599             vlen=self.vlen,
6600             expl_offset=self.expl_offset if self.expled else None,
6601             expl_tlen=self.expl_tlen if self.expled else None,
6602             expl_llen=self.expl_llen if self.expled else None,
6603             expl_vlen=self.expl_vlen if self.expled else None,
6604             expl_lenindef=self.expl_lenindef,
6605             lenindef=self.lenindef,
6606             ber_encoded=self.ber_encoded,
6607             bered=self.bered,
6608         )
6609         for name in self.specs:
6610             value = self._value.get(name)
6611             if value is None:
6612                 continue
6613             yield value.pps(decode_path=decode_path + (name,))
6614         for pp in self.pps_lenindef(decode_path):
6615             yield pp
6616
6617
6618 class Set(Sequence, SequenceEncode1stMixing):
6619     """``SET`` structure type
6620
6621     Its usage is identical to :py:class:`pyderasn.Sequence`.
6622
6623     .. _allow_unordered_set_ctx:
6624
6625     DER prohibits unordered values encoding and will raise an error
6626     during decode. If :ref:`bered <bered_ctx>` context option is set,
6627     then no error will occur. Also you can disable strict values
6628     ordering check by setting ``"allow_unordered_set": True``
6629     :ref:`context <ctx>` option.
6630     """
6631     __slots__ = ()
6632     tag_default = tag_encode(form=TagFormConstructed, num=17)
6633     asn1_type_name = "SET"
6634
6635     def _values_for_encoding(self):
6636         return sorted(
6637             super(Set, self)._values_for_encoding(),
6638             key=attrgetter("tag_order"),
6639         )
6640
6641     def _encode_cer(self, writer):
6642         write_full(writer, self.tag + LENINDEF)
6643         for v in sorted(
6644                 super(Set, self)._values_for_encoding(),
6645                 key=attrgetter("tag_order_cer"),
6646         ):
6647             v.encode_cer(writer)
6648         write_full(writer, EOC)
6649
6650     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6651         try:
6652             t, tlen, lv = tag_strip(tlv)
6653         except DecodeError as err:
6654             raise err.__class__(
6655                 msg=err.msg,
6656                 klass=self.__class__,
6657                 decode_path=decode_path,
6658                 offset=offset,
6659             )
6660         if t != self.tag:
6661             raise TagMismatch(
6662                 klass=self.__class__,
6663                 decode_path=decode_path,
6664                 offset=offset,
6665             )
6666         if tag_only:
6667             yield None
6668             return
6669         lenindef = False
6670         ctx_bered = ctx.get("bered", False)
6671         try:
6672             l, llen, v = len_decode(lv)
6673         except LenIndefForm as err:
6674             if not ctx_bered:
6675                 raise err.__class__(
6676                     msg=err.msg,
6677                     klass=self.__class__,
6678                     decode_path=decode_path,
6679                     offset=offset,
6680                 )
6681             l, llen, v = 0, 1, lv[1:]
6682             lenindef = True
6683         except DecodeError as err:
6684             raise err.__class__(
6685                 msg=err.msg,
6686                 klass=self.__class__,
6687                 decode_path=decode_path,
6688                 offset=offset,
6689             )
6690         if l > len(v):
6691             raise NotEnoughData(
6692                 "encoded length is longer than data",
6693                 klass=self.__class__,
6694                 offset=offset,
6695             )
6696         if not lenindef:
6697             v, tail = v[:l], v[l:]
6698         vlen = 0
6699         sub_offset = offset + tlen + llen
6700         values = {}
6701         ber_encoded = False
6702         ctx_allow_default_values = ctx.get("allow_default_values", False)
6703         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6704         tag_order_prev = (0, 0)
6705         _specs_items = copy(self.specs)
6706
6707         while len(v) > 0:
6708             if lenindef and v[:EOC_LEN].tobytes() == EOC:
6709                 break
6710             for name, spec in iteritems(_specs_items):
6711                 sub_decode_path = decode_path + (name,)
6712                 try:
6713                     spec.decode(
6714                         v,
6715                         sub_offset,
6716                         leavemm=True,
6717                         decode_path=sub_decode_path,
6718                         ctx=ctx,
6719                         tag_only=True,
6720                         _ctx_immutable=False,
6721                     )
6722                 except TagMismatch:
6723                     continue
6724                 break
6725             else:
6726                 raise TagMismatch(
6727                     klass=self.__class__,
6728                     decode_path=decode_path,
6729                     offset=offset,
6730                 )
6731             spec_defaulted = spec.default is not None
6732             if evgen_mode and not spec_defaulted:
6733                 for _decode_path, value, v_tail in spec.decode_evgen(
6734                         v,
6735                         sub_offset,
6736                         leavemm=True,
6737                         decode_path=sub_decode_path,
6738                         ctx=ctx,
6739                         _ctx_immutable=False,
6740                 ):
6741                     yield _decode_path, value, v_tail
6742             else:
6743                 _, value, v_tail = next(spec.decode_evgen(
6744                     v,
6745                     sub_offset,
6746                     leavemm=True,
6747                     decode_path=sub_decode_path,
6748                     ctx=ctx,
6749                     _ctx_immutable=False,
6750                     _evgen_mode=False,
6751                 ))
6752             value_tag_order = value.tag_order
6753             value_len = value.fulllen
6754             if tag_order_prev >= value_tag_order:
6755                 if ctx_bered or ctx_allow_unordered_set:
6756                     ber_encoded = True
6757                 else:
6758                     raise DecodeError(
6759                         "unordered " + self.asn1_type_name,
6760                         klass=self.__class__,
6761                         decode_path=sub_decode_path,
6762                         offset=sub_offset,
6763                     )
6764             if spec_defaulted:
6765                 if evgen_mode:
6766                     yield sub_decode_path, value, v_tail
6767                 if value != spec.default:
6768                     pass
6769                 elif ctx_bered or ctx_allow_default_values:
6770                     ber_encoded = True
6771                 else:
6772                     raise DecodeError(
6773                         "DEFAULT value met",
6774                         klass=self.__class__,
6775                         decode_path=sub_decode_path,
6776                         offset=sub_offset,
6777                     )
6778             values[name] = value
6779             del _specs_items[name]
6780             tag_order_prev = value_tag_order
6781             sub_offset += value_len
6782             vlen += value_len
6783             v = v_tail
6784
6785         obj = self.__class__(
6786             schema=self.specs,
6787             impl=self.tag,
6788             expl=self._expl,
6789             default=self.default,
6790             optional=self.optional,
6791             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6792         )
6793         if lenindef:
6794             if v[:EOC_LEN].tobytes() != EOC:
6795                 raise DecodeError(
6796                     "no EOC",
6797                     klass=self.__class__,
6798                     decode_path=decode_path,
6799                     offset=offset,
6800                 )
6801             tail = v[EOC_LEN:]
6802             obj.lenindef = True
6803         for name, spec in iteritems(self.specs):
6804             if name not in values and not spec.optional:
6805                 raise DecodeError(
6806                     "%s value is not ready" % name,
6807                     klass=self.__class__,
6808                     decode_path=decode_path,
6809                     offset=offset,
6810                 )
6811         if not evgen_mode:
6812             obj._value = values
6813         obj.ber_encoded = ber_encoded
6814         yield decode_path, obj, tail
6815
6816
6817 SequenceOfState = namedtuple(
6818     "SequenceOfState",
6819     BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6820     **NAMEDTUPLE_KWARGS
6821 )
6822
6823
6824 class SequenceOf(SequenceEncode1stMixing, Obj):
6825     """``SEQUENCE OF`` sequence type
6826
6827     For that kind of type you must specify the object it will carry on
6828     (bounds are for example here, not required)::
6829
6830         class Ints(SequenceOf):
6831             schema = Integer()
6832             bounds = (0, 2)
6833
6834     >>> ints = Ints()
6835     >>> ints.append(Integer(123))
6836     >>> ints.append(Integer(234))
6837     >>> ints
6838     Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6839     >>> [int(i) for i in ints]
6840     [123, 234]
6841     >>> ints.append(Integer(345))
6842     Traceback (most recent call last):
6843     pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6844     >>> ints[1]
6845     INTEGER 234
6846     >>> ints[1] = Integer(345)
6847     >>> ints
6848     Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6849
6850     You can initialize sequence with preinitialized values:
6851
6852     >>> ints = Ints([Integer(123), Integer(234)])
6853
6854     Also you can use iterator as a value:
6855
6856     >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6857
6858     And it won't be iterated until encoding process. Pay attention that
6859     bounds and required schema checks are done only during the encoding
6860     process in that case! After encode was called, then value is zeroed
6861     back to empty list and you have to set it again. That mode is useful
6862     mainly with CER encoding mode, where all objects from the iterable
6863     will be streamed to the buffer, without copying all of them to
6864     memory first.
6865     """
6866     __slots__ = ("spec", "_bound_min", "_bound_max")
6867     tag_default = tag_encode(form=TagFormConstructed, num=16)
6868     asn1_type_name = "SEQUENCE OF"
6869
6870     def __init__(
6871             self,
6872             value=None,
6873             schema=None,
6874             bounds=None,
6875             impl=None,
6876             expl=None,
6877             default=None,
6878             optional=False,
6879             _decoded=(0, 0, 0),
6880     ):
6881         super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6882         if schema is None:
6883             schema = getattr(self, "schema", None)
6884         if schema is None:
6885             raise ValueError("schema must be specified")
6886         self.spec = schema
6887         self._bound_min, self._bound_max = getattr(
6888             self,
6889             "bounds",
6890             (0, float("+inf")),
6891         ) if bounds is None else bounds
6892         self._value = []
6893         if value is not None:
6894             self._value = self._value_sanitize(value)
6895         if default is not None:
6896             default_value = self._value_sanitize(default)
6897             default_obj = self.__class__(
6898                 schema=schema,
6899                 impl=self.tag,
6900                 expl=self._expl,
6901             )
6902             default_obj._value = default_value
6903             self.default = default_obj
6904             if value is None:
6905                 self._value = copy(default_obj._value)
6906
6907     def _value_sanitize(self, value):
6908         iterator = False
6909         if issubclass(value.__class__, SequenceOf):
6910             value = value._value
6911         elif hasattr(value, NEXT_ATTR_NAME):
6912             iterator = True
6913         elif hasattr(value, "__iter__"):
6914             value = list(value)
6915         else:
6916             raise InvalidValueType((self.__class__, iter, "iterator"))
6917         if not iterator:
6918             if not self._bound_min <= len(value) <= self._bound_max:
6919                 raise BoundsError(self._bound_min, len(value), self._bound_max)
6920             class_expected = self.spec.__class__
6921             for v in value:
6922                 if not isinstance(v, class_expected):
6923                     raise InvalidValueType((class_expected,))
6924         return value
6925
6926     @property
6927     def ready(self):
6928         if hasattr(self._value, NEXT_ATTR_NAME):
6929             return True
6930         if self._bound_min > 0 and len(self._value) == 0:
6931             return False
6932         return all(v.ready for v in self._value)
6933
6934     @property
6935     def bered(self):
6936         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6937             return True
6938         return any(v.bered for v in self._value)
6939
6940     def __getstate__(self):
6941         if hasattr(self._value, NEXT_ATTR_NAME):
6942             raise ValueError("can not pickle SequenceOf with iterator")
6943         return SequenceOfState(
6944             __version__,
6945             self.tag,
6946             self._tag_order,
6947             self._expl,
6948             self.default,
6949             self.optional,
6950             self.offset,
6951             self.llen,
6952             self.vlen,
6953             self.expl_lenindef,
6954             self.lenindef,
6955             self.ber_encoded,
6956             self.spec,
6957             [copy(v) for v in self._value],
6958             self._bound_min,
6959             self._bound_max,
6960         )
6961
6962     def __setstate__(self, state):
6963         super(SequenceOf, self).__setstate__(state)
6964         self.spec = state.spec
6965         self._value = state.value
6966         self._bound_min = state.bound_min
6967         self._bound_max = state.bound_max
6968
6969     def __eq__(self, their):
6970         if isinstance(their, self.__class__):
6971             return (
6972                 self.spec == their.spec and
6973                 self.tag == their.tag and
6974                 self._expl == their._expl and
6975                 self._value == their._value
6976             )
6977         if hasattr(their, "__iter__"):
6978             return self._value == list(their)
6979         return False
6980
6981     def __call__(
6982             self,
6983             value=None,
6984             bounds=None,
6985             impl=None,
6986             expl=None,
6987             default=None,
6988             optional=None,
6989     ):
6990         return self.__class__(
6991             value=value,
6992             schema=self.spec,
6993             bounds=(
6994                 (self._bound_min, self._bound_max)
6995                 if bounds is None else bounds
6996             ),
6997             impl=self.tag if impl is None else impl,
6998             expl=self._expl if expl is None else expl,
6999             default=self.default if default is None else default,
7000             optional=self.optional if optional is None else optional,
7001         )
7002
7003     def __contains__(self, key):
7004         return key in self._value
7005
7006     def append(self, value):
7007         if not isinstance(value, self.spec.__class__):
7008             raise InvalidValueType((self.spec.__class__,))
7009         if len(self._value) + 1 > self._bound_max:
7010             raise BoundsError(
7011                 self._bound_min,
7012                 len(self._value) + 1,
7013                 self._bound_max,
7014             )
7015         self._value.append(value)
7016
7017     def __iter__(self):
7018         return iter(self._value)
7019
7020     def __len__(self):
7021         return len(self._value)
7022
7023     def __setitem__(self, key, value):
7024         if not isinstance(value, self.spec.__class__):
7025             raise InvalidValueType((self.spec.__class__,))
7026         self._value[key] = self.spec(value=value)
7027
7028     def __getitem__(self, key):
7029         return self._value[key]
7030
7031     def _values_for_encoding(self):
7032         return iter(self._value)
7033
7034     def _encode(self):
7035         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7036         if iterator:
7037             values = []
7038             values_append = values.append
7039             class_expected = self.spec.__class__
7040             values_for_encoding = self._values_for_encoding()
7041             self._value = []
7042             for v in values_for_encoding:
7043                 if not isinstance(v, class_expected):
7044                     raise InvalidValueType((class_expected,))
7045                 values_append(v.encode())
7046             if not self._bound_min <= len(values) <= self._bound_max:
7047                 raise BoundsError(self._bound_min, len(values), self._bound_max)
7048             value = b"".join(values)
7049         else:
7050             value = b"".join(v.encode() for v in self._values_for_encoding())
7051         return b"".join((self.tag, len_encode(len(value)), value))
7052
7053     def _encode1st(self, state):
7054         state = super(SequenceOf, self)._encode1st(state)
7055         if hasattr(self._value, NEXT_ATTR_NAME):
7056             self._value = []
7057         return state
7058
7059     def _encode2nd(self, writer, state_iter):
7060         write_full(writer, self.tag + len_encode(next(state_iter)))
7061         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7062         if iterator:
7063             values_count = 0
7064             class_expected = self.spec.__class__
7065             values_for_encoding = self._values_for_encoding()
7066             self._value = []
7067             for v in values_for_encoding:
7068                 if not isinstance(v, class_expected):
7069                     raise InvalidValueType((class_expected,))
7070                 v.encode2nd(writer, state_iter)
7071                 values_count += 1
7072             if not self._bound_min <= values_count <= self._bound_max:
7073                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7074         else:
7075             for v in self._values_for_encoding():
7076                 v.encode2nd(writer, state_iter)
7077
7078     def _encode_cer(self, writer):
7079         write_full(writer, self.tag + LENINDEF)
7080         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7081         if iterator:
7082             class_expected = self.spec.__class__
7083             values_count = 0
7084             values_for_encoding = self._values_for_encoding()
7085             self._value = []
7086             for v in values_for_encoding:
7087                 if not isinstance(v, class_expected):
7088                     raise InvalidValueType((class_expected,))
7089                 v.encode_cer(writer)
7090                 values_count += 1
7091             if not self._bound_min <= values_count <= self._bound_max:
7092                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7093         else:
7094             for v in self._values_for_encoding():
7095                 v.encode_cer(writer)
7096         write_full(writer, EOC)
7097
7098     def _decode(
7099             self,
7100             tlv,
7101             offset,
7102             decode_path,
7103             ctx,
7104             tag_only,
7105             evgen_mode,
7106             ordering_check=False,
7107     ):
7108         try:
7109             t, tlen, lv = tag_strip(tlv)
7110         except DecodeError as err:
7111             raise err.__class__(
7112                 msg=err.msg,
7113                 klass=self.__class__,
7114                 decode_path=decode_path,
7115                 offset=offset,
7116             )
7117         if t != self.tag:
7118             raise TagMismatch(
7119                 klass=self.__class__,
7120                 decode_path=decode_path,
7121                 offset=offset,
7122             )
7123         if tag_only:
7124             yield None
7125             return
7126         lenindef = False
7127         ctx_bered = ctx.get("bered", False)
7128         try:
7129             l, llen, v = len_decode(lv)
7130         except LenIndefForm as err:
7131             if not ctx_bered:
7132                 raise err.__class__(
7133                     msg=err.msg,
7134                     klass=self.__class__,
7135                     decode_path=decode_path,
7136                     offset=offset,
7137                 )
7138             l, llen, v = 0, 1, lv[1:]
7139             lenindef = True
7140         except DecodeError as err:
7141             raise err.__class__(
7142                 msg=err.msg,
7143                 klass=self.__class__,
7144                 decode_path=decode_path,
7145                 offset=offset,
7146             )
7147         if l > len(v):
7148             raise NotEnoughData(
7149                 "encoded length is longer than data",
7150                 klass=self.__class__,
7151                 decode_path=decode_path,
7152                 offset=offset,
7153             )
7154         if not lenindef:
7155             v, tail = v[:l], v[l:]
7156         vlen = 0
7157         sub_offset = offset + tlen + llen
7158         _value = []
7159         _value_count = 0
7160         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7161         value_prev = memoryview(v[:0])
7162         ber_encoded = False
7163         spec = self.spec
7164         while len(v) > 0:
7165             if lenindef and v[:EOC_LEN].tobytes() == EOC:
7166                 break
7167             sub_decode_path = decode_path + (str(_value_count),)
7168             if evgen_mode:
7169                 for _decode_path, value, v_tail in spec.decode_evgen(
7170                         v,
7171                         sub_offset,
7172                         leavemm=True,
7173                         decode_path=sub_decode_path,
7174                         ctx=ctx,
7175                         _ctx_immutable=False,
7176                 ):
7177                     yield _decode_path, value, v_tail
7178             else:
7179                 _, value, v_tail = next(spec.decode_evgen(
7180                     v,
7181                     sub_offset,
7182                     leavemm=True,
7183                     decode_path=sub_decode_path,
7184                     ctx=ctx,
7185                     _ctx_immutable=False,
7186                     _evgen_mode=False,
7187                 ))
7188             value_len = value.fulllen
7189             if ordering_check:
7190                 if value_prev.tobytes() > v[:value_len].tobytes():
7191                     if ctx_bered or ctx_allow_unordered_set:
7192                         ber_encoded = True
7193                     else:
7194                         raise DecodeError(
7195                             "unordered " + self.asn1_type_name,
7196                             klass=self.__class__,
7197                             decode_path=sub_decode_path,
7198                             offset=sub_offset,
7199                         )
7200                 value_prev = v[:value_len]
7201             _value_count += 1
7202             if not evgen_mode:
7203                 _value.append(value)
7204             sub_offset += value_len
7205             vlen += value_len
7206             v = v_tail
7207         if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7208             raise DecodeError(
7209                 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7210                 klass=self.__class__,
7211                 decode_path=decode_path,
7212                 offset=offset,
7213             )
7214         try:
7215             obj = self.__class__(
7216                 value=None if evgen_mode else _value,
7217                 schema=spec,
7218                 bounds=(self._bound_min, self._bound_max),
7219                 impl=self.tag,
7220                 expl=self._expl,
7221                 default=self.default,
7222                 optional=self.optional,
7223                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7224             )
7225         except BoundsError as err:
7226             raise DecodeError(
7227                 msg=str(err),
7228                 klass=self.__class__,
7229                 decode_path=decode_path,
7230                 offset=offset,
7231             )
7232         if lenindef:
7233             if v[:EOC_LEN].tobytes() != EOC:
7234                 raise DecodeError(
7235                     "no EOC",
7236                     klass=self.__class__,
7237                     decode_path=decode_path,
7238                     offset=offset,
7239                 )
7240             obj.lenindef = True
7241             tail = v[EOC_LEN:]
7242         obj.ber_encoded = ber_encoded
7243         yield decode_path, obj, tail
7244
7245     def __repr__(self):
7246         return "%s[%s]" % (
7247             pp_console_row(next(self.pps())),
7248             ", ".join(repr(v) for v in self._value),
7249         )
7250
7251     def pps(self, decode_path=()):
7252         yield _pp(
7253             obj=self,
7254             asn1_type_name=self.asn1_type_name,
7255             obj_name=self.__class__.__name__,
7256             decode_path=decode_path,
7257             optional=self.optional,
7258             default=self == self.default,
7259             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7260             expl=None if self._expl is None else tag_decode(self._expl),
7261             offset=self.offset,
7262             tlen=self.tlen,
7263             llen=self.llen,
7264             vlen=self.vlen,
7265             expl_offset=self.expl_offset if self.expled else None,
7266             expl_tlen=self.expl_tlen if self.expled else None,
7267             expl_llen=self.expl_llen if self.expled else None,
7268             expl_vlen=self.expl_vlen if self.expled else None,
7269             expl_lenindef=self.expl_lenindef,
7270             lenindef=self.lenindef,
7271             ber_encoded=self.ber_encoded,
7272             bered=self.bered,
7273         )
7274         for i, value in enumerate(self._value):
7275             yield value.pps(decode_path=decode_path + (str(i),))
7276         for pp in self.pps_lenindef(decode_path):
7277             yield pp
7278
7279
7280 class SetOf(SequenceOf):
7281     """``SET OF`` sequence type
7282
7283     Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7284     """
7285     __slots__ = ()
7286     tag_default = tag_encode(form=TagFormConstructed, num=17)
7287     asn1_type_name = "SET OF"
7288
7289     def _value_sanitize(self, value):
7290         value = super(SetOf, self)._value_sanitize(value)
7291         if hasattr(value, NEXT_ATTR_NAME):
7292             raise ValueError(
7293                 "SetOf does not support iterator values, as no sense in them"
7294             )
7295         return value
7296
7297     def _encode(self):
7298         v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7299         return b"".join((self.tag, len_encode(len(v)), v))
7300
7301     def _encode2nd(self, writer, state_iter):
7302         write_full(writer, self.tag + len_encode(next(state_iter)))
7303         values = []
7304         for v in self._values_for_encoding():
7305             buf = BytesIO()
7306             v.encode2nd(buf.write, state_iter)
7307             values.append(buf.getvalue())
7308         values.sort()
7309         for v in values:
7310             write_full(writer, v)
7311
7312     def _encode_cer(self, writer):
7313         write_full(writer, self.tag + LENINDEF)
7314         for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7315             write_full(writer, v)
7316         write_full(writer, EOC)
7317
7318     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7319         return super(SetOf, self)._decode(
7320             tlv,
7321             offset,
7322             decode_path,
7323             ctx,
7324             tag_only,
7325             evgen_mode,
7326             ordering_check=True,
7327         )
7328
7329
7330 def obj_by_path(pypath):  # pragma: no cover
7331     """Import object specified as string Python path
7332
7333     Modules must be separated from classes/functions with ``:``.
7334
7335     >>> obj_by_path("foo.bar:Baz")
7336     <class 'foo.bar.Baz'>
7337     >>> obj_by_path("foo.bar:Baz.boo")
7338     <classmethod 'foo.bar.Baz.boo'>
7339     """
7340     mod, objs = pypath.rsplit(":", 1)
7341     from importlib import import_module
7342     obj = import_module(mod)
7343     for obj_name in objs.split("."):
7344         obj = getattr(obj, obj_name)
7345     return obj
7346
7347
7348 def generic_decoder():  # pragma: no cover
7349     # All of this below is a big hack with self references
7350     choice = PrimitiveTypes()
7351     choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7352     choice.specs["SetOf"] = SetOf(schema=choice)
7353     for i in six_xrange(31):
7354         choice.specs["SequenceOf%d" % i] = SequenceOf(
7355             schema=choice,
7356             expl=tag_ctxc(i),
7357         )
7358     choice.specs["Any"] = Any()
7359
7360     # Class name equals to type name, to omit it from output
7361     class SEQUENCEOF(SequenceOf):
7362         __slots__ = ()
7363         schema = choice
7364
7365     def pprint_any(
7366             obj,
7367             oid_maps=(),
7368             with_colours=False,
7369             with_decode_path=False,
7370             decode_path_only=(),
7371             decode_path=(),
7372     ):
7373         def _pprint_pps(pps):
7374             for pp in pps:
7375                 if hasattr(pp, "_fields"):
7376                     if (
7377                             decode_path_only != () and
7378                             pp.decode_path[:len(decode_path_only)] != decode_path_only
7379                     ):
7380                         continue
7381                     if pp.asn1_type_name == Choice.asn1_type_name:
7382                         continue
7383                     pp_kwargs = pp._asdict()
7384                     pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7385                     pp = _pp(**pp_kwargs)
7386                     yield pp_console_row(
7387                         pp,
7388                         oid_maps=oid_maps,
7389                         with_offsets=True,
7390                         with_blob=False,
7391                         with_colours=with_colours,
7392                         with_decode_path=with_decode_path,
7393                         decode_path_len_decrease=len(decode_path_only),
7394                     )
7395                     for row in pp_console_blob(
7396                             pp,
7397                             decode_path_len_decrease=len(decode_path_only),
7398                     ):
7399                         yield row
7400                 else:
7401                     for row in _pprint_pps(pp):
7402                         yield row
7403         return "\n".join(_pprint_pps(obj.pps(decode_path)))
7404     return SEQUENCEOF(), pprint_any
7405
7406
7407 def main():  # pragma: no cover
7408     import argparse
7409     parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7410     parser.add_argument(
7411         "--skip",
7412         type=int,
7413         default=0,
7414         help="Skip that number of bytes from the beginning",
7415     )
7416     parser.add_argument(
7417         "--oids",
7418         help="Python paths to dictionary with OIDs, comma separated",
7419     )
7420     parser.add_argument(
7421         "--schema",
7422         help="Python path to schema definition to use",
7423     )
7424     parser.add_argument(
7425         "--defines-by-path",
7426         help="Python path to decoder's defines_by_path",
7427     )
7428     parser.add_argument(
7429         "--nobered",
7430         action="store_true",
7431         help="Disallow BER encoding",
7432     )
7433     parser.add_argument(
7434         "--print-decode-path",
7435         action="store_true",
7436         help="Print decode paths",
7437     )
7438     parser.add_argument(
7439         "--decode-path-only",
7440         help="Print only specified decode path",
7441     )
7442     parser.add_argument(
7443         "--allow-expl-oob",
7444         action="store_true",
7445         help="Allow explicit tag out-of-bound",
7446     )
7447     parser.add_argument(
7448         "--evgen",
7449         action="store_true",
7450         help="Turn on event generation mode",
7451     )
7452     parser.add_argument(
7453         "RAWFile",
7454         type=argparse.FileType("rb"),
7455         help="Path to BER/CER/DER file you want to decode",
7456     )
7457     args = parser.parse_args()
7458     if PY2:
7459         args.RAWFile.seek(args.skip)
7460         raw = memoryview(args.RAWFile.read())
7461         args.RAWFile.close()
7462     else:
7463         raw = file_mmaped(args.RAWFile)[args.skip:]
7464     oid_maps = (
7465         [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7466         if args.oids else ()
7467     )
7468     from functools import partial
7469     if args.schema:
7470         schema = obj_by_path(args.schema)
7471         pprinter = partial(pprint, big_blobs=True)
7472     else:
7473         schema, pprinter = generic_decoder()
7474     ctx = {
7475         "bered": not args.nobered,
7476         "allow_expl_oob": args.allow_expl_oob,
7477     }
7478     if args.defines_by_path is not None:
7479         ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7480     from os import environ
7481     pprinter = partial(
7482         pprinter,
7483         oid_maps=oid_maps,
7484         with_colours=environ.get("NO_COLOR") is None,
7485         with_decode_path=args.print_decode_path,
7486         decode_path_only=(
7487             () if args.decode_path_only is None else
7488             tuple(args.decode_path_only.split(":"))
7489         ),
7490     )
7491     if args.evgen:
7492         for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7493             print(pprinter(obj, decode_path=decode_path))
7494     else:
7495         obj, tail = schema().decode(raw, ctx=ctx)
7496         print(pprinter(obj))
7497     if tail != b"":
7498         print("\nTrailing data: %s" % hexenc(tail))
7499
7500
7501 if __name__ == "__main__":
7502     from pyderasn import *
7503     main()