]> Cypherpunks.ru repositories - pyderasn.git/blob - pyderasn.py
54f2027f914df7db6d147dacde4d3daebf519a9e
[pyderasn.git] / pyderasn.py
1 #!/usr/bin/env python
2 # coding: utf-8
3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU Lesser General Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
21
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal BER/CER/DER ones.
24
25     >>> i = Integer(123)
26     >>> raw = i.encode()
27     >>> Integer().decod(raw) == i
28     True
29
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
62
63 Common for most types
64 ---------------------
65
66 Tags
67 ____
68
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
75 tags simultaneously.
76
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
80 number.
81
82 .. note::
83
84    EXPLICIT tags always have **constructed** tag. PyDERASN does not
85    explicitly check correctness of schema input here.
86
87 .. note::
88
89    Implicit tags have **primitive** (``tag_ctxp``) encoding for
90    primitive values.
91
92 ::
93
94     >>> Integer(impl=tag_ctxp(1))
95     [1] INTEGER
96     >>> Integer(expl=tag_ctxc(2))
97     [2] EXPLICIT INTEGER
98
99 Implicit tag is not explicitly shown.
100
101 Two objects of the same type, but with different implicit/explicit tags
102 are **not** equal.
103
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
106 function::
107
108     >>> tag_decode(tag_ctxc(123))
109     (128, 32, 123)
110     >>> klass, form, num = tag_decode(tag_ctxc(123))
111     >>> klass == TagClassContext
112     True
113     >>> form == TagFormConstructed
114     True
115
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
118
119 Default/optional
120 ________________
121
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
125
126     >>> Integer(optional=True, default=123)
127     INTEGER 123 OPTIONAL DEFAULT
128
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
132 ``version`` field::
133
134     class Version(Integer):
135         schema = (
136             ("v1", 0),
137             ("v2", 1),
138             ("v3", 2),
139         )
140     class TBSCertificate(Sequence):
141         schema = (
142             ("version", Version(expl=tag_ctxc(0), default="v1")),
143         [...]
144
145 When default argument is used and value is not specified, then it equals
146 to default one.
147
148 .. _bounds:
149
150 Size constraints
151 ________________
152
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
156
157     class X(...):
158         bounds = (MIN, MAX)
159
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
161
162 For simplicity you can also set bounds the following way::
163
164     bounded_x = X(bounds=(MIN, MAX))
165
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
167 raised.
168
169 Common methods
170 ______________
171
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
175
176 All objects are friendly to ``copy.copy()`` and copied objects can be
177 safely mutated.
178
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
181
182 .. _decoding:
183
184 Decoding
185 --------
186
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
193
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
196
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
199
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
205
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
208
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
211 * ``expl_tlen``,
212 * ``expl_llen``
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
215   ``offset`` otherwise
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
217   ``tlvlen`` otherwise
218
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
220
221 .. _ctx:
222
223 Context
224 _______
225
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
229
230 Currently available context options:
231
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
238
239 .. _pprinting:
240
241 Pretty printing
242 ---------------
243
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
248
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
252
253     >>> from pyderasn import pprint
254     >>> encoded = Integer(-12345).encode()
255     >>> obj, tail = Integer().decode(encoded)
256     >>> print(pprint(obj))
257         0   [1,1,   2] INTEGER -12345
258
259 .. _pprint_example:
260
261 Example certificate::
262
263     >>> print(pprint(crt))
264         0   [1,3,1604] Certificate SEQUENCE
265         4   [1,3,1453]  . tbsCertificate: TBSCertificate SEQUENCE
266        10-2 [1,1,   1]  . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267        13   [1,1,   3]  . . serialNumber: CertificateSerialNumber INTEGER 61595
268        18   [1,1,  13]  . . signature: AlgorithmIdentifier SEQUENCE
269        20   [1,1,   9]  . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270        31   [0,0,   2]  . . . parameters: [UNIV 5] ANY OPTIONAL
271                         . . . . 05:00
272        33   [0,0, 278]  . . issuer: Name CHOICE rdnSequence
273        33   [1,3, 274]  . . . rdnSequence: RDNSequence SEQUENCE OF
274        37   [1,1,  11]  . . . . 0: RelativeDistinguishedName SET OF
275        39   [1,1,   9]  . . . . . 0: AttributeTypeAndValue SEQUENCE
276        41   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277        46   [0,0,   4]  . . . . . . value: [UNIV 19] AttributeValue ANY
278                         . . . . . . . 13:02:45:53
279     [...]
280      1461   [1,1,  13]  . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281      1463   [1,1,   9]  . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282      1474   [0,0,   2]  . . parameters: [UNIV 5] ANY OPTIONAL
283                         . . . 05:00
284      1476   [1,2, 129]  . signatureValue: BIT STRING 1024 bits
285                         . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286                         . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
287      [...]
288
289     Trailing data: 0a
290
291 Let's parse that output, human::
292
293        10-2 [1,1,   1]    . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294        ^  ^  ^ ^    ^     ^   ^        ^            ^       ^       ^  ^
295        0  1  2 3    4     5   6        7            8       9       10 11
296
297 ::
298
299        20   [1,1,   9]    . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
300        ^     ^ ^    ^     ^     ^          ^                 ^
301        0     2 3    4     5     6          9                 10
302
303 ::
304
305        33   [0,0, 278]    . . issuer: Name CHOICE rdnSequence
306        ^     ^ ^    ^     ^   ^       ^    ^      ^
307        0     2 3    4     5   6       8    9      10
308
309 ::
310
311        52-2∞ B [1,1,1054]∞  . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
312              ^           ^                                 ^   ^            ^
313             12          13                                14   9            10
314
315 :0:
316  Offset of the object, where its DER/BER encoding begins.
317  Pay attention that it does **not** include explicit tag.
318 :1:
319  If explicit tag exists, then this is its length (tag + encoded length).
320 :2:
321  Length of object's tag. For example CHOICE does not have its own tag,
322  so it is zero.
323 :3:
324  Length of encoded length.
325 :4:
326  Length of encoded value.
327 :5:
328  Visual indentation to show the depth of object in the hierarchy.
329 :6:
330  Object's name inside SEQUENCE/CHOICE.
331 :7:
332  If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333  here. "IMPLICIT" is omitted.
334 :8:
335  Object's class name, if set. Omitted if it is just an ordinary simple
336  value (like with ``algorithm`` in example above).
337 :9:
338  Object's ASN.1 type.
339 :10:
340  Object's value, if set. Can consist of multiple words (like OCTET/BIT
341  STRINGs above). We see ``v3`` value in Version, because it is named.
342  ``rdnSequence`` is the choice of CHOICE type.
343 :11:
344  Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345  default one, specified in the schema.
346 :12:
347  Shows does object contains any kind of BER encoded data (possibly
348  Sequence holding BER-encoded underlying value).
349 :13:
350  Only applicable to BER encoded data. Indefinite length encoding mark.
351 :14:
352  Only applicable to BER encoded data. If object has BER-specific
353  encoding, then ``BER`` will be shown. It does not depend on indefinite
354  length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355  (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
356  could be BERed.
357
358 .. _definedby:
359
360 DEFINED BY
361 ----------
362
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
367
368 .. _defines:
369
370 defines kwarg
371 _____________
372
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
376 container::
377
378     class ContentInfo(Sequence):
379         schema = (
380             ("contentType", ContentType(defines=((("content",), {
381                 id_digestedData: DigestedData(),
382                 id_signedData: SignedData(),
383             }),))),
384             ("content", Any(expl=tag_ctxc(0))),
385         )
386
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
391 done.
392
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
398
399         (
400             (("parameters",), {
401                 id_ecPublicKey: ECParameters(),
402                 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
403             }),
404             (("..", "subjectPublicKey"), {
405                 id_rsaEncryption: RSAPublicKey(),
406                 id_GostR3410_2001: OctetString(),
407             }),
408         ),
409
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
412 itself.
413
414 Following types can be automatically decoded (DEFINED BY):
415
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420   ``Any``/``BitString``/``OctetString``-s
421
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
426
427 .. _defines_by_path_ctx:
428
429 defines_by_path context option
430 ______________________________
431
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
435
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
438
439     (decode_path, defines)
440
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
445
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
449 of ``PKIResponse``::
450
451     content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
452         (
453             ("contentType",),
454             ((("content",), {id_signedData: SignedData()}),),
455         ),
456         (
457             (
458                 "content",
459                 DecodePathDefBy(id_signedData),
460                 "encapContentInfo",
461                 "eContentType",
462             ),
463             ((("eContent",), {
464                 id_cct_PKIData: PKIData(),
465                 id_cct_PKIResponse: PKIResponse(),
466             })),
467         ),
468         (
469             (
470                 "content",
471                 DecodePathDefBy(id_signedData),
472                 "encapContentInfo",
473                 "eContent",
474                 DecodePathDefBy(id_cct_PKIResponse),
475                 "controlSequence",
476                 any,
477                 "attrType",
478             ),
479             ((("attrValues",), {
480                 id_cmc_recipientNonce: RecipientNonce(),
481                 id_cmc_senderNonce: SenderNonce(),
482                 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483                 id_cmc_transactionId: TransactionId(),
484             })),
485         ),
486     )})
487
488 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 First function is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meet --
491 useful for SEQUENCE/SET OF-s.
492
493 .. _bered_ctx:
494
495 BER encoding
496 ------------
497
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
502
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504   attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505   STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506   ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508   attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509   ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
510   contain it.
511 * If object has an indefinite length encoded explicit tag, then
512   ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514   indefinite length, object's indefinite length, BER-encoding) or any
515   underlying component has that kind of encoding, then ``bered``
516   attribute is set to True. For example SignedData CMS can have
517   ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518   ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
519
520 EOC (end-of-contents) token's length is taken in advance in object's
521 value length.
522
523 .. _allow_expl_oob_ctx:
524
525 Allow explicit tag out-of-bound
526 -------------------------------
527
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
533
534 .. warning::
535
536    This option should be used only for skipping some decode errors, just
537    to see the decoded structure somehow.
538
539 .. _streaming:
540
541 Streaming and dealing with huge structures
542 ------------------------------------------
543
544 .. _evgen_mode:
545
546 evgen mode
547 __________
548
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
553 structure.
554
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
558
559     class TBSCertListFast(Sequence):
560         schema = (
561             [...]
562             ("revokedCertificates", OctetString(
563                 impl=SequenceOf.tag_default,
564                 optional=True,
565             )),
566             [...]
567         )
568
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
571
572 But how can you get all certificate's serial number from it, after you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
576
577     $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578          10     [1,1,   1]   . . version: Version INTEGER v2 (01) OPTIONAL
579          15     [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580          26     [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL
581          13     [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE
582          34     [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583          39     [0,0,   9]   . . . . . . value: [UNIV 19] AttributeValue ANY
584          32     [1,1,  14]   . . . . . 0: AttributeTypeAndValue SEQUENCE
585          30     [1,1,  16]   . . . . 0: RelativeDistinguishedName SET OF
586     [...]
587         188     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589         191     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
590         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591         186     [1,1,  18]   . . . 0: RevokedCertificate SEQUENCE
592         208     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594         211     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
595         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596         206     [1,1,  18]   . . . 1: RevokedCertificate SEQUENCE
597     [...]
598     9144992     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
599     9144992     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600     9144985     [1,1,  20]   . . . 415755: RevokedCertificate SEQUENCE
601       181     [1,4,9144821]   . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602         5     [1,4,9144997]   . tbsCertList: TBSCertList SEQUENCE
603     9145009     [1,1,   9]   . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604     9145020     [0,0,   2]   . . parameters: [UNIV 5] ANY OPTIONAL
605     9145007     [1,1,  13]   . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606     9145022     [1,3, 513]   . signatureValue: BIT STRING 4096 bits
607         0     [1,4,9145534]  CertificateList SEQUENCE
608
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as a successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
618 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
619
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
626
627     for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
628         if (
629             len(decode_path) == 4 and
630             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
631             decode_path[3] == "userCertificate"
632         ):
633             print("serial number:", int(obj))
634
635 Virtually it does not take any memory except at least needed for single
636 object storage. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
639 ``.tlvlen``.
640
641 .. _evgen_mode_upto_ctx:
642
643 evgen_mode_upto
644 _______________
645
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 do not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
655
656     for decode_path, obj, _ in CertificateList().decode_evgen(
657         crl_raw,
658         ctx={"evgen_mode_upto": (
659             (("tbsCertList", "revokedCertificates", any), True),
660         )},
661     ):
662         if (
663             len(decode_path) == 3 and
664             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
665         ):
666             print("serial number:", int(obj["userCertificate"]))
667
668 .. note::
669
670    SEQUENCE/SET values with DEFAULT specified are automatically decoded
671    without evgen mode.
672
673 .. _mmap:
674
675 mmap-ed file
676 ____________
677
678 POSIX compliant systems have ``mmap`` syscall, giving ability to work
679 the memory mapped file. You can deal with the file like it was an
680 ordinary binary string, allowing you not to load it to the memory first.
681 Also you can use them as an input for OCTET STRING, taking no Python
682 memory for their storage.
683
684 There is convenient :py:func:`pyderasn.file_mmaped` function that
685 creates read-only memoryview on the file contents::
686
687     with open("huge", "rb") as fd:
688         raw = file_mmaped(fd)
689         obj = Something.decode(raw)
690
691 .. warning::
692
693    mmap-ed files in Python2.7 does not implement buffer protocol, so
694    memoryview won't work on them.
695
696 .. warning::
697
698    mmap maps the **whole** file. So it plays no role if you seek-ed it
699    before. Take the slice of the resulting memoryview with required
700    offset instead.
701
702 .. note::
703
704    If you use ZFS as underlying storage, then pay attention that
705    currently most platforms does not deal good with ZFS ARC and ordinary
706    page cache used for mmaps. It can take twice the necessary size in
707    the memory: both in page cache and ZFS ARC.
708
709 CER encoding
710 ____________
711
712 We can parse any kind of data now, but how can we produce files
713 streamingly, without storing their encoded representation in memory?
714 SEQUENCE by default encodes in memory all its values, joins them in huge
715 binary string, just to know the exact size of SEQUENCE's value for
716 encoding it in TLV. DER requires you to know all exact sizes of the
717 objects.
718
719 You can use CER encoding mode, that slightly differs from the DER, but
720 does not require exact sizes knowledge, allowing streaming encoding
721 directly to some writer/buffer. Just use
722 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
723 encoded data will flow::
724
725     opener = io.open if PY2 else open
726     with opener("result", "wb") as fd:
727         obj.encode_cer(fd.write)
728
729 ::
730
731     buf = io.BytesIO()
732     obj.encode_cer(buf.write)
733
734 If you do not want to create in-memory buffer every time, then you can
735 use :py:func:`pyderasn.encode_cer` function::
736
737     data = encode_cer(obj)
738
739 Remember that CER is **not valid** DER in most cases, so you **have to**
740 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
741 decoding. Also currently there is **no** validation that provided CER is
742 valid one -- you are sure that it has only valid BER encoding.
743
744 .. warning::
745
746    SET OF values can not be streamingly encoded, because they are
747    required to be sorted byte-by-byte. Big SET OF values still will take
748    much memory. Use neither SET nor SET OF values, as modern ASN.1
749    also recommends too.
750
751 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
752 OCTET STRINGs! They will be streamingly copied from underlying file to
753 the buffer using 1 KB chunks.
754
755 Some structures require that some of the elements have to be forcefully
756 DER encoded. For example ``SignedData`` CMS requires you to encode
757 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
758 encode everything else in BER. You can tell any of the structures to be
759 forcefully encoded in DER during CER encoding, by specifying
760 ``der_forced=True`` attribute::
761
762     class Certificate(Sequence):
763         schema = (...)
764         der_forced = True
765
766     class SignedAttributes(SetOf):
767         schema = Attribute()
768         bounds = (1, 32)
769         der_forced = True
770
771 .. _agg_octet_string:
772
773 agg_octet_string
774 ________________
775
776 In most cases, huge quantity of binary data is stored as OCTET STRING.
777 CER encoding splits it on 1 KB chunks. BER allows splitting on various
778 levels of chunks inclusion::
779
780     SOME STRING[CONSTRUCTED]
781         OCTET STRING[CONSTRUCTED]
782             OCTET STRING[PRIMITIVE]
783                 DATA CHUNK
784             OCTET STRING[PRIMITIVE]
785                 DATA CHUNK
786             OCTET STRING[PRIMITIVE]
787                 DATA CHUNK
788         OCTET STRING[PRIMITIVE]
789             DATA CHUNK
790         OCTET STRING[CONSTRUCTED]
791             OCTET STRING[PRIMITIVE]
792                 DATA CHUNK
793             OCTET STRING[PRIMITIVE]
794                 DATA CHUNK
795         OCTET STRING[CONSTRUCTED]
796             OCTET STRING[CONSTRUCTED]
797                 OCTET STRING[PRIMITIVE]
798                     DATA CHUNK
799
800 You can not just take the offset and some ``.vlen`` of the STRING and
801 treat it as the payload. If you decode it without
802 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
803 and ``bytes()`` will give the whole payload contents.
804
805 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
806 small memory footprint. There is convenient
807 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
808 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
809 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
810 SHA512 digest of its ``eContent``::
811
812     fd = open("data.p7m", "rb")
813     raw = file_mmaped(fd)
814     ctx = {"bered": True}
815     for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
816         if decode_path == ("content",):
817             content = obj
818             break
819     else:
820         raise ValueError("no content found")
821     hasher_state = sha512()
822     def hasher(data):
823         hasher_state.update(data)
824         return len(data)
825     evgens = SignedData().decode_evgen(
826         raw[content.offset:],
827         offset=content.offset,
828         ctx=ctx,
829     )
830     agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
831     fd.close()
832     digest = hasher_state.digest()
833
834 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
835 copy the payload (without BER/CER encoding interleaved overhead) in it.
836 Virtually it won't take memory more than for keeping small structures
837 and 1 KB binary chunks.
838
839 .. _seqof-iterators:
840
841 SEQUENCE OF iterators
842 _____________________
843
844 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
845 classes. The only difference with providing the full list of objects, is
846 that type and bounds checking is done during encoding process. Also
847 sequence's value will be emptied after encoding, forcing you to set its
848 value again.
849
850 This is very useful when you have to create some huge objects, like
851 CRLs, with thousands and millions of entities inside. You can write the
852 generator taking necessary data from the database and giving the
853 ``RevokedCertificate`` objects. Only binary representation of that
854 objects will take memory during DER encoding.
855
856 2-pass DER encoding
857 -------------------
858
859 There is ability to do 2-pass encoding to DER, writing results directly
860 to specified writer (buffer, file, whatever). It could be 1.5+ times
861 slower than ordinary encoding, but it takes little memory for 1st pass
862 state storing. For example, 1st pass state for CACert.org's CRL with
863 ~416K of certificate entries takes nearly 3.5 MB of memory.
864 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
865 nearly 0.5 KB of memory.
866
867 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
868 iterators <seqof-iterators>` and write directly to opened file, then
869 there is very small memory footprint.
870
871 1st pass traverses through all the objects of the structure and returns
872 the size of DER encoded structure, together with 1st pass state object.
873 That state contains precalculated lengths for various objects inside the
874 structure.
875
876 ::
877
878     fulllen, state = obj.encode1st()
879
880 2nd pass takes the writer and 1st pass state. It traverses through all
881 the objects again, but writes their encoded representation to the writer.
882
883 ::
884
885     opener = io.open if PY2 else open
886     with opener("result", "wb") as fd:
887         obj.encode2nd(fd.write, iter(state))
888
889 .. warning::
890
891    You **MUST NOT** use 1st pass state if anything is changed in the
892    objects. It is intended to be used immediately after 1st pass is
893    done!
894
895 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
896 have to reinitialize the values after the 1st pass. And you **have to**
897 be sure that the iterator gives exactly the same values as previously.
898 Yes, you have to run your iterator twice -- because this is two pass
899 encoding mode.
900
901 If you want to encode to the memory, then you can use convenient
902 :py:func:`pyderasn.encode2pass` helper.
903
904 .. _browser:
905
906 ASN.1 browser
907 -------------
908 .. autofunction:: pyderasn.browse
909
910 Base Obj
911 --------
912 .. autoclass:: pyderasn.Obj
913    :members:
914
915 Primitive types
916 ---------------
917
918 Boolean
919 _______
920 .. autoclass:: pyderasn.Boolean
921    :members: __init__
922
923 Integer
924 _______
925 .. autoclass:: pyderasn.Integer
926    :members: __init__, named, tohex
927
928 BitString
929 _________
930 .. autoclass:: pyderasn.BitString
931    :members: __init__, bit_len, named
932
933 OctetString
934 ___________
935 .. autoclass:: pyderasn.OctetString
936    :members: __init__
937
938 Null
939 ____
940 .. autoclass:: pyderasn.Null
941    :members: __init__
942
943 ObjectIdentifier
944 ________________
945 .. autoclass:: pyderasn.ObjectIdentifier
946    :members: __init__
947
948 Enumerated
949 __________
950 .. autoclass:: pyderasn.Enumerated
951
952 CommonString
953 ____________
954 .. autoclass:: pyderasn.CommonString
955
956 NumericString
957 _____________
958 .. autoclass:: pyderasn.NumericString
959
960 PrintableString
961 _______________
962 .. autoclass:: pyderasn.PrintableString
963    :members: __init__, allow_asterisk, allow_ampersand
964
965 UTCTime
966 _______
967 .. autoclass:: pyderasn.UTCTime
968    :members: __init__, todatetime
969
970 GeneralizedTime
971 _______________
972 .. autoclass:: pyderasn.GeneralizedTime
973    :members: __init__, todatetime
974
975 Special types
976 -------------
977
978 Choice
979 ______
980 .. autoclass:: pyderasn.Choice
981    :members: __init__, choice, value
982
983 PrimitiveTypes
984 ______________
985 .. autoclass:: PrimitiveTypes
986
987 Any
988 ___
989 .. autoclass:: pyderasn.Any
990    :members: __init__
991
992 Constructed types
993 -----------------
994
995 Sequence
996 ________
997 .. autoclass:: pyderasn.Sequence
998    :members: __init__
999
1000 Set
1001 ___
1002 .. autoclass:: pyderasn.Set
1003    :members: __init__
1004
1005 SequenceOf
1006 __________
1007 .. autoclass:: pyderasn.SequenceOf
1008    :members: __init__
1009
1010 SetOf
1011 _____
1012 .. autoclass:: pyderasn.SetOf
1013    :members: __init__
1014
1015 Various
1016 -------
1017
1018 .. autofunction:: pyderasn.abs_decode_path
1019 .. autofunction:: pyderasn.agg_octet_string
1020 .. autofunction:: pyderasn.ascii_visualize
1021 .. autofunction:: pyderasn.colonize_hex
1022 .. autofunction:: pyderasn.encode2pass
1023 .. autofunction:: pyderasn.encode_cer
1024 .. autofunction:: pyderasn.file_mmaped
1025 .. autofunction:: pyderasn.hexenc
1026 .. autofunction:: pyderasn.hexdec
1027 .. autofunction:: pyderasn.hexdump
1028 .. autofunction:: pyderasn.tag_encode
1029 .. autofunction:: pyderasn.tag_decode
1030 .. autofunction:: pyderasn.tag_ctxp
1031 .. autofunction:: pyderasn.tag_ctxc
1032 .. autoclass:: pyderasn.DecodeError
1033    :members: __init__
1034 .. autoclass:: pyderasn.NotEnoughData
1035 .. autoclass:: pyderasn.ExceedingData
1036 .. autoclass:: pyderasn.LenIndefForm
1037 .. autoclass:: pyderasn.TagMismatch
1038 .. autoclass:: pyderasn.InvalidLength
1039 .. autoclass:: pyderasn.InvalidOID
1040 .. autoclass:: pyderasn.ObjUnknown
1041 .. autoclass:: pyderasn.ObjNotReady
1042 .. autoclass:: pyderasn.InvalidValueType
1043 .. autoclass:: pyderasn.BoundsError
1044
1045 .. _cmdline:
1046
1047 Command-line usage
1048 ------------------
1049
1050 You can decode DER/BER files using command line abilities::
1051
1052     $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1053
1054 If there is no schema for your file, then you can try parsing it without,
1055 but of course IMPLICIT tags will often make it impossible. But result is
1056 good enough for the certificate above::
1057
1058     $ python -m pyderasn path/to/file
1059         0   [1,3,1604]  . >: SEQUENCE OF
1060         4   [1,3,1453]  . . >: SEQUENCE OF
1061         8   [0,0,   5]  . . . . >: [0] ANY
1062                         . . . . . A0:03:02:01:02
1063        13   [1,1,   3]  . . . . >: INTEGER 61595
1064        18   [1,1,  13]  . . . . >: SEQUENCE OF
1065        20   [1,1,   9]  . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1066        31   [1,1,   0]  . . . . . . >: NULL
1067        33   [1,3, 274]  . . . . >: SEQUENCE OF
1068        37   [1,1,  11]  . . . . . . >: SET OF
1069        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1070        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1071        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1072     [...]
1073      1409   [1,1,  50]  . . . . . . >: SEQUENCE OF
1074      1411   [1,1,   8]  . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1075      1421   [1,1,  38]  . . . . . . . . >: OCTET STRING 38 bytes
1076                         . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1077                         . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1078                         . . . . . . . . . 61:2E:63:6F:6D:2F
1079      1461   [1,1,  13]  . . >: SEQUENCE OF
1080      1463   [1,1,   9]  . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1081      1474   [1,1,   0]  . . . . >: NULL
1082      1476   [1,2, 129]  . . >: BIT STRING 1024 bits
1083                         . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1084                         . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1085     [...]
1086
1087 Human readable OIDs
1088 ___________________
1089
1090 If you have got dictionaries with ObjectIdentifiers, like example one
1091 from ``tests/test_crts.py``::
1092
1093     stroid2name = {
1094         "1.2.840.113549.1.1.1": "id-rsaEncryption",
1095         "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1096         [...]
1097         "2.5.4.10": "id-at-organizationName",
1098         "2.5.4.11": "id-at-organizationalUnitName",
1099     }
1100
1101 then you can pass it to pretty printer to see human readable OIDs::
1102
1103     $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1104     [...]
1105        37   [1,1,  11]  . . . . . . >: SET OF
1106        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1107        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1108        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1109        50   [1,1,  18]  . . . . . . >: SET OF
1110        52   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1111        54   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1112        59   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1113        70   [1,1,  18]  . . . . . . >: SET OF
1114        72   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1115        74   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1116        79   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1117     [...]
1118
1119 Decode paths
1120 ____________
1121
1122 Each decoded element has so-called decode path: sequence of structure
1123 names it is passing during the decode process. Each element has its own
1124 unique path inside the whole ASN.1 tree. You can print it out with
1125 ``--print-decode-path`` option::
1126
1127     $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1128        0    [1,3,1604]  Certificate SEQUENCE []
1129        4    [1,3,1453]   . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1130       10-2  [1,1,   1]   . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1131       13    [1,1,   3]   . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1132       18    [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1133       20    [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1134       31    [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1135                          . . . . 05:00
1136       33    [0,0, 278]   . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1137       33    [1,3, 274]   . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1138       37    [1,1,  11]   . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1139       39    [1,1,   9]   . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1140       41    [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1141       46    [0,0,   4]   . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1142                          . . . . . . . 13:02:45:53
1143       46    [1,1,   2]   . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1144     [...]
1145
1146 Now you can print only the specified tree, for example signature algorithm::
1147
1148     $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1149       18    [1,1,  13]  AlgorithmIdentifier SEQUENCE
1150       20    [1,1,   9]   . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1151       31    [0,0,   2]   . parameters: [UNIV 5] ANY OPTIONAL
1152                          . . 05:00
1153 """
1154
1155 from array import array
1156 from codecs import getdecoder
1157 from codecs import getencoder
1158 from collections import namedtuple
1159 from collections import OrderedDict
1160 from copy import copy
1161 from datetime import datetime
1162 from datetime import timedelta
1163 from io import BytesIO
1164 from math import ceil
1165 from mmap import mmap
1166 from mmap import PROT_READ
1167 from operator import attrgetter
1168 from string import ascii_letters
1169 from string import digits
1170 from sys import maxsize as sys_maxsize
1171 from sys import version_info
1172 from unicodedata import category as unicat
1173
1174 from six import add_metaclass
1175 from six import binary_type
1176 from six import byte2int
1177 from six import indexbytes
1178 from six import int2byte
1179 from six import integer_types
1180 from six import iterbytes
1181 from six import iteritems
1182 from six import itervalues
1183 from six import PY2
1184 from six import string_types
1185 from six import text_type
1186 from six import unichr as six_unichr
1187 from six.moves import xrange as six_xrange
1188
1189
1190 try:
1191     from termcolor import colored
1192 except ImportError:  # pragma: no cover
1193     def colored(what, *args, **kwargs):
1194         return what
1195
1196 __version__ = "7.5"
1197
1198 __all__ = (
1199     "agg_octet_string",
1200     "Any",
1201     "BitString",
1202     "BMPString",
1203     "Boolean",
1204     "BoundsError",
1205     "Choice",
1206     "colonize_hex",
1207     "DecodeError",
1208     "DecodePathDefBy",
1209     "encode2pass",
1210     "encode_cer",
1211     "Enumerated",
1212     "ExceedingData",
1213     "file_mmaped",
1214     "GeneralizedTime",
1215     "GeneralString",
1216     "GraphicString",
1217     "hexdec",
1218     "hexenc",
1219     "IA5String",
1220     "Integer",
1221     "InvalidLength",
1222     "InvalidOID",
1223     "InvalidValueType",
1224     "ISO646String",
1225     "LenIndefForm",
1226     "NotEnoughData",
1227     "Null",
1228     "NumericString",
1229     "obj_by_path",
1230     "ObjectIdentifier",
1231     "ObjNotReady",
1232     "ObjUnknown",
1233     "OctetString",
1234     "PrimitiveTypes",
1235     "PrintableString",
1236     "Sequence",
1237     "SequenceOf",
1238     "Set",
1239     "SetOf",
1240     "T61String",
1241     "tag_ctxc",
1242     "tag_ctxp",
1243     "tag_decode",
1244     "TagClassApplication",
1245     "TagClassContext",
1246     "TagClassPrivate",
1247     "TagClassUniversal",
1248     "TagFormConstructed",
1249     "TagFormPrimitive",
1250     "TagMismatch",
1251     "TeletexString",
1252     "UniversalString",
1253     "UTCTime",
1254     "UTF8String",
1255     "VideotexString",
1256     "VisibleString",
1257 )
1258
1259 TagClassUniversal = 0
1260 TagClassApplication = 1 << 6
1261 TagClassContext = 1 << 7
1262 TagClassPrivate = 1 << 6 | 1 << 7
1263 TagFormPrimitive = 0
1264 TagFormConstructed = 1 << 5
1265 TagClassReprs = {
1266     TagClassContext: "",
1267     TagClassApplication: "APPLICATION ",
1268     TagClassPrivate: "PRIVATE ",
1269     TagClassUniversal: "UNIV ",
1270 }
1271 EOC = b"\x00\x00"
1272 EOC_LEN = len(EOC)
1273 LENINDEF = b"\x80"  # length indefinite mark
1274 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1275 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1276 SET01 = frozenset("01")
1277 DECIMALS = frozenset(digits)
1278 DECIMAL_SIGNS = ".,"
1279 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1280
1281
1282 def file_mmaped(fd):
1283     """Make mmap-ed memoryview for reading from file
1284
1285     :param fd: file object
1286     :returns: memoryview over read-only mmap-ing of the whole file
1287     """
1288     return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
1289
1290
1291 def pureint(value):
1292     if not set(value) <= DECIMALS:
1293         raise ValueError("non-pure integer")
1294     return int(value)
1295
1296
1297 def fractions2float(fractions_raw):
1298     pureint(fractions_raw)
1299     return float("0." + fractions_raw)
1300
1301
1302 def get_def_by_path(defines_by_path, sub_decode_path):
1303     """Get define by decode path
1304     """
1305     for path, define in defines_by_path:
1306         if len(path) != len(sub_decode_path):
1307             continue
1308         for p1, p2 in zip(path, sub_decode_path):
1309             if (p1 is not any) and (p1 != p2):
1310                 break
1311         else:
1312             return define
1313
1314
1315 ########################################################################
1316 # Errors
1317 ########################################################################
1318
1319 class ASN1Error(ValueError):
1320     pass
1321
1322
1323 class DecodeError(ASN1Error):
1324     def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1325         """
1326         :param str msg: reason of decode failing
1327         :param klass: optional exact DecodeError inherited class (like
1328                       :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1329                       :py:exc:`InvalidLength`)
1330         :param decode_path: tuple of strings. It contains human
1331                             readable names of the fields through which
1332                             decoding process has passed
1333         :param int offset: binary offset where failure happened
1334         """
1335         super(DecodeError, self).__init__()
1336         self.msg = msg
1337         self.klass = klass
1338         self.decode_path = decode_path
1339         self.offset = offset
1340
1341     def __str__(self):
1342         return " ".join(
1343             c for c in (
1344                 "" if self.klass is None else self.klass.__name__,
1345                 (
1346                     ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1347                     if len(self.decode_path) > 0 else ""
1348                 ),
1349                 ("(at %d)" % self.offset) if self.offset > 0 else "",
1350                 self.msg,
1351             ) if c != ""
1352         )
1353
1354     def __repr__(self):
1355         return "%s(%s)" % (self.__class__.__name__, self)
1356
1357
1358 class NotEnoughData(DecodeError):
1359     pass
1360
1361
1362 class ExceedingData(ASN1Error):
1363     def __init__(self, nbytes):
1364         super(ExceedingData, self).__init__()
1365         self.nbytes = nbytes
1366
1367     def __str__(self):
1368         return "%d trailing bytes" % self.nbytes
1369
1370     def __repr__(self):
1371         return "%s(%s)" % (self.__class__.__name__, self)
1372
1373
1374 class LenIndefForm(DecodeError):
1375     pass
1376
1377
1378 class TagMismatch(DecodeError):
1379     pass
1380
1381
1382 class InvalidLength(DecodeError):
1383     pass
1384
1385
1386 class InvalidOID(DecodeError):
1387     pass
1388
1389
1390 class ObjUnknown(ASN1Error):
1391     def __init__(self, name):
1392         super(ObjUnknown, self).__init__()
1393         self.name = name
1394
1395     def __str__(self):
1396         return "object is unknown: %s" % self.name
1397
1398     def __repr__(self):
1399         return "%s(%s)" % (self.__class__.__name__, self)
1400
1401
1402 class ObjNotReady(ASN1Error):
1403     def __init__(self, name):
1404         super(ObjNotReady, self).__init__()
1405         self.name = name
1406
1407     def __str__(self):
1408         return "object is not ready: %s" % self.name
1409
1410     def __repr__(self):
1411         return "%s(%s)" % (self.__class__.__name__, self)
1412
1413
1414 class InvalidValueType(ASN1Error):
1415     def __init__(self, expected_types):
1416         super(InvalidValueType, self).__init__()
1417         self.expected_types = expected_types
1418
1419     def __str__(self):
1420         return "invalid value type, expected: %s" % ", ".join(
1421             [repr(t) for t in self.expected_types]
1422         )
1423
1424     def __repr__(self):
1425         return "%s(%s)" % (self.__class__.__name__, self)
1426
1427
1428 class BoundsError(ASN1Error):
1429     def __init__(self, bound_min, value, bound_max):
1430         super(BoundsError, self).__init__()
1431         self.bound_min = bound_min
1432         self.value = value
1433         self.bound_max = bound_max
1434
1435     def __str__(self):
1436         return "unsatisfied bounds: %s <= %s <= %s" % (
1437             self.bound_min,
1438             self.value,
1439             self.bound_max,
1440         )
1441
1442     def __repr__(self):
1443         return "%s(%s)" % (self.__class__.__name__, self)
1444
1445
1446 ########################################################################
1447 # Basic coders
1448 ########################################################################
1449
1450 _hexdecoder = getdecoder("hex")
1451 _hexencoder = getencoder("hex")
1452
1453
1454 def hexdec(data):
1455     """Binary data to hexadecimal string convert
1456     """
1457     return _hexdecoder(data)[0]
1458
1459
1460 def hexenc(data):
1461     """Hexadecimal string to binary data convert
1462     """
1463     return _hexencoder(data)[0].decode("ascii")
1464
1465
1466 def int_bytes_len(num, byte_len=8):
1467     if num == 0:
1468         return 1
1469     return int(ceil(float(num.bit_length()) / byte_len))
1470
1471
1472 def zero_ended_encode(num):
1473     octets = bytearray(int_bytes_len(num, 7))
1474     i = len(octets) - 1
1475     octets[i] = num & 0x7F
1476     num >>= 7
1477     i -= 1
1478     while num > 0:
1479         octets[i] = 0x80 | (num & 0x7F)
1480         num >>= 7
1481         i -= 1
1482     return bytes(octets)
1483
1484
1485 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1486     """Encode tag to binary form
1487
1488     :param int num: tag's number
1489     :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1490                       :py:data:`pyderasn.TagClassContext`,
1491                       :py:data:`pyderasn.TagClassApplication`,
1492                       :py:data:`pyderasn.TagClassPrivate`)
1493     :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1494                      :py:data:`pyderasn.TagFormConstructed`)
1495     """
1496     if num < 31:
1497         # [XX|X|.....]
1498         return int2byte(klass | form | num)
1499     # [XX|X|11111][1.......][1.......] ... [0.......]
1500     return int2byte(klass | form | 31) + zero_ended_encode(num)
1501
1502
1503 def tag_decode(tag):
1504     """Decode tag from binary form
1505
1506     .. warning::
1507
1508        No validation is performed, assuming that it has already passed.
1509
1510     It returns tuple with three integers, as
1511     :py:func:`pyderasn.tag_encode` accepts.
1512     """
1513     first_octet = byte2int(tag)
1514     klass = first_octet & 0xC0
1515     form = first_octet & 0x20
1516     if first_octet & 0x1F < 0x1F:
1517         return (klass, form, first_octet & 0x1F)
1518     num = 0
1519     for octet in iterbytes(tag[1:]):
1520         num <<= 7
1521         num |= octet & 0x7F
1522     return (klass, form, num)
1523
1524
1525 def tag_ctxp(num):
1526     """Create CONTEXT PRIMITIVE tag
1527     """
1528     return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1529
1530
1531 def tag_ctxc(num):
1532     """Create CONTEXT CONSTRUCTED tag
1533     """
1534     return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1535
1536
1537 def tag_strip(data):
1538     """Take off tag from the data
1539
1540     :returns: (encoded tag, tag length, remaining data)
1541     """
1542     if len(data) == 0:
1543         raise NotEnoughData("no data at all")
1544     if byte2int(data) & 0x1F < 31:
1545         return data[:1], 1, data[1:]
1546     i = 0
1547     while True:
1548         i += 1
1549         if i == len(data):
1550             raise DecodeError("unfinished tag")
1551         if indexbytes(data, i) & 0x80 == 0:
1552             break
1553     i += 1
1554     return data[:i], i, data[i:]
1555
1556
1557 def len_encode(l):
1558     if l < 0x80:
1559         return int2byte(l)
1560     octets = bytearray(int_bytes_len(l) + 1)
1561     octets[0] = 0x80 | (len(octets) - 1)
1562     for i in six_xrange(len(octets) - 1, 0, -1):
1563         octets[i] = l & 0xFF
1564         l >>= 8
1565     return bytes(octets)
1566
1567
1568 def len_decode(data):
1569     """Decode length
1570
1571     :returns: (decoded length, length's length, remaining data)
1572     :raises LenIndefForm: if indefinite form encoding is met
1573     """
1574     if len(data) == 0:
1575         raise NotEnoughData("no data at all")
1576     first_octet = byte2int(data)
1577     if first_octet & 0x80 == 0:
1578         return first_octet, 1, data[1:]
1579     octets_num = first_octet & 0x7F
1580     if octets_num + 1 > len(data):
1581         raise NotEnoughData("encoded length is longer than data")
1582     if octets_num == 0:
1583         raise LenIndefForm()
1584     if byte2int(data[1:]) == 0:
1585         raise DecodeError("leading zeros")
1586     l = 0
1587     for v in iterbytes(data[1:1 + octets_num]):
1588         l = (l << 8) | v
1589     if l <= 127:
1590         raise DecodeError("long form instead of short one")
1591     return l, 1 + octets_num, data[1 + octets_num:]
1592
1593
1594 LEN0 = len_encode(0)
1595 LEN1 = len_encode(1)
1596 LEN1K = len_encode(1000)
1597
1598
1599 def len_size(l):
1600     """How many bytes length field will take
1601     """
1602     if l < 128:
1603         return 1
1604     if l < 256:  # 1 << 8
1605         return 2
1606     if l < 65536:  # 1 << 16
1607         return 3
1608     if l < 16777216:  # 1 << 24
1609         return 4
1610     if l < 4294967296:  # 1 << 32
1611         return 5
1612     if l < 1099511627776:  # 1 << 40
1613         return 6
1614     if l < 281474976710656:  # 1 << 48
1615         return 7
1616     if l < 72057594037927936:  # 1 << 56
1617         return 8
1618     raise OverflowError("too big length")
1619
1620
1621 def write_full(writer, data):
1622     """Fully write provided data
1623
1624     :param writer: must comply with ``io.RawIOBase.write`` behaviour
1625
1626     BytesIO does not guarantee that the whole data will be written at
1627     once. That function write everything provided, raising an error if
1628     ``writer`` returns None.
1629     """
1630     data = memoryview(data)
1631     written = 0
1632     while written != len(data):
1633         n = writer(data[written:])
1634         if n is None:
1635             raise ValueError("can not write to buf")
1636         written += n
1637
1638
1639 # If it is 64-bit system, then use compact 64-bit array of unsigned
1640 # longs. Use an ordinary list with universal integers otherwise, that
1641 # is slower.
1642 if sys_maxsize > 2 ** 32:
1643     def state_2pass_new():
1644         return array("L")
1645 else:
1646     def state_2pass_new():
1647         return []
1648
1649
1650 ########################################################################
1651 # Base class
1652 ########################################################################
1653
1654 class AutoAddSlots(type):
1655     def __new__(cls, name, bases, _dict):
1656         _dict["__slots__"] = _dict.get("__slots__", ())
1657         return type.__new__(cls, name, bases, _dict)
1658
1659
1660 BasicState = namedtuple("BasicState", (
1661     "version",
1662     "tag",
1663     "tag_order",
1664     "expl",
1665     "default",
1666     "optional",
1667     "offset",
1668     "llen",
1669     "vlen",
1670     "expl_lenindef",
1671     "lenindef",
1672     "ber_encoded",
1673 ), **NAMEDTUPLE_KWARGS)
1674
1675
1676 @add_metaclass(AutoAddSlots)
1677 class Obj(object):
1678     """Common ASN.1 object class
1679
1680     All ASN.1 types are inherited from it. It has metaclass that
1681     automatically adds ``__slots__`` to all inherited classes.
1682     """
1683     __slots__ = (
1684         "tag",
1685         "_tag_order",
1686         "_value",
1687         "_expl",
1688         "default",
1689         "optional",
1690         "offset",
1691         "llen",
1692         "vlen",
1693         "expl_lenindef",
1694         "lenindef",
1695         "ber_encoded",
1696     )
1697
1698     def __init__(
1699             self,
1700             impl=None,
1701             expl=None,
1702             default=None,
1703             optional=False,
1704             _decoded=(0, 0, 0),
1705     ):
1706         self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1707         self._expl = getattr(self, "expl", None) if expl is None else expl
1708         if self.tag != self.tag_default and self._expl is not None:
1709             raise ValueError("implicit and explicit tags can not be set simultaneously")
1710         if self.tag is None:
1711             self._tag_order = None
1712         else:
1713             tag_class, _, tag_num = tag_decode(
1714                 self.tag if self._expl is None else self._expl
1715             )
1716             self._tag_order = (tag_class, tag_num)
1717         if default is not None:
1718             optional = True
1719         self.optional = optional
1720         self.offset, self.llen, self.vlen = _decoded
1721         self.default = None
1722         self.expl_lenindef = False
1723         self.lenindef = False
1724         self.ber_encoded = False
1725
1726     @property
1727     def ready(self):  # pragma: no cover
1728         """Is object ready to be encoded?
1729         """
1730         raise NotImplementedError()
1731
1732     def _assert_ready(self):
1733         if not self.ready:
1734             raise ObjNotReady(self.__class__.__name__)
1735
1736     @property
1737     def bered(self):
1738         """Is either object or any elements inside is BER encoded?
1739         """
1740         return self.expl_lenindef or self.lenindef or self.ber_encoded
1741
1742     @property
1743     def decoded(self):
1744         """Is object decoded?
1745         """
1746         return (self.llen + self.vlen) > 0
1747
1748     def __getstate__(self):  # pragma: no cover
1749         """Used for making safe to be mutable pickleable copies
1750         """
1751         raise NotImplementedError()
1752
1753     def __setstate__(self, state):
1754         if state.version != __version__:
1755             raise ValueError("data is pickled by different PyDERASN version")
1756         self.tag = state.tag
1757         self._tag_order = state.tag_order
1758         self._expl = state.expl
1759         self.default = state.default
1760         self.optional = state.optional
1761         self.offset = state.offset
1762         self.llen = state.llen
1763         self.vlen = state.vlen
1764         self.expl_lenindef = state.expl_lenindef
1765         self.lenindef = state.lenindef
1766         self.ber_encoded = state.ber_encoded
1767
1768     @property
1769     def tag_order(self):
1770         """Tag's (class, number) used for DER/CER sorting
1771         """
1772         return self._tag_order
1773
1774     @property
1775     def tag_order_cer(self):
1776         return self.tag_order
1777
1778     @property
1779     def tlen(self):
1780         """.. seealso:: :ref:`decoding`
1781         """
1782         return len(self.tag)
1783
1784     @property
1785     def tlvlen(self):
1786         """.. seealso:: :ref:`decoding`
1787         """
1788         return self.tlen + self.llen + self.vlen
1789
1790     def __str__(self):  # pragma: no cover
1791         return self.__bytes__() if PY2 else self.__unicode__()
1792
1793     def __ne__(self, their):
1794         return not(self == their)
1795
1796     def __gt__(self, their):  # pragma: no cover
1797         return not(self < their)
1798
1799     def __le__(self, their):  # pragma: no cover
1800         return (self == their) or (self < their)
1801
1802     def __ge__(self, their):  # pragma: no cover
1803         return (self == their) or (self > their)
1804
1805     def _encode(self):  # pragma: no cover
1806         raise NotImplementedError()
1807
1808     def _encode_cer(self, writer):
1809         write_full(writer, self._encode())
1810
1811     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):  # pragma: no cover
1812         yield NotImplemented
1813
1814     def _encode1st(self, state):
1815         raise NotImplementedError()
1816
1817     def _encode2nd(self, writer, state_iter):
1818         raise NotImplementedError()
1819
1820     def encode(self):
1821         """DER encode the structure
1822
1823         :returns: DER representation
1824         """
1825         raw = self._encode()
1826         if self._expl is None:
1827             return raw
1828         return b"".join((self._expl, len_encode(len(raw)), raw))
1829
1830     def encode1st(self, state=None):
1831         """Do the 1st pass of 2-pass encoding
1832
1833         :rtype: (int, array("L"))
1834         :returns: full length of encoded data and precalculated various
1835                   objects lengths
1836         """
1837         if state is None:
1838             state = state_2pass_new()
1839         if self._expl is None:
1840             return self._encode1st(state)
1841         state.append(0)
1842         idx = len(state) - 1
1843         vlen, _ = self._encode1st(state)
1844         state[idx] = vlen
1845         fulllen = len(self._expl) + len_size(vlen) + vlen
1846         return fulllen, state
1847
1848     def encode2nd(self, writer, state_iter):
1849         """Do the 2nd pass of 2-pass encoding
1850
1851         :param writer: must comply with ``io.RawIOBase.write`` behaviour
1852         :param state_iter: iterator over the 1st pass state (``iter(state)``)
1853         """
1854         if self._expl is None:
1855             self._encode2nd(writer, state_iter)
1856         else:
1857             write_full(writer, self._expl + len_encode(next(state_iter)))
1858             self._encode2nd(writer, state_iter)
1859
1860     def encode_cer(self, writer):
1861         """CER encode the structure to specified writer
1862
1863         :param writer: must comply with ``io.RawIOBase.write``
1864                        behaviour. It takes slice to be written and
1865                        returns number of bytes processed. If it returns
1866                        None, then exception will be raised
1867         """
1868         if self._expl is not None:
1869             write_full(writer, self._expl + LENINDEF)
1870         if getattr(self, "der_forced", False):
1871             write_full(writer, self._encode())
1872         else:
1873             self._encode_cer(writer)
1874         if self._expl is not None:
1875             write_full(writer, EOC)
1876
1877     def hexencode(self):
1878         """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1879         """
1880         return hexenc(self.encode())
1881
1882     def decode(
1883             self,
1884             data,
1885             offset=0,
1886             leavemm=False,
1887             decode_path=(),
1888             ctx=None,
1889             tag_only=False,
1890             _ctx_immutable=True,
1891     ):
1892         """Decode the data
1893
1894         :param data: either binary or memoryview
1895         :param int offset: initial data's offset
1896         :param bool leavemm: do we need to leave memoryview of remaining
1897                     data as is, or convert it to bytes otherwise
1898         :param decode_path: current decode path (tuples of strings,
1899                             possibly with DecodePathDefBy) with will be
1900                             the root for all underlying objects
1901         :param ctx: optional :ref:`context <ctx>` governing decoding process
1902         :param bool tag_only: decode only the tag, without length and
1903                               contents (used only in Choice and Set
1904                               structures, trying to determine if tag satisfies
1905                               the schema)
1906         :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1907                                     before using it?
1908         :returns: (Obj, remaining data)
1909
1910         .. seealso:: :ref:`decoding`
1911         """
1912         result = next(self.decode_evgen(
1913             data,
1914             offset,
1915             leavemm,
1916             decode_path,
1917             ctx,
1918             tag_only,
1919             _ctx_immutable,
1920             _evgen_mode=False,
1921         ))
1922         if result is None:
1923             return None
1924         _, obj, tail = result
1925         return obj, tail
1926
1927     def decode_evgen(
1928             self,
1929             data,
1930             offset=0,
1931             leavemm=False,
1932             decode_path=(),
1933             ctx=None,
1934             tag_only=False,
1935             _ctx_immutable=True,
1936             _evgen_mode=True,
1937     ):
1938         """Decode with evgen mode on
1939
1940         That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1941         it returns the generator producing ``(decode_path, obj, tail)``
1942         values.
1943         .. seealso:: :ref:`evgen mode <evgen_mode>`.
1944         """
1945         if ctx is None:
1946             ctx = {}
1947         elif _ctx_immutable:
1948             ctx = copy(ctx)
1949         tlv = memoryview(data)
1950         if (
1951                 _evgen_mode and
1952                 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1953         ):
1954             _evgen_mode = False
1955         if self._expl is None:
1956             for result in self._decode(
1957                     tlv,
1958                     offset=offset,
1959                     decode_path=decode_path,
1960                     ctx=ctx,
1961                     tag_only=tag_only,
1962                     evgen_mode=_evgen_mode,
1963             ):
1964                 if tag_only:
1965                     yield None
1966                     return
1967                 _decode_path, obj, tail = result
1968                 if _decode_path is not decode_path:
1969                     yield result
1970         else:
1971             try:
1972                 t, tlen, lv = tag_strip(tlv)
1973             except DecodeError as err:
1974                 raise err.__class__(
1975                     msg=err.msg,
1976                     klass=self.__class__,
1977                     decode_path=decode_path,
1978                     offset=offset,
1979                 )
1980             if t != self._expl:
1981                 raise TagMismatch(
1982                     klass=self.__class__,
1983                     decode_path=decode_path,
1984                     offset=offset,
1985                 )
1986             try:
1987                 l, llen, v = len_decode(lv)
1988             except LenIndefForm as err:
1989                 if not ctx.get("bered", False):
1990                     raise err.__class__(
1991                         msg=err.msg,
1992                         klass=self.__class__,
1993                         decode_path=decode_path,
1994                         offset=offset,
1995                     )
1996                 llen, v = 1, lv[1:]
1997                 offset += tlen + llen
1998                 for result in self._decode(
1999                         v,
2000                         offset=offset,
2001                         decode_path=decode_path,
2002                         ctx=ctx,
2003                         tag_only=tag_only,
2004                         evgen_mode=_evgen_mode,
2005                 ):
2006                     if tag_only:  # pragma: no cover
2007                         yield None
2008                         return
2009                     _decode_path, obj, tail = result
2010                     if _decode_path is not decode_path:
2011                         yield result
2012                 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2013                 if eoc_expected.tobytes() != EOC:
2014                     raise DecodeError(
2015                         "no EOC",
2016                         klass=self.__class__,
2017                         decode_path=decode_path,
2018                         offset=offset,
2019                     )
2020                 obj.vlen += EOC_LEN
2021                 obj.expl_lenindef = True
2022             except DecodeError as err:
2023                 raise err.__class__(
2024                     msg=err.msg,
2025                     klass=self.__class__,
2026                     decode_path=decode_path,
2027                     offset=offset,
2028                 )
2029             else:
2030                 if l > len(v):
2031                     raise NotEnoughData(
2032                         "encoded length is longer than data",
2033                         klass=self.__class__,
2034                         decode_path=decode_path,
2035                         offset=offset,
2036                     )
2037                 for result in self._decode(
2038                         v,
2039                         offset=offset + tlen + llen,
2040                         decode_path=decode_path,
2041                         ctx=ctx,
2042                         tag_only=tag_only,
2043                         evgen_mode=_evgen_mode,
2044                 ):
2045                     if tag_only:  # pragma: no cover
2046                         yield None
2047                         return
2048                     _decode_path, obj, tail = result
2049                     if _decode_path is not decode_path:
2050                         yield result
2051                 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2052                     raise DecodeError(
2053                         "explicit tag out-of-bound, longer than data",
2054                         klass=self.__class__,
2055                         decode_path=decode_path,
2056                         offset=offset,
2057                     )
2058         yield decode_path, obj, (tail if leavemm else tail.tobytes())
2059
2060     def decod(self, data, offset=0, decode_path=(), ctx=None):
2061         """Decode the data, check that tail is empty
2062
2063         :raises ExceedingData: if tail is not empty
2064
2065         This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2066         (decode without tail) that also checks that there is no
2067         trailing data left.
2068         """
2069         obj, tail = self.decode(
2070             data,
2071             offset=offset,
2072             decode_path=decode_path,
2073             ctx=ctx,
2074             leavemm=True,
2075         )
2076         if len(tail) > 0:
2077             raise ExceedingData(len(tail))
2078         return obj
2079
2080     def hexdecode(self, data, *args, **kwargs):
2081         """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2082         """
2083         return self.decode(hexdec(data), *args, **kwargs)
2084
2085     def hexdecod(self, data, *args, **kwargs):
2086         """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2087         """
2088         return self.decod(hexdec(data), *args, **kwargs)
2089
2090     @property
2091     def expled(self):
2092         """.. seealso:: :ref:`decoding`
2093         """
2094         return self._expl is not None
2095
2096     @property
2097     def expl_tag(self):
2098         """.. seealso:: :ref:`decoding`
2099         """
2100         return self._expl
2101
2102     @property
2103     def expl_tlen(self):
2104         """.. seealso:: :ref:`decoding`
2105         """
2106         return len(self._expl)
2107
2108     @property
2109     def expl_llen(self):
2110         """.. seealso:: :ref:`decoding`
2111         """
2112         if self.expl_lenindef:
2113             return 1
2114         return len(len_encode(self.tlvlen))
2115
2116     @property
2117     def expl_offset(self):
2118         """.. seealso:: :ref:`decoding`
2119         """
2120         return self.offset - self.expl_tlen - self.expl_llen
2121
2122     @property
2123     def expl_vlen(self):
2124         """.. seealso:: :ref:`decoding`
2125         """
2126         return self.tlvlen
2127
2128     @property
2129     def expl_tlvlen(self):
2130         """.. seealso:: :ref:`decoding`
2131         """
2132         return self.expl_tlen + self.expl_llen + self.expl_vlen
2133
2134     @property
2135     def fulloffset(self):
2136         """.. seealso:: :ref:`decoding`
2137         """
2138         return self.expl_offset if self.expled else self.offset
2139
2140     @property
2141     def fulllen(self):
2142         """.. seealso:: :ref:`decoding`
2143         """
2144         return self.expl_tlvlen if self.expled else self.tlvlen
2145
2146     def pps_lenindef(self, decode_path):
2147         if self.lenindef and not (
2148                 getattr(self, "defined", None) is not None and
2149                 self.defined[1].lenindef
2150         ):
2151             yield _pp(
2152                 asn1_type_name="EOC",
2153                 obj_name="",
2154                 decode_path=decode_path,
2155                 offset=(
2156                     self.offset + self.tlvlen -
2157                     (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2158                 ),
2159                 tlen=1,
2160                 llen=1,
2161                 vlen=0,
2162                 ber_encoded=True,
2163                 bered=True,
2164             )
2165         if self.expl_lenindef:
2166             yield _pp(
2167                 asn1_type_name="EOC",
2168                 obj_name="EXPLICIT",
2169                 decode_path=decode_path,
2170                 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2171                 tlen=1,
2172                 llen=1,
2173                 vlen=0,
2174                 ber_encoded=True,
2175                 bered=True,
2176             )
2177
2178
2179 def encode_cer(obj):
2180     """Encode to CER in memory buffer
2181
2182     :returns bytes: memory buffer contents
2183     """
2184     buf = BytesIO()
2185     obj.encode_cer(buf.write)
2186     return buf.getvalue()
2187
2188
2189 def encode2pass(obj):
2190     """Encode (2-pass mode) to DER in memory buffer
2191
2192     :returns bytes: memory buffer contents
2193     """
2194     buf = BytesIO()
2195     _, state = obj.encode1st()
2196     obj.encode2nd(buf.write, iter(state))
2197     return buf.getvalue()
2198
2199
2200 class DecodePathDefBy(object):
2201     """DEFINED BY representation inside decode path
2202     """
2203     __slots__ = ("defined_by",)
2204
2205     def __init__(self, defined_by):
2206         self.defined_by = defined_by
2207
2208     def __ne__(self, their):
2209         return not(self == their)
2210
2211     def __eq__(self, their):
2212         if not isinstance(their, self.__class__):
2213             return False
2214         return self.defined_by == their.defined_by
2215
2216     def __str__(self):
2217         return "DEFINED BY " + str(self.defined_by)
2218
2219     def __repr__(self):
2220         return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2221
2222
2223 ########################################################################
2224 # Pretty printing
2225 ########################################################################
2226
2227 PP = namedtuple("PP", (
2228     "obj",
2229     "asn1_type_name",
2230     "obj_name",
2231     "decode_path",
2232     "value",
2233     "blob",
2234     "optional",
2235     "default",
2236     "impl",
2237     "expl",
2238     "offset",
2239     "tlen",
2240     "llen",
2241     "vlen",
2242     "expl_offset",
2243     "expl_tlen",
2244     "expl_llen",
2245     "expl_vlen",
2246     "expl_lenindef",
2247     "lenindef",
2248     "ber_encoded",
2249     "bered",
2250 ), **NAMEDTUPLE_KWARGS)
2251
2252
2253 def _pp(
2254         obj=None,
2255         asn1_type_name="unknown",
2256         obj_name="unknown",
2257         decode_path=(),
2258         value=None,
2259         blob=None,
2260         optional=False,
2261         default=False,
2262         impl=None,
2263         expl=None,
2264         offset=0,
2265         tlen=0,
2266         llen=0,
2267         vlen=0,
2268         expl_offset=None,
2269         expl_tlen=None,
2270         expl_llen=None,
2271         expl_vlen=None,
2272         expl_lenindef=False,
2273         lenindef=False,
2274         ber_encoded=False,
2275         bered=False,
2276 ):
2277     return PP(
2278         obj,
2279         asn1_type_name,
2280         obj_name,
2281         decode_path,
2282         value,
2283         blob,
2284         optional,
2285         default,
2286         impl,
2287         expl,
2288         offset,
2289         tlen,
2290         llen,
2291         vlen,
2292         expl_offset,
2293         expl_tlen,
2294         expl_llen,
2295         expl_vlen,
2296         expl_lenindef,
2297         lenindef,
2298         ber_encoded,
2299         bered,
2300     )
2301
2302
2303 def _colourize(what, colour, with_colours, attrs=("bold",)):
2304     return colored(what, colour, attrs=attrs) if with_colours else what
2305
2306
2307 def colonize_hex(hexed):
2308     """Separate hexadecimal string with colons
2309     """
2310     return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2311
2312
2313 def find_oid_name(asn1_type_name, oid_maps, value):
2314     if len(oid_maps) > 0 and asn1_type_name == ObjectIdentifier.asn1_type_name:
2315         for oid_map in oid_maps:
2316             oid_name = oid_map.get(value)
2317             if oid_name is not None:
2318                 return oid_name
2319     return None
2320
2321
2322 def pp_console_row(
2323         pp,
2324         oid_maps=(),
2325         with_offsets=False,
2326         with_blob=True,
2327         with_colours=False,
2328         with_decode_path=False,
2329         decode_path_len_decrease=0,
2330 ):
2331     cols = []
2332     if with_offsets:
2333         col = "%5d%s%s" % (
2334             pp.offset,
2335             (
2336                 "  " if pp.expl_offset is None else
2337                 ("-%d" % (pp.offset - pp.expl_offset))
2338             ),
2339             LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2340         )
2341         col = _colourize(col, "red", with_colours, ())
2342         col += _colourize("B", "red", with_colours) if pp.bered else " "
2343         cols.append(col)
2344         col = "[%d,%d,%4d]%s" % (
2345             pp.tlen, pp.llen, pp.vlen,
2346             LENINDEF_PP_CHAR if pp.lenindef else " "
2347         )
2348         col = _colourize(col, "green", with_colours, ())
2349         cols.append(col)
2350     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2351     if decode_path_len > 0:
2352         cols.append(" ." * decode_path_len)
2353         ent = pp.decode_path[-1]
2354         if isinstance(ent, DecodePathDefBy):
2355             cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2356             value = str(ent.defined_by)
2357             oid_name = find_oid_name(ent.defined_by.asn1_type_name, oid_maps, value)
2358             if oid_name is None:
2359                 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2360             else:
2361                 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2362         else:
2363             cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2364     if pp.expl is not None:
2365         klass, _, num = pp.expl
2366         col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2367         cols.append(_colourize(col, "blue", with_colours))
2368     if pp.impl is not None:
2369         klass, _, num = pp.impl
2370         col = "[%s%d]" % (TagClassReprs[klass], num)
2371         cols.append(_colourize(col, "blue", with_colours))
2372     if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2373         cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2374     if pp.ber_encoded:
2375         cols.append(_colourize("BER", "red", with_colours))
2376     cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2377     if pp.value is not None:
2378         value = pp.value
2379         cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2380         oid_name = find_oid_name(pp.asn1_type_name, oid_maps, pp.value)
2381         if oid_name is not None:
2382             cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2383         if pp.asn1_type_name == Integer.asn1_type_name:
2384             cols.append(_colourize(
2385                 "(%s)" % colonize_hex(pp.obj.tohex()), "green", with_colours,
2386             ))
2387     if with_blob:
2388         if pp.blob.__class__ == binary_type:
2389             cols.append(hexenc(pp.blob))
2390         elif pp.blob.__class__ == tuple:
2391             cols.append(", ".join(pp.blob))
2392     if pp.optional:
2393         cols.append(_colourize("OPTIONAL", "red", with_colours))
2394     if pp.default:
2395         cols.append(_colourize("DEFAULT", "red", with_colours))
2396     if with_decode_path:
2397         cols.append(_colourize(
2398             "[%s]" % ":".join(str(p) for p in pp.decode_path),
2399             "grey",
2400             with_colours,
2401         ))
2402     return " ".join(cols)
2403
2404
2405 def pp_console_blob(pp, decode_path_len_decrease=0):
2406     cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2407     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2408     if decode_path_len > 0:
2409         cols.append(" ." * (decode_path_len + 1))
2410     if pp.blob.__class__ == binary_type:
2411         blob = hexenc(pp.blob).upper()
2412         for i in six_xrange(0, len(blob), 32):
2413             chunk = blob[i:i + 32]
2414             yield " ".join(cols + [colonize_hex(chunk)])
2415     elif pp.blob.__class__ == tuple:
2416         yield " ".join(cols + [", ".join(pp.blob)])
2417
2418
2419 def pprint(
2420         obj,
2421         oid_maps=(),
2422         big_blobs=False,
2423         with_colours=False,
2424         with_decode_path=False,
2425         decode_path_only=(),
2426         decode_path=(),
2427 ):
2428     """Pretty print object
2429
2430     :param Obj obj: object you want to pretty print
2431     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
2432                      Its human readable form is printed when OID is met
2433     :param big_blobs: if large binary objects are met (like OctetString
2434                       values), do we need to print them too, on separate
2435                       lines
2436     :param with_colours: colourize output, if ``termcolor`` library
2437                          is available
2438     :param with_decode_path: print decode path
2439     :param decode_path_only: print only that specified decode path
2440     """
2441     def _pprint_pps(pps):
2442         for pp in pps:
2443             if hasattr(pp, "_fields"):
2444                 if (
2445                         decode_path_only != () and
2446                         tuple(
2447                             str(p) for p in pp.decode_path[:len(decode_path_only)]
2448                         ) != decode_path_only
2449                 ):
2450                     continue
2451                 if big_blobs:
2452                     yield pp_console_row(
2453                         pp,
2454                         oid_maps=oid_maps,
2455                         with_offsets=True,
2456                         with_blob=False,
2457                         with_colours=with_colours,
2458                         with_decode_path=with_decode_path,
2459                         decode_path_len_decrease=len(decode_path_only),
2460                     )
2461                     for row in pp_console_blob(
2462                             pp,
2463                             decode_path_len_decrease=len(decode_path_only),
2464                     ):
2465                         yield row
2466                 else:
2467                     yield pp_console_row(
2468                         pp,
2469                         oid_maps=oid_maps,
2470                         with_offsets=True,
2471                         with_blob=True,
2472                         with_colours=with_colours,
2473                         with_decode_path=with_decode_path,
2474                         decode_path_len_decrease=len(decode_path_only),
2475                     )
2476             else:
2477                 for row in _pprint_pps(pp):
2478                     yield row
2479     return "\n".join(_pprint_pps(obj.pps(decode_path)))
2480
2481
2482 ########################################################################
2483 # ASN.1 primitive types
2484 ########################################################################
2485
2486 BooleanState = namedtuple(
2487     "BooleanState",
2488     BasicState._fields + ("value",),
2489     **NAMEDTUPLE_KWARGS
2490 )
2491
2492
2493 class Boolean(Obj):
2494     """``BOOLEAN`` boolean type
2495
2496     >>> b = Boolean(True)
2497     BOOLEAN True
2498     >>> b == Boolean(True)
2499     True
2500     >>> bool(b)
2501     True
2502     """
2503     __slots__ = ()
2504     tag_default = tag_encode(1)
2505     asn1_type_name = "BOOLEAN"
2506
2507     def __init__(
2508             self,
2509             value=None,
2510             impl=None,
2511             expl=None,
2512             default=None,
2513             optional=False,
2514             _decoded=(0, 0, 0),
2515     ):
2516         """
2517         :param value: set the value. Either boolean type, or
2518                       :py:class:`pyderasn.Boolean` object
2519         :param bytes impl: override default tag with ``IMPLICIT`` one
2520         :param bytes expl: override default tag with ``EXPLICIT`` one
2521         :param default: set default value. Type same as in ``value``
2522         :param bool optional: is object ``OPTIONAL`` in sequence
2523         """
2524         super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2525         self._value = None if value is None else self._value_sanitize(value)
2526         if default is not None:
2527             default = self._value_sanitize(default)
2528             self.default = self.__class__(
2529                 value=default,
2530                 impl=self.tag,
2531                 expl=self._expl,
2532             )
2533             if value is None:
2534                 self._value = default
2535
2536     def _value_sanitize(self, value):
2537         if value.__class__ == bool:
2538             return value
2539         if issubclass(value.__class__, Boolean):
2540             return value._value
2541         raise InvalidValueType((self.__class__, bool))
2542
2543     @property
2544     def ready(self):
2545         return self._value is not None
2546
2547     def __getstate__(self):
2548         return BooleanState(
2549             __version__,
2550             self.tag,
2551             self._tag_order,
2552             self._expl,
2553             self.default,
2554             self.optional,
2555             self.offset,
2556             self.llen,
2557             self.vlen,
2558             self.expl_lenindef,
2559             self.lenindef,
2560             self.ber_encoded,
2561             self._value,
2562         )
2563
2564     def __setstate__(self, state):
2565         super(Boolean, self).__setstate__(state)
2566         self._value = state.value
2567
2568     def __nonzero__(self):
2569         self._assert_ready()
2570         return self._value
2571
2572     def __bool__(self):
2573         self._assert_ready()
2574         return self._value
2575
2576     def __eq__(self, their):
2577         if their.__class__ == bool:
2578             return self._value == their
2579         if not issubclass(their.__class__, Boolean):
2580             return False
2581         return (
2582             self._value == their._value and
2583             self.tag == their.tag and
2584             self._expl == their._expl
2585         )
2586
2587     def __call__(
2588             self,
2589             value=None,
2590             impl=None,
2591             expl=None,
2592             default=None,
2593             optional=None,
2594     ):
2595         return self.__class__(
2596             value=value,
2597             impl=self.tag if impl is None else impl,
2598             expl=self._expl if expl is None else expl,
2599             default=self.default if default is None else default,
2600             optional=self.optional if optional is None else optional,
2601         )
2602
2603     def _encode(self):
2604         self._assert_ready()
2605         return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2606
2607     def _encode1st(self, state):
2608         return len(self.tag) + 2, state
2609
2610     def _encode2nd(self, writer, state_iter):
2611         self._assert_ready()
2612         write_full(writer, self._encode())
2613
2614     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2615         try:
2616             t, _, lv = tag_strip(tlv)
2617         except DecodeError as err:
2618             raise err.__class__(
2619                 msg=err.msg,
2620                 klass=self.__class__,
2621                 decode_path=decode_path,
2622                 offset=offset,
2623             )
2624         if t != self.tag:
2625             raise TagMismatch(
2626                 klass=self.__class__,
2627                 decode_path=decode_path,
2628                 offset=offset,
2629             )
2630         if tag_only:
2631             yield None
2632             return
2633         try:
2634             l, _, v = len_decode(lv)
2635         except DecodeError as err:
2636             raise err.__class__(
2637                 msg=err.msg,
2638                 klass=self.__class__,
2639                 decode_path=decode_path,
2640                 offset=offset,
2641             )
2642         if l != 1:
2643             raise InvalidLength(
2644                 "Boolean's length must be equal to 1",
2645                 klass=self.__class__,
2646                 decode_path=decode_path,
2647                 offset=offset,
2648             )
2649         if l > len(v):
2650             raise NotEnoughData(
2651                 "encoded length is longer than data",
2652                 klass=self.__class__,
2653                 decode_path=decode_path,
2654                 offset=offset,
2655             )
2656         first_octet = byte2int(v)
2657         ber_encoded = False
2658         if first_octet == 0:
2659             value = False
2660         elif first_octet == 0xFF:
2661             value = True
2662         elif ctx.get("bered", False):
2663             value = True
2664             ber_encoded = True
2665         else:
2666             raise DecodeError(
2667                 "unacceptable Boolean value",
2668                 klass=self.__class__,
2669                 decode_path=decode_path,
2670                 offset=offset,
2671             )
2672         obj = self.__class__(
2673             value=value,
2674             impl=self.tag,
2675             expl=self._expl,
2676             default=self.default,
2677             optional=self.optional,
2678             _decoded=(offset, 1, 1),
2679         )
2680         obj.ber_encoded = ber_encoded
2681         yield decode_path, obj, v[1:]
2682
2683     def __repr__(self):
2684         return pp_console_row(next(self.pps()))
2685
2686     def pps(self, decode_path=()):
2687         yield _pp(
2688             obj=self,
2689             asn1_type_name=self.asn1_type_name,
2690             obj_name=self.__class__.__name__,
2691             decode_path=decode_path,
2692             value=str(self._value) if self.ready else None,
2693             optional=self.optional,
2694             default=self == self.default,
2695             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2696             expl=None if self._expl is None else tag_decode(self._expl),
2697             offset=self.offset,
2698             tlen=self.tlen,
2699             llen=self.llen,
2700             vlen=self.vlen,
2701             expl_offset=self.expl_offset if self.expled else None,
2702             expl_tlen=self.expl_tlen if self.expled else None,
2703             expl_llen=self.expl_llen if self.expled else None,
2704             expl_vlen=self.expl_vlen if self.expled else None,
2705             expl_lenindef=self.expl_lenindef,
2706             ber_encoded=self.ber_encoded,
2707             bered=self.bered,
2708         )
2709         for pp in self.pps_lenindef(decode_path):
2710             yield pp
2711
2712
2713 IntegerState = namedtuple(
2714     "IntegerState",
2715     BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2716     **NAMEDTUPLE_KWARGS
2717 )
2718
2719
2720 class Integer(Obj):
2721     """``INTEGER`` integer type
2722
2723     >>> b = Integer(-123)
2724     INTEGER -123
2725     >>> b == Integer(-123)
2726     True
2727     >>> int(b)
2728     -123
2729
2730     >>> Integer(2, bounds=(1, 3))
2731     INTEGER 2
2732     >>> Integer(5, bounds=(1, 3))
2733     Traceback (most recent call last):
2734     pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2735
2736     ::
2737
2738         class Version(Integer):
2739             schema = (
2740                 ("v1", 0),
2741                 ("v2", 1),
2742                 ("v3", 2),
2743             )
2744
2745     >>> v = Version("v1")
2746     Version INTEGER v1
2747     >>> int(v)
2748     0
2749     >>> v.named
2750     'v1'
2751     >>> v.specs
2752     {'v3': 2, 'v1': 0, 'v2': 1}
2753     """
2754     __slots__ = ("specs", "_bound_min", "_bound_max")
2755     tag_default = tag_encode(2)
2756     asn1_type_name = "INTEGER"
2757
2758     def __init__(
2759             self,
2760             value=None,
2761             bounds=None,
2762             impl=None,
2763             expl=None,
2764             default=None,
2765             optional=False,
2766             _specs=None,
2767             _decoded=(0, 0, 0),
2768     ):
2769         """
2770         :param value: set the value. Either integer type, named value
2771                       (if ``schema`` is specified in the class), or
2772                       :py:class:`pyderasn.Integer` object
2773         :param bounds: set ``(MIN, MAX)`` value constraint.
2774                        (-inf, +inf) by default
2775         :param bytes impl: override default tag with ``IMPLICIT`` one
2776         :param bytes expl: override default tag with ``EXPLICIT`` one
2777         :param default: set default value. Type same as in ``value``
2778         :param bool optional: is object ``OPTIONAL`` in sequence
2779         """
2780         super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2781         self._value = value
2782         specs = getattr(self, "schema", {}) if _specs is None else _specs
2783         self.specs = specs if specs.__class__ == dict else dict(specs)
2784         self._bound_min, self._bound_max = getattr(
2785             self,
2786             "bounds",
2787             (float("-inf"), float("+inf")),
2788         ) if bounds is None else bounds
2789         if value is not None:
2790             self._value = self._value_sanitize(value)
2791         if default is not None:
2792             default = self._value_sanitize(default)
2793             self.default = self.__class__(
2794                 value=default,
2795                 impl=self.tag,
2796                 expl=self._expl,
2797                 _specs=self.specs,
2798             )
2799             if self._value is None:
2800                 self._value = default
2801
2802     def _value_sanitize(self, value):
2803         if isinstance(value, integer_types):
2804             pass
2805         elif issubclass(value.__class__, Integer):
2806             value = value._value
2807         elif value.__class__ == str:
2808             value = self.specs.get(value)
2809             if value is None:
2810                 raise ObjUnknown("integer value: %s" % value)
2811         else:
2812             raise InvalidValueType((self.__class__, int, str))
2813         if not self._bound_min <= value <= self._bound_max:
2814             raise BoundsError(self._bound_min, value, self._bound_max)
2815         return value
2816
2817     @property
2818     def ready(self):
2819         return self._value is not None
2820
2821     def __getstate__(self):
2822         return IntegerState(
2823             __version__,
2824             self.tag,
2825             self._tag_order,
2826             self._expl,
2827             self.default,
2828             self.optional,
2829             self.offset,
2830             self.llen,
2831             self.vlen,
2832             self.expl_lenindef,
2833             self.lenindef,
2834             self.ber_encoded,
2835             self.specs,
2836             self._value,
2837             self._bound_min,
2838             self._bound_max,
2839         )
2840
2841     def __setstate__(self, state):
2842         super(Integer, self).__setstate__(state)
2843         self.specs = state.specs
2844         self._value = state.value
2845         self._bound_min = state.bound_min
2846         self._bound_max = state.bound_max
2847
2848     def __int__(self):
2849         self._assert_ready()
2850         return int(self._value)
2851
2852     def tohex(self):
2853         """Hexadecimal representation
2854
2855         Use :py:func:`pyderasn.colonize_hex` for colonizing it.
2856         """
2857         hex_repr = hex(int(self))[2:].upper()
2858         if len(hex_repr) % 2 != 0:
2859             hex_repr = "0" + hex_repr
2860         return hex_repr
2861
2862     def __hash__(self):
2863         self._assert_ready()
2864         return hash(b"".join((
2865             self.tag,
2866             bytes(self._expl or b""),
2867             str(self._value).encode("ascii"),
2868         )))
2869
2870     def __eq__(self, their):
2871         if isinstance(their, integer_types):
2872             return self._value == their
2873         if not issubclass(their.__class__, Integer):
2874             return False
2875         return (
2876             self._value == their._value and
2877             self.tag == their.tag and
2878             self._expl == their._expl
2879         )
2880
2881     def __lt__(self, their):
2882         return self._value < their._value
2883
2884     @property
2885     def named(self):
2886         """Return named representation (if exists) of the value
2887         """
2888         for name, value in iteritems(self.specs):
2889             if value == self._value:
2890                 return name
2891         return None
2892
2893     def __call__(
2894             self,
2895             value=None,
2896             bounds=None,
2897             impl=None,
2898             expl=None,
2899             default=None,
2900             optional=None,
2901     ):
2902         return self.__class__(
2903             value=value,
2904             bounds=(
2905                 (self._bound_min, self._bound_max)
2906                 if bounds is None else bounds
2907             ),
2908             impl=self.tag if impl is None else impl,
2909             expl=self._expl if expl is None else expl,
2910             default=self.default if default is None else default,
2911             optional=self.optional if optional is None else optional,
2912             _specs=self.specs,
2913         )
2914
2915     def _encode_payload(self):
2916         self._assert_ready()
2917         value = self._value
2918         if PY2:
2919             if value == 0:
2920                 octets = bytearray([0])
2921             elif value < 0:
2922                 value = -value
2923                 value -= 1
2924                 octets = bytearray()
2925                 while value > 0:
2926                     octets.append((value & 0xFF) ^ 0xFF)
2927                     value >>= 8
2928                 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2929                     octets.append(0xFF)
2930             else:
2931                 octets = bytearray()
2932                 while value > 0:
2933                     octets.append(value & 0xFF)
2934                     value >>= 8
2935                 if octets[-1] & 0x80 > 0:
2936                     octets.append(0x00)
2937             octets.reverse()
2938             octets = bytes(octets)
2939         else:
2940             bytes_len = ceil(value.bit_length() / 8) or 1
2941             while True:
2942                 try:
2943                     octets = value.to_bytes(
2944                         bytes_len,
2945                         byteorder="big",
2946                         signed=True,
2947                     )
2948                 except OverflowError:
2949                     bytes_len += 1
2950                 else:
2951                     break
2952         return octets
2953
2954     def _encode(self):
2955         octets = self._encode_payload()
2956         return b"".join((self.tag, len_encode(len(octets)), octets))
2957
2958     def _encode1st(self, state):
2959         l = len(self._encode_payload())
2960         return len(self.tag) + len_size(l) + l, state
2961
2962     def _encode2nd(self, writer, state_iter):
2963         write_full(writer, self._encode())
2964
2965     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2966         try:
2967             t, _, lv = tag_strip(tlv)
2968         except DecodeError as err:
2969             raise err.__class__(
2970                 msg=err.msg,
2971                 klass=self.__class__,
2972                 decode_path=decode_path,
2973                 offset=offset,
2974             )
2975         if t != self.tag:
2976             raise TagMismatch(
2977                 klass=self.__class__,
2978                 decode_path=decode_path,
2979                 offset=offset,
2980             )
2981         if tag_only:
2982             yield None
2983             return
2984         try:
2985             l, llen, v = len_decode(lv)
2986         except DecodeError as err:
2987             raise err.__class__(
2988                 msg=err.msg,
2989                 klass=self.__class__,
2990                 decode_path=decode_path,
2991                 offset=offset,
2992             )
2993         if l > len(v):
2994             raise NotEnoughData(
2995                 "encoded length is longer than data",
2996                 klass=self.__class__,
2997                 decode_path=decode_path,
2998                 offset=offset,
2999             )
3000         if l == 0:
3001             raise NotEnoughData(
3002                 "zero length",
3003                 klass=self.__class__,
3004                 decode_path=decode_path,
3005                 offset=offset,
3006             )
3007         v, tail = v[:l], v[l:]
3008         first_octet = byte2int(v)
3009         if l > 1:
3010             second_octet = byte2int(v[1:])
3011             if (
3012                     ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3013                     ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3014             ):
3015                 raise DecodeError(
3016                     "non normalized integer",
3017                     klass=self.__class__,
3018                     decode_path=decode_path,
3019                     offset=offset,
3020                 )
3021         if PY2:
3022             value = 0
3023             if first_octet & 0x80 > 0:
3024                 octets = bytearray()
3025                 for octet in bytearray(v):
3026                     octets.append(octet ^ 0xFF)
3027                 for octet in octets:
3028                     value = (value << 8) | octet
3029                 value += 1
3030                 value = -value
3031             else:
3032                 for octet in bytearray(v):
3033                     value = (value << 8) | octet
3034         else:
3035             value = int.from_bytes(v, byteorder="big", signed=True)
3036         try:
3037             obj = self.__class__(
3038                 value=value,
3039                 bounds=(self._bound_min, self._bound_max),
3040                 impl=self.tag,
3041                 expl=self._expl,
3042                 default=self.default,
3043                 optional=self.optional,
3044                 _specs=self.specs,
3045                 _decoded=(offset, llen, l),
3046             )
3047         except BoundsError as err:
3048             raise DecodeError(
3049                 msg=str(err),
3050                 klass=self.__class__,
3051                 decode_path=decode_path,
3052                 offset=offset,
3053             )
3054         yield decode_path, obj, tail
3055
3056     def __repr__(self):
3057         return pp_console_row(next(self.pps()))
3058
3059     def pps(self, decode_path=()):
3060         yield _pp(
3061             obj=self,
3062             asn1_type_name=self.asn1_type_name,
3063             obj_name=self.__class__.__name__,
3064             decode_path=decode_path,
3065             value=(self.named or str(self._value)) if self.ready else None,
3066             optional=self.optional,
3067             default=self == self.default,
3068             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3069             expl=None if self._expl is None else tag_decode(self._expl),
3070             offset=self.offset,
3071             tlen=self.tlen,
3072             llen=self.llen,
3073             vlen=self.vlen,
3074             expl_offset=self.expl_offset if self.expled else None,
3075             expl_tlen=self.expl_tlen if self.expled else None,
3076             expl_llen=self.expl_llen if self.expled else None,
3077             expl_vlen=self.expl_vlen if self.expled else None,
3078             expl_lenindef=self.expl_lenindef,
3079             bered=self.bered,
3080         )
3081         for pp in self.pps_lenindef(decode_path):
3082             yield pp
3083
3084
3085 BitStringState = namedtuple(
3086     "BitStringState",
3087     BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3088     **NAMEDTUPLE_KWARGS
3089 )
3090
3091
3092 class BitString(Obj):
3093     """``BIT STRING`` bit string type
3094
3095     >>> BitString(b"hello world")
3096     BIT STRING 88 bits 68656c6c6f20776f726c64
3097     >>> bytes(b)
3098     b'hello world'
3099     >>> b == b"hello world"
3100     True
3101     >>> b.bit_len
3102     88
3103
3104     >>> BitString("'0A3B5F291CD'H")
3105     BIT STRING 44 bits 0a3b5f291cd0
3106     >>> b = BitString("'010110000000'B")
3107     BIT STRING 12 bits 5800
3108     >>> b.bit_len
3109     12
3110     >>> b[0], b[1], b[2], b[3]
3111     (False, True, False, True)
3112     >>> b[1000]
3113     False
3114     >>> [v for v in b]
3115     [False, True, False, True, True, False, False, False, False, False, False, False]
3116
3117     ::
3118
3119         class KeyUsage(BitString):
3120             schema = (
3121                 ("digitalSignature", 0),
3122                 ("nonRepudiation", 1),
3123                 ("keyEncipherment", 2),
3124             )
3125
3126     >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3127     KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3128     >>> b.named
3129     ['nonRepudiation', 'keyEncipherment']
3130     >>> b.specs
3131     {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3132
3133     .. note::
3134
3135        Pay attention that BIT STRING can be encoded both in primitive
3136        and constructed forms. Decoder always checks constructed form tag
3137        additionally to specified primitive one. If BER decoding is
3138        :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3139        of DER restrictions.
3140     """
3141     __slots__ = ("tag_constructed", "specs", "defined")
3142     tag_default = tag_encode(3)
3143     asn1_type_name = "BIT STRING"
3144
3145     def __init__(
3146             self,
3147             value=None,
3148             impl=None,
3149             expl=None,
3150             default=None,
3151             optional=False,
3152             _specs=None,
3153             _decoded=(0, 0, 0),
3154     ):
3155         """
3156         :param value: set the value. Either binary type, tuple of named
3157                       values (if ``schema`` is specified in the class),
3158                       string in ``'XXX...'B`` form, or
3159                       :py:class:`pyderasn.BitString` object
3160         :param bytes impl: override default tag with ``IMPLICIT`` one
3161         :param bytes expl: override default tag with ``EXPLICIT`` one
3162         :param default: set default value. Type same as in ``value``
3163         :param bool optional: is object ``OPTIONAL`` in sequence
3164         """
3165         super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3166         specs = getattr(self, "schema", {}) if _specs is None else _specs
3167         self.specs = specs if specs.__class__ == dict else dict(specs)
3168         self._value = None if value is None else self._value_sanitize(value)
3169         if default is not None:
3170             default = self._value_sanitize(default)
3171             self.default = self.__class__(
3172                 value=default,
3173                 impl=self.tag,
3174                 expl=self._expl,
3175             )
3176             if value is None:
3177                 self._value = default
3178         self.defined = None
3179         tag_klass, _, tag_num = tag_decode(self.tag)
3180         self.tag_constructed = tag_encode(
3181             klass=tag_klass,
3182             form=TagFormConstructed,
3183             num=tag_num,
3184         )
3185
3186     def _bits2octets(self, bits):
3187         if len(self.specs) > 0:
3188             bits = bits.rstrip("0")
3189         bit_len = len(bits)
3190         bits += "0" * ((8 - (bit_len % 8)) % 8)
3191         octets = bytearray(len(bits) // 8)
3192         for i in six_xrange(len(octets)):
3193             octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3194         return bit_len, bytes(octets)
3195
3196     def _value_sanitize(self, value):
3197         if isinstance(value, (string_types, binary_type)):
3198             if (
3199                     isinstance(value, string_types) and
3200                     value.startswith("'")
3201             ):
3202                 if value.endswith("'B"):
3203                     value = value[1:-2]
3204                     if not frozenset(value) <= SET01:
3205                         raise ValueError("B's coding contains unacceptable chars")
3206                     return self._bits2octets(value)
3207                 if value.endswith("'H"):
3208                     value = value[1:-2]
3209                     return (
3210                         len(value) * 4,
3211                         hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3212                     )
3213             if value.__class__ == binary_type:
3214                 return (len(value) * 8, value)
3215             raise InvalidValueType((self.__class__, string_types, binary_type))
3216         if value.__class__ == tuple:
3217             if (
3218                     len(value) == 2 and
3219                     isinstance(value[0], integer_types) and
3220                     value[1].__class__ == binary_type
3221             ):
3222                 return value
3223             bits = []
3224             for name in value:
3225                 bit = self.specs.get(name)
3226                 if bit is None:
3227                     raise ObjUnknown("BitString value: %s" % name)
3228                 bits.append(bit)
3229             if len(bits) == 0:
3230                 return self._bits2octets("")
3231             bits = frozenset(bits)
3232             return self._bits2octets("".join(
3233                 ("1" if bit in bits else "0")
3234                 for bit in six_xrange(max(bits) + 1)
3235             ))
3236         if issubclass(value.__class__, BitString):
3237             return value._value
3238         raise InvalidValueType((self.__class__, binary_type, string_types))
3239
3240     @property
3241     def ready(self):
3242         return self._value is not None
3243
3244     def __getstate__(self):
3245         return BitStringState(
3246             __version__,
3247             self.tag,
3248             self._tag_order,
3249             self._expl,
3250             self.default,
3251             self.optional,
3252             self.offset,
3253             self.llen,
3254             self.vlen,
3255             self.expl_lenindef,
3256             self.lenindef,
3257             self.ber_encoded,
3258             self.specs,
3259             self._value,
3260             self.tag_constructed,
3261             self.defined,
3262         )
3263
3264     def __setstate__(self, state):
3265         super(BitString, self).__setstate__(state)
3266         self.specs = state.specs
3267         self._value = state.value
3268         self.tag_constructed = state.tag_constructed
3269         self.defined = state.defined
3270
3271     def __iter__(self):
3272         self._assert_ready()
3273         for i in six_xrange(self._value[0]):
3274             yield self[i]
3275
3276     @property
3277     def bit_len(self):
3278         """Returns number of bits in the string
3279         """
3280         self._assert_ready()
3281         return self._value[0]
3282
3283     def __bytes__(self):
3284         self._assert_ready()
3285         return self._value[1]
3286
3287     def __eq__(self, their):
3288         if their.__class__ == bytes:
3289             return self._value[1] == their
3290         if not issubclass(their.__class__, BitString):
3291             return False
3292         return (
3293             self._value == their._value and
3294             self.tag == their.tag and
3295             self._expl == their._expl
3296         )
3297
3298     @property
3299     def named(self):
3300         """Named representation (if exists) of the bits
3301
3302         :returns: [str(name), ...]
3303         """
3304         return [name for name, bit in iteritems(self.specs) if self[bit]]
3305
3306     def __call__(
3307             self,
3308             value=None,
3309             impl=None,
3310             expl=None,
3311             default=None,
3312             optional=None,
3313     ):
3314         return self.__class__(
3315             value=value,
3316             impl=self.tag if impl is None else impl,
3317             expl=self._expl if expl is None else expl,
3318             default=self.default if default is None else default,
3319             optional=self.optional if optional is None else optional,
3320             _specs=self.specs,
3321         )
3322
3323     def __getitem__(self, key):
3324         if key.__class__ == int:
3325             bit_len, octets = self._value
3326             if key >= bit_len:
3327                 return False
3328             return (
3329                 byte2int(memoryview(octets)[key // 8:]) >>
3330                 (7 - (key % 8))
3331             ) & 1 == 1
3332         if isinstance(key, string_types):
3333             value = self.specs.get(key)
3334             if value is None:
3335                 raise ObjUnknown("BitString value: %s" % key)
3336             return self[value]
3337         raise InvalidValueType((int, str))
3338
3339     def _encode(self):
3340         self._assert_ready()
3341         bit_len, octets = self._value
3342         return b"".join((
3343             self.tag,
3344             len_encode(len(octets) + 1),
3345             int2byte((8 - bit_len % 8) % 8),
3346             octets,
3347         ))
3348
3349     def _encode1st(self, state):
3350         self._assert_ready()
3351         _, octets = self._value
3352         l = len(octets) + 1
3353         return len(self.tag) + len_size(l) + l, state
3354
3355     def _encode2nd(self, writer, state_iter):
3356         bit_len, octets = self._value
3357         write_full(writer, b"".join((
3358             self.tag,
3359             len_encode(len(octets) + 1),
3360             int2byte((8 - bit_len % 8) % 8),
3361         )))
3362         write_full(writer, octets)
3363
3364     def _encode_cer(self, writer):
3365         bit_len, octets = self._value
3366         if len(octets) + 1 <= 1000:
3367             write_full(writer, self._encode())
3368             return
3369         write_full(writer, self.tag_constructed)
3370         write_full(writer, LENINDEF)
3371         for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3372             write_full(writer, b"".join((
3373                 BitString.tag_default,
3374                 LEN1K,
3375                 int2byte(0),
3376                 octets[offset:offset + 999],
3377             )))
3378         tail = octets[offset + 999:]
3379         if len(tail) > 0:
3380             tail = int2byte((8 - bit_len % 8) % 8) + tail
3381             write_full(writer, b"".join((
3382                 BitString.tag_default,
3383                 len_encode(len(tail)),
3384                 tail,
3385             )))
3386         write_full(writer, EOC)
3387
3388     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3389         try:
3390             t, tlen, lv = tag_strip(tlv)
3391         except DecodeError as err:
3392             raise err.__class__(
3393                 msg=err.msg,
3394                 klass=self.__class__,
3395                 decode_path=decode_path,
3396                 offset=offset,
3397             )
3398         if t == self.tag:
3399             if tag_only:  # pragma: no cover
3400                 yield None
3401                 return
3402             try:
3403                 l, llen, v = len_decode(lv)
3404             except DecodeError as err:
3405                 raise err.__class__(
3406                     msg=err.msg,
3407                     klass=self.__class__,
3408                     decode_path=decode_path,
3409                     offset=offset,
3410                 )
3411             if l > len(v):
3412                 raise NotEnoughData(
3413                     "encoded length is longer than data",
3414                     klass=self.__class__,
3415                     decode_path=decode_path,
3416                     offset=offset,
3417                 )
3418             if l == 0:
3419                 raise NotEnoughData(
3420                     "zero length",
3421                     klass=self.__class__,
3422                     decode_path=decode_path,
3423                     offset=offset,
3424                 )
3425             pad_size = byte2int(v)
3426             if l == 1 and pad_size != 0:
3427                 raise DecodeError(
3428                     "invalid empty value",
3429                     klass=self.__class__,
3430                     decode_path=decode_path,
3431                     offset=offset,
3432                 )
3433             if pad_size > 7:
3434                 raise DecodeError(
3435                     "too big pad",
3436                     klass=self.__class__,
3437                     decode_path=decode_path,
3438                     offset=offset,
3439                 )
3440             if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3441                 raise DecodeError(
3442                     "invalid pad",
3443                     klass=self.__class__,
3444                     decode_path=decode_path,
3445                     offset=offset,
3446                 )
3447             v, tail = v[:l], v[l:]
3448             bit_len = (len(v) - 1) * 8 - pad_size
3449             obj = self.__class__(
3450                 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3451                 impl=self.tag,
3452                 expl=self._expl,
3453                 default=self.default,
3454                 optional=self.optional,
3455                 _specs=self.specs,
3456                 _decoded=(offset, llen, l),
3457             )
3458             if evgen_mode:
3459                 obj._value = (bit_len, None)
3460             yield decode_path, obj, tail
3461             return
3462         if t != self.tag_constructed:
3463             raise TagMismatch(
3464                 klass=self.__class__,
3465                 decode_path=decode_path,
3466                 offset=offset,
3467             )
3468         if not ctx.get("bered", False):
3469             raise DecodeError(
3470                 "unallowed BER constructed encoding",
3471                 klass=self.__class__,
3472                 decode_path=decode_path,
3473                 offset=offset,
3474             )
3475         if tag_only:  # pragma: no cover
3476             yield None
3477             return
3478         lenindef = False
3479         try:
3480             l, llen, v = len_decode(lv)
3481         except LenIndefForm:
3482             llen, l, v = 1, 0, lv[1:]
3483             lenindef = True
3484         except DecodeError as err:
3485             raise err.__class__(
3486                 msg=err.msg,
3487                 klass=self.__class__,
3488                 decode_path=decode_path,
3489                 offset=offset,
3490             )
3491         if l > len(v):
3492             raise NotEnoughData(
3493                 "encoded length is longer than data",
3494                 klass=self.__class__,
3495                 decode_path=decode_path,
3496                 offset=offset,
3497             )
3498         if not lenindef and l == 0:
3499             raise NotEnoughData(
3500                 "zero length",
3501                 klass=self.__class__,
3502                 decode_path=decode_path,
3503                 offset=offset,
3504             )
3505         chunks = []
3506         sub_offset = offset + tlen + llen
3507         vlen = 0
3508         while True:
3509             if lenindef:
3510                 if v[:EOC_LEN].tobytes() == EOC:
3511                     break
3512             else:
3513                 if vlen == l:
3514                     break
3515                 if vlen > l:
3516                     raise DecodeError(
3517                         "chunk out of bounds",
3518                         klass=self.__class__,
3519                         decode_path=decode_path + (str(len(chunks) - 1),),
3520                         offset=chunks[-1].offset,
3521                     )
3522             sub_decode_path = decode_path + (str(len(chunks)),)
3523             try:
3524                 if evgen_mode:
3525                     for _decode_path, chunk, v_tail in BitString().decode_evgen(
3526                             v,
3527                             offset=sub_offset,
3528                             decode_path=sub_decode_path,
3529                             leavemm=True,
3530                             ctx=ctx,
3531                             _ctx_immutable=False,
3532                     ):
3533                         yield _decode_path, chunk, v_tail
3534                 else:
3535                     _, chunk, v_tail = next(BitString().decode_evgen(
3536                         v,
3537                         offset=sub_offset,
3538                         decode_path=sub_decode_path,
3539                         leavemm=True,
3540                         ctx=ctx,
3541                         _ctx_immutable=False,
3542                         _evgen_mode=False,
3543                     ))
3544             except TagMismatch:
3545                 raise DecodeError(
3546                     "expected BitString encoded chunk",
3547                     klass=self.__class__,
3548                     decode_path=sub_decode_path,
3549                     offset=sub_offset,
3550                 )
3551             chunks.append(chunk)
3552             sub_offset += chunk.tlvlen
3553             vlen += chunk.tlvlen
3554             v = v_tail
3555         if len(chunks) == 0:
3556             raise DecodeError(
3557                 "no chunks",
3558                 klass=self.__class__,
3559                 decode_path=decode_path,
3560                 offset=offset,
3561             )
3562         values = []
3563         bit_len = 0
3564         for chunk_i, chunk in enumerate(chunks[:-1]):
3565             if chunk.bit_len % 8 != 0:
3566                 raise DecodeError(
3567                     "BitString chunk is not multiple of 8 bits",
3568                     klass=self.__class__,
3569                     decode_path=decode_path + (str(chunk_i),),
3570                     offset=chunk.offset,
3571                 )
3572             if not evgen_mode:
3573                 values.append(bytes(chunk))
3574             bit_len += chunk.bit_len
3575         chunk_last = chunks[-1]
3576         if not evgen_mode:
3577             values.append(bytes(chunk_last))
3578         bit_len += chunk_last.bit_len
3579         obj = self.__class__(
3580             value=None if evgen_mode else (bit_len, b"".join(values)),
3581             impl=self.tag,
3582             expl=self._expl,
3583             default=self.default,
3584             optional=self.optional,
3585             _specs=self.specs,
3586             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3587         )
3588         if evgen_mode:
3589             obj._value = (bit_len, None)
3590         obj.lenindef = lenindef
3591         obj.ber_encoded = True
3592         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3593
3594     def __repr__(self):
3595         return pp_console_row(next(self.pps()))
3596
3597     def pps(self, decode_path=()):
3598         value = None
3599         blob = None
3600         if self.ready:
3601             bit_len, blob = self._value
3602             value = "%d bits" % bit_len
3603             if len(self.specs) > 0 and blob is not None:
3604                 blob = tuple(self.named)
3605         yield _pp(
3606             obj=self,
3607             asn1_type_name=self.asn1_type_name,
3608             obj_name=self.__class__.__name__,
3609             decode_path=decode_path,
3610             value=value,
3611             blob=blob,
3612             optional=self.optional,
3613             default=self == self.default,
3614             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3615             expl=None if self._expl is None else tag_decode(self._expl),
3616             offset=self.offset,
3617             tlen=self.tlen,
3618             llen=self.llen,
3619             vlen=self.vlen,
3620             expl_offset=self.expl_offset if self.expled else None,
3621             expl_tlen=self.expl_tlen if self.expled else None,
3622             expl_llen=self.expl_llen if self.expled else None,
3623             expl_vlen=self.expl_vlen if self.expled else None,
3624             expl_lenindef=self.expl_lenindef,
3625             lenindef=self.lenindef,
3626             ber_encoded=self.ber_encoded,
3627             bered=self.bered,
3628         )
3629         defined_by, defined = self.defined or (None, None)
3630         if defined_by is not None:
3631             yield defined.pps(
3632                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3633             )
3634         for pp in self.pps_lenindef(decode_path):
3635             yield pp
3636
3637
3638 OctetStringState = namedtuple(
3639     "OctetStringState",
3640     BasicState._fields + (
3641         "value",
3642         "bound_min",
3643         "bound_max",
3644         "tag_constructed",
3645         "defined",
3646     ),
3647     **NAMEDTUPLE_KWARGS
3648 )
3649
3650
3651 class OctetString(Obj):
3652     """``OCTET STRING`` binary string type
3653
3654     >>> s = OctetString(b"hello world")
3655     OCTET STRING 11 bytes 68656c6c6f20776f726c64
3656     >>> s == OctetString(b"hello world")
3657     True
3658     >>> bytes(s)
3659     b'hello world'
3660
3661     >>> OctetString(b"hello", bounds=(4, 4))
3662     Traceback (most recent call last):
3663     pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3664     >>> OctetString(b"hell", bounds=(4, 4))
3665     OCTET STRING 4 bytes 68656c6c
3666
3667     Memoryviews can be used as a values. If memoryview is made on
3668     mmap-ed file, then it does not take storage inside OctetString
3669     itself. In CER encoding mode it will be streamed to the specified
3670     writer, copying 1 KB chunks.
3671     """
3672     __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3673     tag_default = tag_encode(4)
3674     asn1_type_name = "OCTET STRING"
3675     evgen_mode_skip_value = True
3676
3677     def __init__(
3678             self,
3679             value=None,
3680             bounds=None,
3681             impl=None,
3682             expl=None,
3683             default=None,
3684             optional=False,
3685             _decoded=(0, 0, 0),
3686             ctx=None,
3687     ):
3688         """
3689         :param value: set the value. Either binary type, or
3690                       :py:class:`pyderasn.OctetString` object
3691         :param bounds: set ``(MIN, MAX)`` value size constraint.
3692                        (-inf, +inf) by default
3693         :param bytes impl: override default tag with ``IMPLICIT`` one
3694         :param bytes expl: override default tag with ``EXPLICIT`` one
3695         :param default: set default value. Type same as in ``value``
3696         :param bool optional: is object ``OPTIONAL`` in sequence
3697         """
3698         super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3699         self._value = value
3700         self._bound_min, self._bound_max = getattr(
3701             self,
3702             "bounds",
3703             (0, float("+inf")),
3704         ) if bounds is None else bounds
3705         if value is not None:
3706             self._value = self._value_sanitize(value)
3707         if default is not None:
3708             default = self._value_sanitize(default)
3709             self.default = self.__class__(
3710                 value=default,
3711                 impl=self.tag,
3712                 expl=self._expl,
3713             )
3714             if self._value is None:
3715                 self._value = default
3716         self.defined = None
3717         tag_klass, _, tag_num = tag_decode(self.tag)
3718         self.tag_constructed = tag_encode(
3719             klass=tag_klass,
3720             form=TagFormConstructed,
3721             num=tag_num,
3722         )
3723
3724     def _value_sanitize(self, value):
3725         if value.__class__ == binary_type or value.__class__ == memoryview:
3726             pass
3727         elif issubclass(value.__class__, OctetString):
3728             value = value._value
3729         else:
3730             raise InvalidValueType((self.__class__, bytes, memoryview))
3731         if not self._bound_min <= len(value) <= self._bound_max:
3732             raise BoundsError(self._bound_min, len(value), self._bound_max)
3733         return value
3734
3735     @property
3736     def ready(self):
3737         return self._value is not None
3738
3739     def __getstate__(self):
3740         return OctetStringState(
3741             __version__,
3742             self.tag,
3743             self._tag_order,
3744             self._expl,
3745             self.default,
3746             self.optional,
3747             self.offset,
3748             self.llen,
3749             self.vlen,
3750             self.expl_lenindef,
3751             self.lenindef,
3752             self.ber_encoded,
3753             self._value,
3754             self._bound_min,
3755             self._bound_max,
3756             self.tag_constructed,
3757             self.defined,
3758         )
3759
3760     def __setstate__(self, state):
3761         super(OctetString, self).__setstate__(state)
3762         self._value = state.value
3763         self._bound_min = state.bound_min
3764         self._bound_max = state.bound_max
3765         self.tag_constructed = state.tag_constructed
3766         self.defined = state.defined
3767
3768     def __bytes__(self):
3769         self._assert_ready()
3770         return bytes(self._value)
3771
3772     def __eq__(self, their):
3773         if their.__class__ == binary_type:
3774             return self._value == their
3775         if not issubclass(their.__class__, OctetString):
3776             return False
3777         return (
3778             self._value == their._value and
3779             self.tag == their.tag and
3780             self._expl == their._expl
3781         )
3782
3783     def __lt__(self, their):
3784         return self._value < their._value
3785
3786     def __call__(
3787             self,
3788             value=None,
3789             bounds=None,
3790             impl=None,
3791             expl=None,
3792             default=None,
3793             optional=None,
3794     ):
3795         return self.__class__(
3796             value=value,
3797             bounds=(
3798                 (self._bound_min, self._bound_max)
3799                 if bounds is None else bounds
3800             ),
3801             impl=self.tag if impl is None else impl,
3802             expl=self._expl if expl is None else expl,
3803             default=self.default if default is None else default,
3804             optional=self.optional if optional is None else optional,
3805         )
3806
3807     def _encode(self):
3808         self._assert_ready()
3809         return b"".join((
3810             self.tag,
3811             len_encode(len(self._value)),
3812             self._value,
3813         ))
3814
3815     def _encode1st(self, state):
3816         self._assert_ready()
3817         l = len(self._value)
3818         return len(self.tag) + len_size(l) + l, state
3819
3820     def _encode2nd(self, writer, state_iter):
3821         value = self._value
3822         write_full(writer, self.tag + len_encode(len(value)))
3823         write_full(writer, value)
3824
3825     def _encode_cer(self, writer):
3826         octets = self._value
3827         if len(octets) <= 1000:
3828             write_full(writer, self._encode())
3829             return
3830         write_full(writer, self.tag_constructed)
3831         write_full(writer, LENINDEF)
3832         for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3833             write_full(writer, b"".join((
3834                 OctetString.tag_default,
3835                 LEN1K,
3836                 octets[offset:offset + 1000],
3837             )))
3838         tail = octets[offset + 1000:]
3839         if len(tail) > 0:
3840             write_full(writer, b"".join((
3841                 OctetString.tag_default,
3842                 len_encode(len(tail)),
3843                 tail,
3844             )))
3845         write_full(writer, EOC)
3846
3847     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3848         try:
3849             t, tlen, lv = tag_strip(tlv)
3850         except DecodeError as err:
3851             raise err.__class__(
3852                 msg=err.msg,
3853                 klass=self.__class__,
3854                 decode_path=decode_path,
3855                 offset=offset,
3856             )
3857         if t == self.tag:
3858             if tag_only:
3859                 yield None
3860                 return
3861             try:
3862                 l, llen, v = len_decode(lv)
3863             except DecodeError as err:
3864                 raise err.__class__(
3865                     msg=err.msg,
3866                     klass=self.__class__,
3867                     decode_path=decode_path,
3868                     offset=offset,
3869                 )
3870             if l > len(v):
3871                 raise NotEnoughData(
3872                     "encoded length is longer than data",
3873                     klass=self.__class__,
3874                     decode_path=decode_path,
3875                     offset=offset,
3876                 )
3877             v, tail = v[:l], v[l:]
3878             if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3879                 raise DecodeError(
3880                     msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3881                     klass=self.__class__,
3882                     decode_path=decode_path,
3883                     offset=offset,
3884                 )
3885             try:
3886                 obj = self.__class__(
3887                     value=(
3888                         None if (evgen_mode and self.evgen_mode_skip_value)
3889                         else v.tobytes()
3890                     ),
3891                     bounds=(self._bound_min, self._bound_max),
3892                     impl=self.tag,
3893                     expl=self._expl,
3894                     default=self.default,
3895                     optional=self.optional,
3896                     _decoded=(offset, llen, l),
3897                     ctx=ctx,
3898                 )
3899             except DecodeError as err:
3900                 raise DecodeError(
3901                     msg=err.msg,
3902                     klass=self.__class__,
3903                     decode_path=decode_path,
3904                     offset=offset,
3905                 )
3906             except BoundsError as err:
3907                 raise DecodeError(
3908                     msg=str(err),
3909                     klass=self.__class__,
3910                     decode_path=decode_path,
3911                     offset=offset,
3912                 )
3913             yield decode_path, obj, tail
3914             return
3915         if t != self.tag_constructed:
3916             raise TagMismatch(
3917                 klass=self.__class__,
3918                 decode_path=decode_path,
3919                 offset=offset,
3920             )
3921         if not ctx.get("bered", False):
3922             raise DecodeError(
3923                 "unallowed BER constructed encoding",
3924                 klass=self.__class__,
3925                 decode_path=decode_path,
3926                 offset=offset,
3927             )
3928         if tag_only:
3929             yield None
3930             return
3931         lenindef = False
3932         try:
3933             l, llen, v = len_decode(lv)
3934         except LenIndefForm:
3935             llen, l, v = 1, 0, lv[1:]
3936             lenindef = True
3937         except DecodeError as err:
3938             raise err.__class__(
3939                 msg=err.msg,
3940                 klass=self.__class__,
3941                 decode_path=decode_path,
3942                 offset=offset,
3943             )
3944         if l > len(v):
3945             raise NotEnoughData(
3946                 "encoded length is longer than data",
3947                 klass=self.__class__,
3948                 decode_path=decode_path,
3949                 offset=offset,
3950             )
3951         chunks = []
3952         chunks_count = 0
3953         sub_offset = offset + tlen + llen
3954         vlen = 0
3955         payload_len = 0
3956         while True:
3957             if lenindef:
3958                 if v[:EOC_LEN].tobytes() == EOC:
3959                     break
3960             else:
3961                 if vlen == l:
3962                     break
3963                 if vlen > l:
3964                     raise DecodeError(
3965                         "chunk out of bounds",
3966                         klass=self.__class__,
3967                         decode_path=decode_path + (str(len(chunks) - 1),),
3968                         offset=chunks[-1].offset,
3969                     )
3970             try:
3971                 if evgen_mode:
3972                     sub_decode_path = decode_path + (str(chunks_count),)
3973                     for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3974                             v,
3975                             offset=sub_offset,
3976                             decode_path=sub_decode_path,
3977                             leavemm=True,
3978                             ctx=ctx,
3979                             _ctx_immutable=False,
3980                     ):
3981                         yield _decode_path, chunk, v_tail
3982                         if not chunk.ber_encoded:
3983                             payload_len += chunk.vlen
3984                     chunks_count += 1
3985                 else:
3986                     sub_decode_path = decode_path + (str(len(chunks)),)
3987                     _, chunk, v_tail = next(OctetString().decode_evgen(
3988                         v,
3989                         offset=sub_offset,
3990                         decode_path=sub_decode_path,
3991                         leavemm=True,
3992                         ctx=ctx,
3993                         _ctx_immutable=False,
3994                         _evgen_mode=False,
3995                     ))
3996                     chunks.append(chunk)
3997             except TagMismatch:
3998                 raise DecodeError(
3999                     "expected OctetString encoded chunk",
4000                     klass=self.__class__,
4001                     decode_path=sub_decode_path,
4002                     offset=sub_offset,
4003                 )
4004             sub_offset += chunk.tlvlen
4005             vlen += chunk.tlvlen
4006             v = v_tail
4007         if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
4008             raise DecodeError(
4009                 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
4010                 klass=self.__class__,
4011                 decode_path=decode_path,
4012                 offset=offset,
4013             )
4014         try:
4015             obj = self.__class__(
4016                 value=(
4017                     None if evgen_mode else
4018                     b"".join(bytes(chunk) for chunk in chunks)
4019                 ),
4020                 bounds=(self._bound_min, self._bound_max),
4021                 impl=self.tag,
4022                 expl=self._expl,
4023                 default=self.default,
4024                 optional=self.optional,
4025                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4026                 ctx=ctx,
4027             )
4028         except DecodeError as err:
4029             raise DecodeError(
4030                 msg=err.msg,
4031                 klass=self.__class__,
4032                 decode_path=decode_path,
4033                 offset=offset,
4034             )
4035         except BoundsError as err:
4036             raise DecodeError(
4037                 msg=str(err),
4038                 klass=self.__class__,
4039                 decode_path=decode_path,
4040                 offset=offset,
4041             )
4042         obj.lenindef = lenindef
4043         obj.ber_encoded = True
4044         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4045
4046     def __repr__(self):
4047         return pp_console_row(next(self.pps()))
4048
4049     def pps(self, decode_path=()):
4050         yield _pp(
4051             obj=self,
4052             asn1_type_name=self.asn1_type_name,
4053             obj_name=self.__class__.__name__,
4054             decode_path=decode_path,
4055             value=("%d bytes" % len(self._value)) if self.ready else None,
4056             blob=self._value if self.ready else None,
4057             optional=self.optional,
4058             default=self == self.default,
4059             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4060             expl=None if self._expl is None else tag_decode(self._expl),
4061             offset=self.offset,
4062             tlen=self.tlen,
4063             llen=self.llen,
4064             vlen=self.vlen,
4065             expl_offset=self.expl_offset if self.expled else None,
4066             expl_tlen=self.expl_tlen if self.expled else None,
4067             expl_llen=self.expl_llen if self.expled else None,
4068             expl_vlen=self.expl_vlen if self.expled else None,
4069             expl_lenindef=self.expl_lenindef,
4070             lenindef=self.lenindef,
4071             ber_encoded=self.ber_encoded,
4072             bered=self.bered,
4073         )
4074         defined_by, defined = self.defined or (None, None)
4075         if defined_by is not None:
4076             yield defined.pps(
4077                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4078             )
4079         for pp in self.pps_lenindef(decode_path):
4080             yield pp
4081
4082
4083 def agg_octet_string(evgens, decode_path, raw, writer):
4084     """Aggregate constructed string (OctetString and its derivatives)
4085
4086     :param evgens: iterator of generated events
4087     :param decode_path: points to the string we want to decode
4088     :param raw: slicebable (memoryview, bytearray, etc) with
4089                 the data evgens are generated on
4090     :param writer: buffer.write where string is going to be saved
4091     :param writer: where string is going to be saved. Must comply
4092                    with ``io.RawIOBase.write`` behaviour
4093
4094     .. seealso:: :ref:`agg_octet_string`
4095     """
4096     decode_path_len = len(decode_path)
4097     for dp, obj, _ in evgens:
4098         if dp[:decode_path_len] != decode_path:
4099             continue
4100         if not obj.ber_encoded:
4101             write_full(writer, raw[
4102                 obj.offset + obj.tlen + obj.llen:
4103                 obj.offset + obj.tlen + obj.llen + obj.vlen -
4104                 (EOC_LEN if obj.expl_lenindef else 0)
4105             ])
4106         if len(dp) == decode_path_len:
4107             break
4108
4109
4110 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4111
4112
4113 class Null(Obj):
4114     """``NULL`` null object
4115
4116     >>> n = Null()
4117     NULL
4118     >>> n.ready
4119     True
4120     """
4121     __slots__ = ()
4122     tag_default = tag_encode(5)
4123     asn1_type_name = "NULL"
4124
4125     def __init__(
4126             self,
4127             value=None,  # unused, but Sequence passes it
4128             impl=None,
4129             expl=None,
4130             optional=False,
4131             _decoded=(0, 0, 0),
4132     ):
4133         """
4134         :param bytes impl: override default tag with ``IMPLICIT`` one
4135         :param bytes expl: override default tag with ``EXPLICIT`` one
4136         :param bool optional: is object ``OPTIONAL`` in sequence
4137         """
4138         super(Null, self).__init__(impl, expl, None, optional, _decoded)
4139         self.default = None
4140
4141     @property
4142     def ready(self):
4143         return True
4144
4145     def __getstate__(self):
4146         return NullState(
4147             __version__,
4148             self.tag,
4149             self._tag_order,
4150             self._expl,
4151             self.default,
4152             self.optional,
4153             self.offset,
4154             self.llen,
4155             self.vlen,
4156             self.expl_lenindef,
4157             self.lenindef,
4158             self.ber_encoded,
4159         )
4160
4161     def __eq__(self, their):
4162         if not issubclass(their.__class__, Null):
4163             return False
4164         return (
4165             self.tag == their.tag and
4166             self._expl == their._expl
4167         )
4168
4169     def __call__(
4170             self,
4171             value=None,
4172             impl=None,
4173             expl=None,
4174             optional=None,
4175     ):
4176         return self.__class__(
4177             impl=self.tag if impl is None else impl,
4178             expl=self._expl if expl is None else expl,
4179             optional=self.optional if optional is None else optional,
4180         )
4181
4182     def _encode(self):
4183         return self.tag + LEN0
4184
4185     def _encode1st(self, state):
4186         return len(self.tag) + 1, state
4187
4188     def _encode2nd(self, writer, state_iter):
4189         write_full(writer, self.tag + LEN0)
4190
4191     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4192         try:
4193             t, _, lv = tag_strip(tlv)
4194         except DecodeError as err:
4195             raise err.__class__(
4196                 msg=err.msg,
4197                 klass=self.__class__,
4198                 decode_path=decode_path,
4199                 offset=offset,
4200             )
4201         if t != self.tag:
4202             raise TagMismatch(
4203                 klass=self.__class__,
4204                 decode_path=decode_path,
4205                 offset=offset,
4206             )
4207         if tag_only:  # pragma: no cover
4208             yield None
4209             return
4210         try:
4211             l, _, v = len_decode(lv)
4212         except DecodeError as err:
4213             raise err.__class__(
4214                 msg=err.msg,
4215                 klass=self.__class__,
4216                 decode_path=decode_path,
4217                 offset=offset,
4218             )
4219         if l != 0:
4220             raise InvalidLength(
4221                 "Null must have zero length",
4222                 klass=self.__class__,
4223                 decode_path=decode_path,
4224                 offset=offset,
4225             )
4226         obj = self.__class__(
4227             impl=self.tag,
4228             expl=self._expl,
4229             optional=self.optional,
4230             _decoded=(offset, 1, 0),
4231         )
4232         yield decode_path, obj, v
4233
4234     def __repr__(self):
4235         return pp_console_row(next(self.pps()))
4236
4237     def pps(self, decode_path=()):
4238         yield _pp(
4239             obj=self,
4240             asn1_type_name=self.asn1_type_name,
4241             obj_name=self.__class__.__name__,
4242             decode_path=decode_path,
4243             optional=self.optional,
4244             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4245             expl=None if self._expl is None else tag_decode(self._expl),
4246             offset=self.offset,
4247             tlen=self.tlen,
4248             llen=self.llen,
4249             vlen=self.vlen,
4250             expl_offset=self.expl_offset if self.expled else None,
4251             expl_tlen=self.expl_tlen if self.expled else None,
4252             expl_llen=self.expl_llen if self.expled else None,
4253             expl_vlen=self.expl_vlen if self.expled else None,
4254             expl_lenindef=self.expl_lenindef,
4255             bered=self.bered,
4256         )
4257         for pp in self.pps_lenindef(decode_path):
4258             yield pp
4259
4260
4261 ObjectIdentifierState = namedtuple(
4262     "ObjectIdentifierState",
4263     BasicState._fields + ("value", "defines"),
4264     **NAMEDTUPLE_KWARGS
4265 )
4266
4267
4268 class ObjectIdentifier(Obj):
4269     """``OBJECT IDENTIFIER`` OID type
4270
4271     >>> oid = ObjectIdentifier((1, 2, 3))
4272     OBJECT IDENTIFIER 1.2.3
4273     >>> oid == ObjectIdentifier("1.2.3")
4274     True
4275     >>> tuple(oid)
4276     (1, 2, 3)
4277     >>> str(oid)
4278     '1.2.3'
4279     >>> oid + (4, 5) + ObjectIdentifier("1.7")
4280     OBJECT IDENTIFIER 1.2.3.4.5.1.7
4281
4282     >>> str(ObjectIdentifier((3, 1)))
4283     Traceback (most recent call last):
4284     pyderasn.InvalidOID: unacceptable first arc value
4285     """
4286     __slots__ = ("defines",)
4287     tag_default = tag_encode(6)
4288     asn1_type_name = "OBJECT IDENTIFIER"
4289
4290     def __init__(
4291             self,
4292             value=None,
4293             defines=(),
4294             impl=None,
4295             expl=None,
4296             default=None,
4297             optional=False,
4298             _decoded=(0, 0, 0),
4299     ):
4300         """
4301         :param value: set the value. Either tuples of integers,
4302                       string of "."-concatenated integers, or
4303                       :py:class:`pyderasn.ObjectIdentifier` object
4304         :param defines: sequence of tuples. Each tuple has two elements.
4305                         First one is relative to current one decode
4306                         path, aiming to the field defined by that OID.
4307                         Read about relative path in
4308                         :py:func:`pyderasn.abs_decode_path`. Second
4309                         tuple element is ``{OID: pyderasn.Obj()}``
4310                         dictionary, mapping between current OID value
4311                         and structure applied to defined field.
4312
4313                         .. seealso:: :ref:`definedby`
4314
4315         :param bytes impl: override default tag with ``IMPLICIT`` one
4316         :param bytes expl: override default tag with ``EXPLICIT`` one
4317         :param default: set default value. Type same as in ``value``
4318         :param bool optional: is object ``OPTIONAL`` in sequence
4319         """
4320         super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4321         self._value = value
4322         if value is not None:
4323             self._value = self._value_sanitize(value)
4324         if default is not None:
4325             default = self._value_sanitize(default)
4326             self.default = self.__class__(
4327                 value=default,
4328                 impl=self.tag,
4329                 expl=self._expl,
4330             )
4331             if self._value is None:
4332                 self._value = default
4333         self.defines = defines
4334
4335     def __add__(self, their):
4336         if their.__class__ == tuple:
4337             return self.__class__(self._value + array("L", their))
4338         if isinstance(their, self.__class__):
4339             return self.__class__(self._value + their._value)
4340         raise InvalidValueType((self.__class__, tuple))
4341
4342     def _value_sanitize(self, value):
4343         if issubclass(value.__class__, ObjectIdentifier):
4344             return value._value
4345         if isinstance(value, string_types):
4346             try:
4347                 value = array("L", (pureint(arc) for arc in value.split(".")))
4348             except ValueError:
4349                 raise InvalidOID("unacceptable arcs values")
4350         if value.__class__ == tuple:
4351             try:
4352                 value = array("L", value)
4353             except OverflowError as err:
4354                 raise InvalidOID(repr(err))
4355         if value.__class__ is array:
4356             if len(value) < 2:
4357                 raise InvalidOID("less than 2 arcs")
4358             first_arc = value[0]
4359             if first_arc in (0, 1):
4360                 if not (0 <= value[1] <= 39):
4361                     raise InvalidOID("second arc is too wide")
4362             elif first_arc == 2:
4363                 pass
4364             else:
4365                 raise InvalidOID("unacceptable first arc value")
4366             if not all(arc >= 0 for arc in value):
4367                 raise InvalidOID("negative arc value")
4368             return value
4369         raise InvalidValueType((self.__class__, str, tuple))
4370
4371     @property
4372     def ready(self):
4373         return self._value is not None
4374
4375     def __getstate__(self):
4376         return ObjectIdentifierState(
4377             __version__,
4378             self.tag,
4379             self._tag_order,
4380             self._expl,
4381             self.default,
4382             self.optional,
4383             self.offset,
4384             self.llen,
4385             self.vlen,
4386             self.expl_lenindef,
4387             self.lenindef,
4388             self.ber_encoded,
4389             self._value,
4390             self.defines,
4391         )
4392
4393     def __setstate__(self, state):
4394         super(ObjectIdentifier, self).__setstate__(state)
4395         self._value = state.value
4396         self.defines = state.defines
4397
4398     def __iter__(self):
4399         self._assert_ready()
4400         return iter(self._value)
4401
4402     def __str__(self):
4403         return ".".join(str(arc) for arc in self._value or ())
4404
4405     def __hash__(self):
4406         self._assert_ready()
4407         return hash(b"".join((
4408             self.tag,
4409             bytes(self._expl or b""),
4410             str(self._value).encode("ascii"),
4411         )))
4412
4413     def __eq__(self, their):
4414         if their.__class__ == tuple:
4415             return self._value == array("L", their)
4416         if not issubclass(their.__class__, ObjectIdentifier):
4417             return False
4418         return (
4419             self.tag == their.tag and
4420             self._expl == their._expl and
4421             self._value == their._value
4422         )
4423
4424     def __lt__(self, their):
4425         return self._value < their._value
4426
4427     def __call__(
4428             self,
4429             value=None,
4430             defines=None,
4431             impl=None,
4432             expl=None,
4433             default=None,
4434             optional=None,
4435     ):
4436         return self.__class__(
4437             value=value,
4438             defines=self.defines if defines is None else defines,
4439             impl=self.tag if impl is None else impl,
4440             expl=self._expl if expl is None else expl,
4441             default=self.default if default is None else default,
4442             optional=self.optional if optional is None else optional,
4443         )
4444
4445     def _encode_octets(self):
4446         self._assert_ready()
4447         value = self._value
4448         first_value = value[1]
4449         first_arc = value[0]
4450         if first_arc == 0:
4451             pass
4452         elif first_arc == 1:
4453             first_value += 40
4454         elif first_arc == 2:
4455             first_value += 80
4456         else:  # pragma: no cover
4457             raise RuntimeError("invalid arc is stored")
4458         octets = [zero_ended_encode(first_value)]
4459         for arc in value[2:]:
4460             octets.append(zero_ended_encode(arc))
4461         return b"".join(octets)
4462
4463     def _encode(self):
4464         v = self._encode_octets()
4465         return b"".join((self.tag, len_encode(len(v)), v))
4466
4467     def _encode1st(self, state):
4468         l = len(self._encode_octets())
4469         return len(self.tag) + len_size(l) + l, state
4470
4471     def _encode2nd(self, writer, state_iter):
4472         write_full(writer, self._encode())
4473
4474     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4475         try:
4476             t, _, lv = tag_strip(tlv)
4477         except DecodeError as err:
4478             raise err.__class__(
4479                 msg=err.msg,
4480                 klass=self.__class__,
4481                 decode_path=decode_path,
4482                 offset=offset,
4483             )
4484         if t != self.tag:
4485             raise TagMismatch(
4486                 klass=self.__class__,
4487                 decode_path=decode_path,
4488                 offset=offset,
4489             )
4490         if tag_only:  # pragma: no cover
4491             yield None
4492             return
4493         try:
4494             l, llen, v = len_decode(lv)
4495         except DecodeError as err:
4496             raise err.__class__(
4497                 msg=err.msg,
4498                 klass=self.__class__,
4499                 decode_path=decode_path,
4500                 offset=offset,
4501             )
4502         if l > len(v):
4503             raise NotEnoughData(
4504                 "encoded length is longer than data",
4505                 klass=self.__class__,
4506                 decode_path=decode_path,
4507                 offset=offset,
4508             )
4509         if l == 0:
4510             raise NotEnoughData(
4511                 "zero length",
4512                 klass=self.__class__,
4513                 decode_path=decode_path,
4514                 offset=offset,
4515             )
4516         v, tail = v[:l], v[l:]
4517         arcs = array("L")
4518         ber_encoded = False
4519         while len(v) > 0:
4520             i = 0
4521             arc = 0
4522             while True:
4523                 octet = indexbytes(v, i)
4524                 if i == 0 and octet == 0x80:
4525                     if ctx.get("bered", False):
4526                         ber_encoded = True
4527                     else:
4528                         raise DecodeError(
4529                             "non normalized arc encoding",
4530                             klass=self.__class__,
4531                             decode_path=decode_path,
4532                             offset=offset,
4533                         )
4534                 arc = (arc << 7) | (octet & 0x7F)
4535                 if octet & 0x80 == 0:
4536                     try:
4537                         arcs.append(arc)
4538                     except OverflowError:
4539                         raise DecodeError(
4540                             "too huge value for local unsigned long",
4541                             klass=self.__class__,
4542                             decode_path=decode_path,
4543                             offset=offset,
4544                         )
4545                     v = v[i + 1:]
4546                     break
4547                 i += 1
4548                 if i == len(v):
4549                     raise DecodeError(
4550                         "unfinished OID",
4551                         klass=self.__class__,
4552                         decode_path=decode_path,
4553                         offset=offset,
4554                     )
4555         first_arc = 0
4556         second_arc = arcs[0]
4557         if 0 <= second_arc <= 39:
4558             first_arc = 0
4559         elif 40 <= second_arc <= 79:
4560             first_arc = 1
4561             second_arc -= 40
4562         else:
4563             first_arc = 2
4564             second_arc -= 80
4565         obj = self.__class__(
4566             value=array("L", (first_arc, second_arc)) + arcs[1:],
4567             impl=self.tag,
4568             expl=self._expl,
4569             default=self.default,
4570             optional=self.optional,
4571             _decoded=(offset, llen, l),
4572         )
4573         if ber_encoded:
4574             obj.ber_encoded = True
4575         yield decode_path, obj, tail
4576
4577     def __repr__(self):
4578         return pp_console_row(next(self.pps()))
4579
4580     def pps(self, decode_path=()):
4581         yield _pp(
4582             obj=self,
4583             asn1_type_name=self.asn1_type_name,
4584             obj_name=self.__class__.__name__,
4585             decode_path=decode_path,
4586             value=str(self) if self.ready else None,
4587             optional=self.optional,
4588             default=self == self.default,
4589             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4590             expl=None if self._expl is None else tag_decode(self._expl),
4591             offset=self.offset,
4592             tlen=self.tlen,
4593             llen=self.llen,
4594             vlen=self.vlen,
4595             expl_offset=self.expl_offset if self.expled else None,
4596             expl_tlen=self.expl_tlen if self.expled else None,
4597             expl_llen=self.expl_llen if self.expled else None,
4598             expl_vlen=self.expl_vlen if self.expled else None,
4599             expl_lenindef=self.expl_lenindef,
4600             ber_encoded=self.ber_encoded,
4601             bered=self.bered,
4602         )
4603         for pp in self.pps_lenindef(decode_path):
4604             yield pp
4605
4606
4607 class Enumerated(Integer):
4608     """``ENUMERATED`` integer type
4609
4610     This type is identical to :py:class:`pyderasn.Integer`, but requires
4611     schema to be specified and does not accept values missing from it.
4612     """
4613     __slots__ = ()
4614     tag_default = tag_encode(10)
4615     asn1_type_name = "ENUMERATED"
4616
4617     def __init__(
4618             self,
4619             value=None,
4620             impl=None,
4621             expl=None,
4622             default=None,
4623             optional=False,
4624             _specs=None,
4625             _decoded=(0, 0, 0),
4626             bounds=None,  # dummy argument, workability for Integer.decode
4627     ):
4628         super(Enumerated, self).__init__(
4629             value, bounds, impl, expl, default, optional, _specs, _decoded,
4630         )
4631         if len(self.specs) == 0:
4632             raise ValueError("schema must be specified")
4633
4634     def _value_sanitize(self, value):
4635         if isinstance(value, self.__class__):
4636             value = value._value
4637         elif isinstance(value, integer_types):
4638             for _value in itervalues(self.specs):
4639                 if _value == value:
4640                     break
4641             else:
4642                 raise DecodeError(
4643                     "unknown integer value: %s" % value,
4644                     klass=self.__class__,
4645                 )
4646         elif isinstance(value, string_types):
4647             value = self.specs.get(value)
4648             if value is None:
4649                 raise ObjUnknown("integer value: %s" % value)
4650         else:
4651             raise InvalidValueType((self.__class__, int, str))
4652         return value
4653
4654     def __call__(
4655             self,
4656             value=None,
4657             impl=None,
4658             expl=None,
4659             default=None,
4660             optional=None,
4661             _specs=None,
4662     ):
4663         return self.__class__(
4664             value=value,
4665             impl=self.tag if impl is None else impl,
4666             expl=self._expl if expl is None else expl,
4667             default=self.default if default is None else default,
4668             optional=self.optional if optional is None else optional,
4669             _specs=self.specs,
4670         )
4671
4672
4673 def escape_control_unicode(c):
4674     if unicat(c)[0] == "C":
4675         c = repr(c).lstrip("u").strip("'")
4676     return c
4677
4678
4679 class CommonString(OctetString):
4680     """Common class for all strings
4681
4682     Everything resembles :py:class:`pyderasn.OctetString`, except
4683     ability to deal with unicode text strings.
4684
4685     >>> hexenc("привет Ð¼Ð¸Ñ€".encode("utf-8"))
4686     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4687     >>> UTF8String("привет Ð¼Ð¸Ñ€") == UTF8String(hexdec("d0...80"))
4688     True
4689     >>> s = UTF8String("привет Ð¼Ð¸Ñ€")
4690     UTF8String UTF8String Ð¿Ñ€Ð¸Ð²ÐµÑ‚ Ð¼Ð¸Ñ€
4691     >>> str(s)
4692     'привет Ð¼Ð¸Ñ€'
4693     >>> hexenc(bytes(s))
4694     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4695
4696     >>> PrintableString("привет Ð¼Ð¸Ñ€")
4697     Traceback (most recent call last):
4698     pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4699
4700     >>> BMPString("ада", bounds=(2, 2))
4701     Traceback (most recent call last):
4702     pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4703     >>> s = BMPString("ад", bounds=(2, 2))
4704     >>> s.encoding
4705     'utf-16-be'
4706     >>> hexenc(bytes(s))
4707     '04300434'
4708
4709     .. list-table::
4710        :header-rows: 1
4711
4712        * - Class
4713          - Text Encoding
4714        * - :py:class:`pyderasn.UTF8String`
4715          - utf-8
4716        * - :py:class:`pyderasn.NumericString`
4717          - ascii
4718        * - :py:class:`pyderasn.PrintableString`
4719          - ascii
4720        * - :py:class:`pyderasn.TeletexString`
4721          - ascii
4722        * - :py:class:`pyderasn.T61String`
4723          - ascii
4724        * - :py:class:`pyderasn.VideotexString`
4725          - iso-8859-1
4726        * - :py:class:`pyderasn.IA5String`
4727          - ascii
4728        * - :py:class:`pyderasn.GraphicString`
4729          - iso-8859-1
4730        * - :py:class:`pyderasn.VisibleString`
4731          - ascii
4732        * - :py:class:`pyderasn.ISO646String`
4733          - ascii
4734        * - :py:class:`pyderasn.GeneralString`
4735          - iso-8859-1
4736        * - :py:class:`pyderasn.UniversalString`
4737          - utf-32-be
4738        * - :py:class:`pyderasn.BMPString`
4739          - utf-16-be
4740     """
4741     __slots__ = ()
4742
4743     def _value_sanitize(self, value):
4744         value_raw = None
4745         value_decoded = None
4746         if isinstance(value, self.__class__):
4747             value_raw = value._value
4748         elif value.__class__ == text_type:
4749             value_decoded = value
4750         elif value.__class__ == binary_type:
4751             value_raw = value
4752         else:
4753             raise InvalidValueType((self.__class__, text_type, binary_type))
4754         try:
4755             value_raw = (
4756                 value_decoded.encode(self.encoding)
4757                 if value_raw is None else value_raw
4758             )
4759             value_decoded = (
4760                 value_raw.decode(self.encoding)
4761                 if value_decoded is None else value_decoded
4762             )
4763         except (UnicodeEncodeError, UnicodeDecodeError) as err:
4764             raise DecodeError(str(err))
4765         if not self._bound_min <= len(value_decoded) <= self._bound_max:
4766             raise BoundsError(
4767                 self._bound_min,
4768                 len(value_decoded),
4769                 self._bound_max,
4770             )
4771         return value_raw
4772
4773     def __eq__(self, their):
4774         if their.__class__ == binary_type:
4775             return self._value == their
4776         if their.__class__ == text_type:
4777             return self._value == their.encode(self.encoding)
4778         if not isinstance(their, self.__class__):
4779             return False
4780         return (
4781             self._value == their._value and
4782             self.tag == their.tag and
4783             self._expl == their._expl
4784         )
4785
4786     def __unicode__(self):
4787         if self.ready:
4788             return self._value.decode(self.encoding)
4789         return text_type(self._value)
4790
4791     def __repr__(self):
4792         return pp_console_row(next(self.pps(no_unicode=PY2)))
4793
4794     def pps(self, decode_path=(), no_unicode=False):
4795         value = None
4796         if self.ready:
4797             value = (
4798                 hexenc(bytes(self)) if no_unicode else
4799                 "".join(escape_control_unicode(c) for c in self.__unicode__())
4800             )
4801         yield _pp(
4802             obj=self,
4803             asn1_type_name=self.asn1_type_name,
4804             obj_name=self.__class__.__name__,
4805             decode_path=decode_path,
4806             value=value,
4807             optional=self.optional,
4808             default=self == self.default,
4809             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4810             expl=None if self._expl is None else tag_decode(self._expl),
4811             offset=self.offset,
4812             tlen=self.tlen,
4813             llen=self.llen,
4814             vlen=self.vlen,
4815             expl_offset=self.expl_offset if self.expled else None,
4816             expl_tlen=self.expl_tlen if self.expled else None,
4817             expl_llen=self.expl_llen if self.expled else None,
4818             expl_vlen=self.expl_vlen if self.expled else None,
4819             expl_lenindef=self.expl_lenindef,
4820             ber_encoded=self.ber_encoded,
4821             bered=self.bered,
4822         )
4823         for pp in self.pps_lenindef(decode_path):
4824             yield pp
4825
4826
4827 class UTF8String(CommonString):
4828     __slots__ = ()
4829     tag_default = tag_encode(12)
4830     encoding = "utf-8"
4831     asn1_type_name = "UTF8String"
4832
4833
4834 class AllowableCharsMixin(object):
4835     @property
4836     def allowable_chars(self):
4837         if PY2:
4838             return self._allowable_chars
4839         return frozenset(six_unichr(c) for c in self._allowable_chars)
4840
4841
4842 class NumericString(AllowableCharsMixin, CommonString):
4843     """Numeric string
4844
4845     Its value is properly sanitized: only ASCII digits with spaces can
4846     be stored.
4847
4848     >>> NumericString().allowable_chars
4849     frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4850     """
4851     __slots__ = ()
4852     tag_default = tag_encode(18)
4853     encoding = "ascii"
4854     asn1_type_name = "NumericString"
4855     _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4856
4857     def _value_sanitize(self, value):
4858         value = super(NumericString, self)._value_sanitize(value)
4859         if not frozenset(value) <= self._allowable_chars:
4860             raise DecodeError("non-numeric value")
4861         return value
4862
4863
4864 PrintableStringState = namedtuple(
4865     "PrintableStringState",
4866     OctetStringState._fields + ("allowable_chars",),
4867     **NAMEDTUPLE_KWARGS
4868 )
4869
4870
4871 class PrintableString(AllowableCharsMixin, CommonString):
4872     """Printable string
4873
4874     Its value is properly sanitized: see X.680 41.4 table 10.
4875
4876     >>> PrintableString().allowable_chars
4877     frozenset([' ', "'", ..., 'z'])
4878     >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4879     PrintableString PrintableString foo*bar
4880     >>> obj.allow_asterisk, obj.allow_ampersand
4881     (True, False)
4882     """
4883     __slots__ = ()
4884     tag_default = tag_encode(19)
4885     encoding = "ascii"
4886     asn1_type_name = "PrintableString"
4887     _allowable_chars = frozenset(
4888         (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4889     )
4890     _asterisk = frozenset("*".encode("ascii"))
4891     _ampersand = frozenset("&".encode("ascii"))
4892
4893     def __init__(
4894             self,
4895             value=None,
4896             bounds=None,
4897             impl=None,
4898             expl=None,
4899             default=None,
4900             optional=False,
4901             _decoded=(0, 0, 0),
4902             ctx=None,
4903             allow_asterisk=False,
4904             allow_ampersand=False,
4905     ):
4906         """
4907         :param allow_asterisk: allow asterisk character
4908         :param allow_ampersand: allow ampersand character
4909         """
4910         if allow_asterisk:
4911             self._allowable_chars |= self._asterisk
4912         if allow_ampersand:
4913             self._allowable_chars |= self._ampersand
4914         super(PrintableString, self).__init__(
4915             value, bounds, impl, expl, default, optional, _decoded, ctx,
4916         )
4917
4918     @property
4919     def allow_asterisk(self):
4920         """Is asterisk character allowed?
4921         """
4922         return self._asterisk <= self._allowable_chars
4923
4924     @property
4925     def allow_ampersand(self):
4926         """Is ampersand character allowed?
4927         """
4928         return self._ampersand <= self._allowable_chars
4929
4930     def _value_sanitize(self, value):
4931         value = super(PrintableString, self)._value_sanitize(value)
4932         if not frozenset(value) <= self._allowable_chars:
4933             raise DecodeError("non-printable value")
4934         return value
4935
4936     def __getstate__(self):
4937         return PrintableStringState(
4938             *super(PrintableString, self).__getstate__(),
4939             **{"allowable_chars": self._allowable_chars}
4940         )
4941
4942     def __setstate__(self, state):
4943         super(PrintableString, self).__setstate__(state)
4944         self._allowable_chars = state.allowable_chars
4945
4946     def __call__(
4947             self,
4948             value=None,
4949             bounds=None,
4950             impl=None,
4951             expl=None,
4952             default=None,
4953             optional=None,
4954     ):
4955         return self.__class__(
4956             value=value,
4957             bounds=(
4958                 (self._bound_min, self._bound_max)
4959                 if bounds is None else bounds
4960             ),
4961             impl=self.tag if impl is None else impl,
4962             expl=self._expl if expl is None else expl,
4963             default=self.default if default is None else default,
4964             optional=self.optional if optional is None else optional,
4965             allow_asterisk=self.allow_asterisk,
4966             allow_ampersand=self.allow_ampersand,
4967         )
4968
4969
4970 class TeletexString(CommonString):
4971     __slots__ = ()
4972     tag_default = tag_encode(20)
4973     encoding = "ascii"
4974     asn1_type_name = "TeletexString"
4975
4976
4977 class T61String(TeletexString):
4978     __slots__ = ()
4979     asn1_type_name = "T61String"
4980
4981
4982 class VideotexString(CommonString):
4983     __slots__ = ()
4984     tag_default = tag_encode(21)
4985     encoding = "iso-8859-1"
4986     asn1_type_name = "VideotexString"
4987
4988
4989 class IA5String(CommonString):
4990     __slots__ = ()
4991     tag_default = tag_encode(22)
4992     encoding = "ascii"
4993     asn1_type_name = "IA5"
4994
4995
4996 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
4997 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
4998 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
4999 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
5000 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
5001 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
5002
5003
5004 class VisibleString(CommonString):
5005     __slots__ = ()
5006     tag_default = tag_encode(26)
5007     encoding = "ascii"
5008     asn1_type_name = "VisibleString"
5009
5010
5011 UTCTimeState = namedtuple(
5012     "UTCTimeState",
5013     OctetStringState._fields + ("ber_raw",),
5014     **NAMEDTUPLE_KWARGS
5015 )
5016
5017
5018 def str_to_time_fractions(value):
5019     v = pureint(value)
5020     year, v = (v // 10**10), (v % 10**10)
5021     month, v = (v // 10**8), (v % 10**8)
5022     day, v = (v // 10**6), (v % 10**6)
5023     hour, v = (v // 10**4), (v % 10**4)
5024     minute, second = (v // 100), (v % 100)
5025     return year, month, day, hour, minute, second
5026
5027
5028 class UTCTime(VisibleString):
5029     """``UTCTime`` datetime type
5030
5031     >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5032     UTCTime UTCTime 2017-09-30T22:07:50
5033     >>> str(t)
5034     '170930220750Z'
5035     >>> bytes(t)
5036     b'170930220750Z'
5037     >>> t.todatetime()
5038     datetime.datetime(2017, 9, 30, 22, 7, 50)
5039     >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5040     datetime.datetime(1957, 9, 30, 22, 7, 50)
5041
5042     If BER encoded value was met, then ``ber_raw`` attribute will hold
5043     its raw representation.
5044
5045     .. warning::
5046
5047        Pay attention that UTCTime can not hold full year, so all years
5048        having < 50 years are treated as 20xx, 19xx otherwise, according
5049        to X.509 recommendation.
5050
5051     .. warning::
5052
5053        No strict validation of UTC offsets are made, but very crude:
5054
5055        * minutes are not exceeding 60
5056        * offset value is not exceeding 14 hours
5057     """
5058     __slots__ = ("ber_raw",)
5059     tag_default = tag_encode(23)
5060     encoding = "ascii"
5061     asn1_type_name = "UTCTime"
5062     evgen_mode_skip_value = False
5063
5064     def __init__(
5065             self,
5066             value=None,
5067             impl=None,
5068             expl=None,
5069             default=None,
5070             optional=False,
5071             _decoded=(0, 0, 0),
5072             bounds=None,  # dummy argument, workability for OctetString.decode
5073             ctx=None,
5074     ):
5075         """
5076         :param value: set the value. Either datetime type, or
5077                       :py:class:`pyderasn.UTCTime` object
5078         :param bytes impl: override default tag with ``IMPLICIT`` one
5079         :param bytes expl: override default tag with ``EXPLICIT`` one
5080         :param default: set default value. Type same as in ``value``
5081         :param bool optional: is object ``OPTIONAL`` in sequence
5082         """
5083         super(UTCTime, self).__init__(
5084             None, None, impl, expl, None, optional, _decoded, ctx,
5085         )
5086         self._value = value
5087         self.ber_raw = None
5088         if value is not None:
5089             self._value, self.ber_raw = self._value_sanitize(value, ctx)
5090             self.ber_encoded = self.ber_raw is not None
5091         if default is not None:
5092             default, _ = self._value_sanitize(default)
5093             self.default = self.__class__(
5094                 value=default,
5095                 impl=self.tag,
5096                 expl=self._expl,
5097             )
5098             if self._value is None:
5099                 self._value = default
5100             optional = True
5101         self.optional = optional
5102
5103     def _strptime_bered(self, value):
5104         year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5105         value = value[10:]
5106         if len(value) == 0:
5107             raise ValueError("no timezone")
5108         year += 2000 if year < 50 else 1900
5109         decoded = datetime(year, month, day, hour, minute)
5110         offset = 0
5111         if value[-1] == "Z":
5112             value = value[:-1]
5113         else:
5114             if len(value) < 5:
5115                 raise ValueError("invalid UTC offset")
5116             if value[-5] == "-":
5117                 sign = -1
5118             elif value[-5] == "+":
5119                 sign = 1
5120             else:
5121                 raise ValueError("invalid UTC offset")
5122             v = pureint(value[-4:])
5123             offset, v = (60 * (v % 100)), v // 100
5124             if offset >= 3600:
5125                 raise ValueError("invalid UTC offset minutes")
5126             offset += 3600 * v
5127             if offset > 14 * 3600:
5128                 raise ValueError("too big UTC offset")
5129             offset *= sign
5130             value = value[:-5]
5131         if len(value) == 0:
5132             return offset, decoded
5133         if len(value) != 2:
5134             raise ValueError("invalid UTC offset seconds")
5135         seconds = pureint(value)
5136         if seconds >= 60:
5137             raise ValueError("invalid seconds value")
5138         return offset, decoded + timedelta(seconds=seconds)
5139
5140     def _strptime(self, value):
5141         # datetime.strptime's format: %y%m%d%H%M%SZ
5142         if len(value) != LEN_YYMMDDHHMMSSZ:
5143             raise ValueError("invalid UTCTime length")
5144         if value[-1] != "Z":
5145             raise ValueError("non UTC timezone")
5146         year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
5147         year += 2000 if year < 50 else 1900
5148         return datetime(year, month, day, hour, minute, second)
5149
5150     def _dt_sanitize(self, value):
5151         if value.year < 1950 or value.year > 2049:
5152             raise ValueError("UTCTime can hold only 1950-2049 years")
5153         return value.replace(microsecond=0)
5154
5155     def _value_sanitize(self, value, ctx=None):
5156         if value.__class__ == binary_type:
5157             try:
5158                 value_decoded = value.decode("ascii")
5159             except (UnicodeEncodeError, UnicodeDecodeError) as err:
5160                 raise DecodeError("invalid UTCTime encoding: %r" % err)
5161             err = None
5162             try:
5163                 return self._strptime(value_decoded), None
5164             except (TypeError, ValueError) as _err:
5165                 err = _err
5166                 if (ctx is not None) and ctx.get("bered", False):
5167                     try:
5168                         offset, _value = self._strptime_bered(value_decoded)
5169                         _value = _value - timedelta(seconds=offset)
5170                         return self._dt_sanitize(_value), value
5171                     except (TypeError, ValueError, OverflowError) as _err:
5172                         err = _err
5173             raise DecodeError(
5174                 "invalid %s format: %r" % (self.asn1_type_name, err),
5175                 klass=self.__class__,
5176             )
5177         if isinstance(value, self.__class__):
5178             return value._value, None
5179         if value.__class__ == datetime:
5180             return self._dt_sanitize(value), None
5181         raise InvalidValueType((self.__class__, datetime))
5182
5183     def _pp_value(self):
5184         if self.ready:
5185             value = self._value.isoformat()
5186             if self.ber_encoded:
5187                 value += " (%s)" % self.ber_raw
5188             return value
5189         return None
5190
5191     def __unicode__(self):
5192         if self.ready:
5193             value = self._value.isoformat()
5194             if self.ber_encoded:
5195                 value += " (%s)" % self.ber_raw
5196             return value
5197         return text_type(self._pp_value())
5198
5199     def __getstate__(self):
5200         return UTCTimeState(
5201             *super(UTCTime, self).__getstate__(),
5202             **{"ber_raw": self.ber_raw}
5203         )
5204
5205     def __setstate__(self, state):
5206         super(UTCTime, self).__setstate__(state)
5207         self.ber_raw = state.ber_raw
5208
5209     def __bytes__(self):
5210         self._assert_ready()
5211         return self._encode_time()
5212
5213     def __eq__(self, their):
5214         if their.__class__ == binary_type:
5215             return self._encode_time() == their
5216         if their.__class__ == datetime:
5217             return self.todatetime() == their
5218         if not isinstance(their, self.__class__):
5219             return False
5220         return (
5221             self._value == their._value and
5222             self.tag == their.tag and
5223             self._expl == their._expl
5224         )
5225
5226     def _encode_time(self):
5227         return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5228
5229     def _encode(self):
5230         self._assert_ready()
5231         return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5232
5233     def _encode1st(self, state):
5234         return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
5235
5236     def _encode2nd(self, writer, state_iter):
5237         self._assert_ready()
5238         write_full(writer, self._encode())
5239
5240     def _encode_cer(self, writer):
5241         write_full(writer, self._encode())
5242
5243     def todatetime(self):
5244         return self._value
5245
5246     def __repr__(self):
5247         return pp_console_row(next(self.pps()))
5248
5249     def pps(self, decode_path=()):
5250         yield _pp(
5251             obj=self,
5252             asn1_type_name=self.asn1_type_name,
5253             obj_name=self.__class__.__name__,
5254             decode_path=decode_path,
5255             value=self._pp_value(),
5256             optional=self.optional,
5257             default=self == self.default,
5258             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5259             expl=None if self._expl is None else tag_decode(self._expl),
5260             offset=self.offset,
5261             tlen=self.tlen,
5262             llen=self.llen,
5263             vlen=self.vlen,
5264             expl_offset=self.expl_offset if self.expled else None,
5265             expl_tlen=self.expl_tlen if self.expled else None,
5266             expl_llen=self.expl_llen if self.expled else None,
5267             expl_vlen=self.expl_vlen if self.expled else None,
5268             expl_lenindef=self.expl_lenindef,
5269             ber_encoded=self.ber_encoded,
5270             bered=self.bered,
5271         )
5272         for pp in self.pps_lenindef(decode_path):
5273             yield pp
5274
5275
5276 class GeneralizedTime(UTCTime):
5277     """``GeneralizedTime`` datetime type
5278
5279     This type is similar to :py:class:`pyderasn.UTCTime`.
5280
5281     >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5282     GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5283     >>> str(t)
5284     '20170930220750.000123Z'
5285     >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5286     GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5287
5288     .. warning::
5289
5290        Only microsecond fractions are supported in DER encoding.
5291        :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5292        higher precision values.
5293
5294     .. warning::
5295
5296        BER encoded data can loss information (accuracy) during decoding
5297        because of float transformations.
5298
5299     .. warning::
5300
5301        Local times (without explicit timezone specification) are treated
5302        as UTC one, no transformations are made.
5303
5304     .. warning::
5305
5306        Zero year is unsupported.
5307     """
5308     __slots__ = ()
5309     tag_default = tag_encode(24)
5310     asn1_type_name = "GeneralizedTime"
5311
5312     def _dt_sanitize(self, value):
5313         return value
5314
5315     def _strptime_bered(self, value):
5316         if len(value) < 4 + 3 * 2:
5317             raise ValueError("invalid GeneralizedTime")
5318         year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5319         decoded = datetime(year, month, day, hour)
5320         offset, value = 0, value[10:]
5321         if len(value) == 0:
5322             return offset, decoded
5323         if value[-1] == "Z":
5324             value = value[:-1]
5325         else:
5326             for char, sign in (("-", -1), ("+", 1)):
5327                 idx = value.rfind(char)
5328                 if idx == -1:
5329                     continue
5330                 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5331                 v = pureint(offset_raw)
5332                 if len(offset_raw) == 4:
5333                     offset, v = (60 * (v % 100)), v // 100
5334                     if offset >= 3600:
5335                         raise ValueError("invalid UTC offset minutes")
5336                 elif len(offset_raw) == 2:
5337                     pass
5338                 else:
5339                     raise ValueError("invalid UTC offset")
5340                 offset += 3600 * v
5341                 if offset > 14 * 3600:
5342                     raise ValueError("too big UTC offset")
5343                 offset *= sign
5344                 break
5345         if len(value) == 0:
5346             return offset, decoded
5347         if value[0] in DECIMAL_SIGNS:
5348             return offset, (
5349                 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5350             )
5351         if len(value) < 2:
5352             raise ValueError("stripped minutes")
5353         decoded += timedelta(seconds=60 * pureint(value[:2]))
5354         value = value[2:]
5355         if len(value) == 0:
5356             return offset, decoded
5357         if value[0] in DECIMAL_SIGNS:
5358             return offset, (
5359                 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5360             )
5361         if len(value) < 2:
5362             raise ValueError("stripped seconds")
5363         decoded += timedelta(seconds=pureint(value[:2]))
5364         value = value[2:]
5365         if len(value) == 0:
5366             return offset, decoded
5367         if value[0] not in DECIMAL_SIGNS:
5368             raise ValueError("invalid format after seconds")
5369         return offset, (
5370             decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5371         )
5372
5373     def _strptime(self, value):
5374         l = len(value)
5375         if l == LEN_YYYYMMDDHHMMSSZ:
5376             # datetime.strptime's format: %Y%m%d%H%M%SZ
5377             if value[-1] != "Z":
5378                 raise ValueError("non UTC timezone")
5379             return datetime(*str_to_time_fractions(value[:-1]))
5380         if l >= LEN_YYYYMMDDHHMMSSDMZ:
5381             # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5382             if value[-1] != "Z":
5383                 raise ValueError("non UTC timezone")
5384             if value[14] != ".":
5385                 raise ValueError("no fractions separator")
5386             us = value[15:-1]
5387             if us[-1] == "0":
5388                 raise ValueError("trailing zero")
5389             us_len = len(us)
5390             if us_len > 6:
5391                 raise ValueError("only microsecond fractions are supported")
5392             us = pureint(us + ("0" * (6 - us_len)))
5393             year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5394             return datetime(year, month, day, hour, minute, second, us)
5395         raise ValueError("invalid GeneralizedTime length")
5396
5397     def _encode_time(self):
5398         value = self._value
5399         encoded = value.strftime("%Y%m%d%H%M%S")
5400         if value.microsecond > 0:
5401             encoded += (".%06d" % value.microsecond).rstrip("0")
5402         return (encoded + "Z").encode("ascii")
5403
5404     def _encode(self):
5405         self._assert_ready()
5406         value = self._value
5407         if value.microsecond > 0:
5408             encoded = self._encode_time()
5409             return b"".join((self.tag, len_encode(len(encoded)), encoded))
5410         return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5411
5412     def _encode1st(self, state):
5413         self._assert_ready()
5414         vlen = len(self._encode_time())
5415         return len(self.tag) + len_size(vlen) + vlen, state
5416
5417     def _encode2nd(self, writer, state_iter):
5418         write_full(writer, self._encode())
5419
5420
5421 class GraphicString(CommonString):
5422     __slots__ = ()
5423     tag_default = tag_encode(25)
5424     encoding = "iso-8859-1"
5425     asn1_type_name = "GraphicString"
5426
5427
5428 class ISO646String(VisibleString):
5429     __slots__ = ()
5430     asn1_type_name = "ISO646String"
5431
5432
5433 class GeneralString(CommonString):
5434     __slots__ = ()
5435     tag_default = tag_encode(27)
5436     encoding = "iso-8859-1"
5437     asn1_type_name = "GeneralString"
5438
5439
5440 class UniversalString(CommonString):
5441     __slots__ = ()
5442     tag_default = tag_encode(28)
5443     encoding = "utf-32-be"
5444     asn1_type_name = "UniversalString"
5445
5446
5447 class BMPString(CommonString):
5448     __slots__ = ()
5449     tag_default = tag_encode(30)
5450     encoding = "utf-16-be"
5451     asn1_type_name = "BMPString"
5452
5453
5454 ChoiceState = namedtuple(
5455     "ChoiceState",
5456     BasicState._fields + ("specs", "value",),
5457     **NAMEDTUPLE_KWARGS
5458 )
5459
5460
5461 class Choice(Obj):
5462     """``CHOICE`` special type
5463
5464     ::
5465
5466         class GeneralName(Choice):
5467             schema = (
5468                 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5469                 ("dNSName", IA5String(impl=tag_ctxp(2))),
5470             )
5471
5472     >>> gn = GeneralName()
5473     GeneralName CHOICE
5474     >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5475     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5476     >>> gn["dNSName"] = IA5String("bar.baz")
5477     GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5478     >>> gn["rfc822Name"]
5479     None
5480     >>> gn["dNSName"]
5481     [2] IA5String IA5 bar.baz
5482     >>> gn.choice
5483     'dNSName'
5484     >>> gn.value == gn["dNSName"]
5485     True
5486     >>> gn.specs
5487     OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5488
5489     >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5490     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5491     """
5492     __slots__ = ("specs",)
5493     tag_default = None
5494     asn1_type_name = "CHOICE"
5495
5496     def __init__(
5497             self,
5498             value=None,
5499             schema=None,
5500             impl=None,
5501             expl=None,
5502             default=None,
5503             optional=False,
5504             _decoded=(0, 0, 0),
5505     ):
5506         """
5507         :param value: set the value. Either ``(choice, value)`` tuple, or
5508                       :py:class:`pyderasn.Choice` object
5509         :param bytes impl: can not be set, do **not** use it
5510         :param bytes expl: override default tag with ``EXPLICIT`` one
5511         :param default: set default value. Type same as in ``value``
5512         :param bool optional: is object ``OPTIONAL`` in sequence
5513         """
5514         if impl is not None:
5515             raise ValueError("no implicit tag allowed for CHOICE")
5516         super(Choice, self).__init__(None, expl, default, optional, _decoded)
5517         if schema is None:
5518             schema = getattr(self, "schema", ())
5519         if len(schema) == 0:
5520             raise ValueError("schema must be specified")
5521         self.specs = (
5522             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5523         )
5524         self._value = None
5525         if value is not None:
5526             self._value = self._value_sanitize(value)
5527         if default is not None:
5528             default_value = self._value_sanitize(default)
5529             default_obj = self.__class__(impl=self.tag, expl=self._expl)
5530             default_obj.specs = self.specs
5531             default_obj._value = default_value
5532             self.default = default_obj
5533             if value is None:
5534                 self._value = copy(default_obj._value)
5535         if self._expl is not None:
5536             tag_class, _, tag_num = tag_decode(self._expl)
5537             self._tag_order = (tag_class, tag_num)
5538
5539     def _value_sanitize(self, value):
5540         if (value.__class__ == tuple) and len(value) == 2:
5541             choice, obj = value
5542             spec = self.specs.get(choice)
5543             if spec is None:
5544                 raise ObjUnknown(choice)
5545             if not isinstance(obj, spec.__class__):
5546                 raise InvalidValueType((spec,))
5547             return (choice, spec(obj))
5548         if isinstance(value, self.__class__):
5549             return value._value
5550         raise InvalidValueType((self.__class__, tuple))
5551
5552     @property
5553     def ready(self):
5554         return self._value is not None and self._value[1].ready
5555
5556     @property
5557     def bered(self):
5558         return self.expl_lenindef or (
5559             (self._value is not None) and
5560             self._value[1].bered
5561         )
5562
5563     def __getstate__(self):
5564         return ChoiceState(
5565             __version__,
5566             self.tag,
5567             self._tag_order,
5568             self._expl,
5569             self.default,
5570             self.optional,
5571             self.offset,
5572             self.llen,
5573             self.vlen,
5574             self.expl_lenindef,
5575             self.lenindef,
5576             self.ber_encoded,
5577             self.specs,
5578             copy(self._value),
5579         )
5580
5581     def __setstate__(self, state):
5582         super(Choice, self).__setstate__(state)
5583         self.specs = state.specs
5584         self._value = state.value
5585
5586     def __eq__(self, their):
5587         if (their.__class__ == tuple) and len(their) == 2:
5588             return self._value == their
5589         if not isinstance(their, self.__class__):
5590             return False
5591         return (
5592             self.specs == their.specs and
5593             self._value == their._value
5594         )
5595
5596     def __call__(
5597             self,
5598             value=None,
5599             expl=None,
5600             default=None,
5601             optional=None,
5602     ):
5603         return self.__class__(
5604             value=value,
5605             schema=self.specs,
5606             expl=self._expl if expl is None else expl,
5607             default=self.default if default is None else default,
5608             optional=self.optional if optional is None else optional,
5609         )
5610
5611     @property
5612     def choice(self):
5613         """Name of the choice
5614         """
5615         self._assert_ready()
5616         return self._value[0]
5617
5618     @property
5619     def value(self):
5620         """Value of underlying choice
5621         """
5622         self._assert_ready()
5623         return self._value[1]
5624
5625     @property
5626     def tag_order(self):
5627         self._assert_ready()
5628         return self._value[1].tag_order if self._tag_order is None else self._tag_order
5629
5630     @property
5631     def tag_order_cer(self):
5632         return min(v.tag_order_cer for v in itervalues(self.specs))
5633
5634     def __getitem__(self, key):
5635         if key not in self.specs:
5636             raise ObjUnknown(key)
5637         if self._value is None:
5638             return None
5639         choice, value = self._value
5640         if choice != key:
5641             return None
5642         return value
5643
5644     def __setitem__(self, key, value):
5645         spec = self.specs.get(key)
5646         if spec is None:
5647             raise ObjUnknown(key)
5648         if not isinstance(value, spec.__class__):
5649             raise InvalidValueType((spec.__class__,))
5650         self._value = (key, spec(value))
5651
5652     @property
5653     def tlen(self):
5654         return 0
5655
5656     @property
5657     def decoded(self):
5658         return self._value[1].decoded if self.ready else False
5659
5660     def _encode(self):
5661         self._assert_ready()
5662         return self._value[1].encode()
5663
5664     def _encode1st(self, state):
5665         self._assert_ready()
5666         return self._value[1].encode1st(state)
5667
5668     def _encode2nd(self, writer, state_iter):
5669         self._value[1].encode2nd(writer, state_iter)
5670
5671     def _encode_cer(self, writer):
5672         self._assert_ready()
5673         self._value[1].encode_cer(writer)
5674
5675     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5676         for choice, spec in iteritems(self.specs):
5677             sub_decode_path = decode_path + (choice,)
5678             try:
5679                 spec.decode(
5680                     tlv,
5681                     offset=offset,
5682                     leavemm=True,
5683                     decode_path=sub_decode_path,
5684                     ctx=ctx,
5685                     tag_only=True,
5686                     _ctx_immutable=False,
5687                 )
5688             except TagMismatch:
5689                 continue
5690             break
5691         else:
5692             raise TagMismatch(
5693                 klass=self.__class__,
5694                 decode_path=decode_path,
5695                 offset=offset,
5696             )
5697         if tag_only:  # pragma: no cover
5698             yield None
5699             return
5700         if evgen_mode:
5701             for _decode_path, value, tail in spec.decode_evgen(
5702                     tlv,
5703                     offset=offset,
5704                     leavemm=True,
5705                     decode_path=sub_decode_path,
5706                     ctx=ctx,
5707                     _ctx_immutable=False,
5708             ):
5709                 yield _decode_path, value, tail
5710         else:
5711             _, value, tail = next(spec.decode_evgen(
5712                 tlv,
5713                 offset=offset,
5714                 leavemm=True,
5715                 decode_path=sub_decode_path,
5716                 ctx=ctx,
5717                 _ctx_immutable=False,
5718                 _evgen_mode=False,
5719             ))
5720         obj = self.__class__(
5721             schema=self.specs,
5722             expl=self._expl,
5723             default=self.default,
5724             optional=self.optional,
5725             _decoded=(offset, 0, value.fulllen),
5726         )
5727         obj._value = (choice, value)
5728         yield decode_path, obj, tail
5729
5730     def __repr__(self):
5731         value = pp_console_row(next(self.pps()))
5732         if self.ready:
5733             value = "%s[%r]" % (value, self.value)
5734         return value
5735
5736     def pps(self, decode_path=()):
5737         yield _pp(
5738             obj=self,
5739             asn1_type_name=self.asn1_type_name,
5740             obj_name=self.__class__.__name__,
5741             decode_path=decode_path,
5742             value=self.choice if self.ready else None,
5743             optional=self.optional,
5744             default=self == self.default,
5745             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5746             expl=None if self._expl is None else tag_decode(self._expl),
5747             offset=self.offset,
5748             tlen=self.tlen,
5749             llen=self.llen,
5750             vlen=self.vlen,
5751             expl_lenindef=self.expl_lenindef,
5752             bered=self.bered,
5753         )
5754         if self.ready:
5755             yield self.value.pps(decode_path=decode_path + (self.choice,))
5756         for pp in self.pps_lenindef(decode_path):
5757             yield pp
5758
5759
5760 class PrimitiveTypes(Choice):
5761     """Predefined ``CHOICE`` for all generic primitive types
5762
5763     It could be useful for general decoding of some unspecified values:
5764
5765     >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5766     OCTET STRING 3 bytes 666f6f
5767     >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5768     INTEGER 1193046
5769     """
5770     __slots__ = ()
5771     schema = tuple((klass.__name__, klass()) for klass in (
5772         Boolean,
5773         Integer,
5774         BitString,
5775         OctetString,
5776         Null,
5777         ObjectIdentifier,
5778         UTF8String,
5779         NumericString,
5780         PrintableString,
5781         TeletexString,
5782         VideotexString,
5783         IA5String,
5784         UTCTime,
5785         GeneralizedTime,
5786         GraphicString,
5787         VisibleString,
5788         ISO646String,
5789         GeneralString,
5790         UniversalString,
5791         BMPString,
5792     ))
5793
5794
5795 AnyState = namedtuple(
5796     "AnyState",
5797     BasicState._fields + ("value", "defined"),
5798     **NAMEDTUPLE_KWARGS
5799 )
5800
5801
5802 class Any(Obj):
5803     """``ANY`` special type
5804
5805     >>> Any(Integer(-123))
5806     ANY INTEGER -123 (0X:7B)
5807     >>> a = Any(OctetString(b"hello world").encode())
5808     ANY 040b68656c6c6f20776f726c64
5809     >>> hexenc(bytes(a))
5810     b'0x040x0bhello world'
5811     """
5812     __slots__ = ("defined",)
5813     tag_default = tag_encode(0)
5814     asn1_type_name = "ANY"
5815
5816     def __init__(
5817             self,
5818             value=None,
5819             expl=None,
5820             optional=False,
5821             _decoded=(0, 0, 0),
5822     ):
5823         """
5824         :param value: set the value. Either any kind of pyderasn's
5825                       **ready** object, or bytes. Pay attention that
5826                       **no** validation is performed if raw binary value
5827                       is valid TLV, except just tag decoding
5828         :param bytes expl: override default tag with ``EXPLICIT`` one
5829         :param bool optional: is object ``OPTIONAL`` in sequence
5830         """
5831         super(Any, self).__init__(None, expl, None, optional, _decoded)
5832         if value is None:
5833             self._value = None
5834         else:
5835             value = self._value_sanitize(value)
5836             self._value = value
5837             if self._expl is None:
5838                 if value.__class__ == binary_type:
5839                     tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5840                 else:
5841                     tag_class, tag_num = value.tag_order
5842             else:
5843                 tag_class, _, tag_num = tag_decode(self._expl)
5844             self._tag_order = (tag_class, tag_num)
5845         self.defined = None
5846
5847     def _value_sanitize(self, value):
5848         if value.__class__ == binary_type:
5849             if len(value) == 0:
5850                 raise ValueError("Any value can not be empty")
5851             return value
5852         if isinstance(value, self.__class__):
5853             return value._value
5854         if not isinstance(value, Obj):
5855             raise InvalidValueType((self.__class__, Obj, binary_type))
5856         return value
5857
5858     @property
5859     def ready(self):
5860         return self._value is not None
5861
5862     @property
5863     def tag_order(self):
5864         self._assert_ready()
5865         return self._tag_order
5866
5867     @property
5868     def bered(self):
5869         if self.expl_lenindef or self.lenindef:
5870             return True
5871         if self.defined is None:
5872             return False
5873         return self.defined[1].bered
5874
5875     def __getstate__(self):
5876         return AnyState(
5877             __version__,
5878             self.tag,
5879             self._tag_order,
5880             self._expl,
5881             None,
5882             self.optional,
5883             self.offset,
5884             self.llen,
5885             self.vlen,
5886             self.expl_lenindef,
5887             self.lenindef,
5888             self.ber_encoded,
5889             self._value,
5890             self.defined,
5891         )
5892
5893     def __setstate__(self, state):
5894         super(Any, self).__setstate__(state)
5895         self._value = state.value
5896         self.defined = state.defined
5897
5898     def __eq__(self, their):
5899         if their.__class__ == binary_type:
5900             if self._value.__class__ == binary_type:
5901                 return self._value == their
5902             return self._value.encode() == their
5903         if issubclass(their.__class__, Any):
5904             if self.ready and their.ready:
5905                 return bytes(self) == bytes(their)
5906             return self.ready == their.ready
5907         return False
5908
5909     def __call__(
5910             self,
5911             value=None,
5912             expl=None,
5913             optional=None,
5914     ):
5915         return self.__class__(
5916             value=value,
5917             expl=self._expl if expl is None else expl,
5918             optional=self.optional if optional is None else optional,
5919         )
5920
5921     def __bytes__(self):
5922         self._assert_ready()
5923         value = self._value
5924         if value.__class__ == binary_type:
5925             return value
5926         return self._value.encode()
5927
5928     @property
5929     def tlen(self):
5930         return 0
5931
5932     def _encode(self):
5933         self._assert_ready()
5934         value = self._value
5935         if value.__class__ == binary_type:
5936             return value
5937         return value.encode()
5938
5939     def _encode1st(self, state):
5940         self._assert_ready()
5941         value = self._value
5942         if value.__class__ == binary_type:
5943             return len(value), state
5944         return value.encode1st(state)
5945
5946     def _encode2nd(self, writer, state_iter):
5947         value = self._value
5948         if value.__class__ == binary_type:
5949             write_full(writer, value)
5950         else:
5951             value.encode2nd(writer, state_iter)
5952
5953     def _encode_cer(self, writer):
5954         self._assert_ready()
5955         value = self._value
5956         if value.__class__ == binary_type:
5957             write_full(writer, value)
5958         else:
5959             value.encode_cer(writer)
5960
5961     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5962         try:
5963             t, tlen, lv = tag_strip(tlv)
5964         except DecodeError as err:
5965             raise err.__class__(
5966                 msg=err.msg,
5967                 klass=self.__class__,
5968                 decode_path=decode_path,
5969                 offset=offset,
5970             )
5971         try:
5972             l, llen, v = len_decode(lv)
5973         except LenIndefForm as err:
5974             if not ctx.get("bered", False):
5975                 raise err.__class__(
5976                     msg=err.msg,
5977                     klass=self.__class__,
5978                     decode_path=decode_path,
5979                     offset=offset,
5980                 )
5981             llen, vlen, v = 1, 0, lv[1:]
5982             sub_offset = offset + tlen + llen
5983             chunk_i = 0
5984             while v[:EOC_LEN].tobytes() != EOC:
5985                 chunk, v = Any().decode(
5986                     v,
5987                     offset=sub_offset,
5988                     decode_path=decode_path + (str(chunk_i),),
5989                     leavemm=True,
5990                     ctx=ctx,
5991                     _ctx_immutable=False,
5992                 )
5993                 vlen += chunk.tlvlen
5994                 sub_offset += chunk.tlvlen
5995                 chunk_i += 1
5996             tlvlen = tlen + llen + vlen + EOC_LEN
5997             obj = self.__class__(
5998                 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
5999                 expl=self._expl,
6000                 optional=self.optional,
6001                 _decoded=(offset, 0, tlvlen),
6002             )
6003             obj.lenindef = True
6004             obj.tag = t.tobytes()
6005             yield decode_path, obj, v[EOC_LEN:]
6006             return
6007         except DecodeError as err:
6008             raise err.__class__(
6009                 msg=err.msg,
6010                 klass=self.__class__,
6011                 decode_path=decode_path,
6012                 offset=offset,
6013             )
6014         if l > len(v):
6015             raise NotEnoughData(
6016                 "encoded length is longer than data",
6017                 klass=self.__class__,
6018                 decode_path=decode_path,
6019                 offset=offset,
6020             )
6021         tlvlen = tlen + llen + l
6022         v, tail = tlv[:tlvlen], v[l:]
6023         obj = self.__class__(
6024             value=None if evgen_mode else v.tobytes(),
6025             expl=self._expl,
6026             optional=self.optional,
6027             _decoded=(offset, 0, tlvlen),
6028         )
6029         obj.tag = t.tobytes()
6030         yield decode_path, obj, tail
6031
6032     def __repr__(self):
6033         return pp_console_row(next(self.pps()))
6034
6035     def pps(self, decode_path=()):
6036         value = self._value
6037         if value is None:
6038             pass
6039         elif value.__class__ == binary_type:
6040             value = None
6041         else:
6042             value = repr(value)
6043         yield _pp(
6044             obj=self,
6045             asn1_type_name=self.asn1_type_name,
6046             obj_name=self.__class__.__name__,
6047             decode_path=decode_path,
6048             value=value,
6049             blob=self._value if self._value.__class__ == binary_type else None,
6050             optional=self.optional,
6051             default=self == self.default,
6052             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6053             expl=None if self._expl is None else tag_decode(self._expl),
6054             offset=self.offset,
6055             tlen=self.tlen,
6056             llen=self.llen,
6057             vlen=self.vlen,
6058             expl_offset=self.expl_offset if self.expled else None,
6059             expl_tlen=self.expl_tlen if self.expled else None,
6060             expl_llen=self.expl_llen if self.expled else None,
6061             expl_vlen=self.expl_vlen if self.expled else None,
6062             expl_lenindef=self.expl_lenindef,
6063             lenindef=self.lenindef,
6064             bered=self.bered,
6065         )
6066         defined_by, defined = self.defined or (None, None)
6067         if defined_by is not None:
6068             yield defined.pps(
6069                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6070             )
6071         for pp in self.pps_lenindef(decode_path):
6072             yield pp
6073
6074
6075 ########################################################################
6076 # ASN.1 constructed types
6077 ########################################################################
6078
6079 def abs_decode_path(decode_path, rel_path):
6080     """Create an absolute decode path from current and relative ones
6081
6082     :param decode_path: current decode path, starting point. Tuple of strings
6083     :param rel_path: relative path to ``decode_path``. Tuple of strings.
6084                      If first tuple's element is "/", then treat it as
6085                      an absolute path, ignoring ``decode_path`` as
6086                      starting point. Also this tuple can contain ".."
6087                      elements, stripping the leading element from
6088                      ``decode_path``
6089
6090     >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
6091     ("foo", "bar", "baz", "whatever")
6092     >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
6093     ("foo", "whatever")
6094     >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
6095     ("baz", "whatever")
6096     """
6097     if rel_path[0] == "/":
6098         return rel_path[1:]
6099     if rel_path[0] == "..":
6100         return abs_decode_path(decode_path[:-1], rel_path[1:])
6101     return decode_path + rel_path
6102
6103
6104 SequenceState = namedtuple(
6105     "SequenceState",
6106     BasicState._fields + ("specs", "value",),
6107     **NAMEDTUPLE_KWARGS
6108 )
6109
6110
6111 class SequenceEncode1stMixing(object):
6112     def _encode1st(self, state):
6113         state.append(0)
6114         idx = len(state) - 1
6115         vlen = 0
6116         for v in self._values_for_encoding():
6117             l, _ = v.encode1st(state)
6118             vlen += l
6119         state[idx] = vlen
6120         return len(self.tag) + len_size(vlen) + vlen, state
6121
6122
6123 class Sequence(SequenceEncode1stMixing, Obj):
6124     """``SEQUENCE`` structure type
6125
6126     You have to make specification of sequence::
6127
6128         class Extension(Sequence):
6129             schema = (
6130                 ("extnID", ObjectIdentifier()),
6131                 ("critical", Boolean(default=False)),
6132                 ("extnValue", OctetString()),
6133             )
6134
6135     Then, you can work with it as with dictionary.
6136
6137     >>> ext = Extension()
6138     >>> Extension().specs
6139     OrderedDict([
6140         ('extnID', OBJECT IDENTIFIER),
6141         ('critical', BOOLEAN False OPTIONAL DEFAULT),
6142         ('extnValue', OCTET STRING),
6143     ])
6144     >>> ext["extnID"] = "1.2.3"
6145     Traceback (most recent call last):
6146     pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6147     >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6148
6149     You can determine if sequence is ready to be encoded:
6150
6151     >>> ext.ready
6152     False
6153     >>> ext.encode()
6154     Traceback (most recent call last):
6155     pyderasn.ObjNotReady: object is not ready: extnValue
6156     >>> ext["extnValue"] = OctetString(b"foobar")
6157     >>> ext.ready
6158     True
6159
6160     Value you want to assign, must have the same **type** as in
6161     corresponding specification, but it can have different tags,
6162     optional/default attributes -- they will be taken from specification
6163     automatically::
6164
6165         class TBSCertificate(Sequence):
6166             schema = (
6167                 ("version", Version(expl=tag_ctxc(0), default="v1")),
6168             [...]
6169
6170     >>> tbs = TBSCertificate()
6171     >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6172
6173     Assign ``None`` to remove value from sequence.
6174
6175     You can set values in Sequence during its initialization:
6176
6177     >>> AlgorithmIdentifier((
6178         ("algorithm", ObjectIdentifier("1.2.3")),
6179         ("parameters", Any(Null()))
6180     ))
6181     AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6182
6183     You can determine if value exists/set in the sequence and take its value:
6184
6185     >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6186     (True, True, False)
6187     >>> ext["extnID"]
6188     OBJECT IDENTIFIER 1.2.3
6189
6190     But pay attention that if value has default, then it won't be (not
6191     in) in the sequence (because ``DEFAULT`` must not be encoded in
6192     DER), but you can read its value:
6193
6194     >>> "critical" in ext, ext["critical"]
6195     (False, BOOLEAN False)
6196     >>> ext["critical"] = Boolean(True)
6197     >>> "critical" in ext, ext["critical"]
6198     (True, BOOLEAN True)
6199
6200     All defaulted values are always optional.
6201
6202     .. _allow_default_values_ctx:
6203
6204     DER prohibits default value encoding and will raise an error if
6205     default value is unexpectedly met during decode.
6206     If :ref:`bered <bered_ctx>` context option is set, then no error
6207     will be raised, but ``bered`` attribute set. You can disable strict
6208     defaulted values existence validation by setting
6209     ``"allow_default_values": True`` :ref:`context <ctx>` option.
6210
6211     All values with DEFAULT specified are decoded atomically in
6212     :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6213     SEQUENCE, then it will be yielded as a single element, not
6214     disassembled. That is required for DEFAULT existence check.
6215
6216     Two sequences are equal if they have equal specification (schema),
6217     implicit/explicit tagging and the same values.
6218     """
6219     __slots__ = ("specs",)
6220     tag_default = tag_encode(form=TagFormConstructed, num=16)
6221     asn1_type_name = "SEQUENCE"
6222
6223     def __init__(
6224             self,
6225             value=None,
6226             schema=None,
6227             impl=None,
6228             expl=None,
6229             default=None,
6230             optional=False,
6231             _decoded=(0, 0, 0),
6232     ):
6233         super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6234         if schema is None:
6235             schema = getattr(self, "schema", ())
6236         self.specs = (
6237             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6238         )
6239         self._value = {}
6240         if value is not None:
6241             if issubclass(value.__class__, Sequence):
6242                 self._value = value._value
6243             elif hasattr(value, "__iter__"):
6244                 for seq_key, seq_value in value:
6245                     self[seq_key] = seq_value
6246             else:
6247                 raise InvalidValueType((Sequence,))
6248         if default is not None:
6249             if not issubclass(default.__class__, Sequence):
6250                 raise InvalidValueType((Sequence,))
6251             default_value = default._value
6252             default_obj = self.__class__(impl=self.tag, expl=self._expl)
6253             default_obj.specs = self.specs
6254             default_obj._value = default_value
6255             self.default = default_obj
6256             if value is None:
6257                 self._value = copy(default_obj._value)
6258
6259     @property
6260     def ready(self):
6261         for name, spec in iteritems(self.specs):
6262             value = self._value.get(name)
6263             if value is None:
6264                 if spec.optional:
6265                     continue
6266                 return False
6267             if not value.ready:
6268                 return False
6269         return True
6270
6271     @property
6272     def bered(self):
6273         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6274             return True
6275         return any(value.bered for value in itervalues(self._value))
6276
6277     def __getstate__(self):
6278         return SequenceState(
6279             __version__,
6280             self.tag,
6281             self._tag_order,
6282             self._expl,
6283             self.default,
6284             self.optional,
6285             self.offset,
6286             self.llen,
6287             self.vlen,
6288             self.expl_lenindef,
6289             self.lenindef,
6290             self.ber_encoded,
6291             self.specs,
6292             {k: copy(v) for k, v in iteritems(self._value)},
6293         )
6294
6295     def __setstate__(self, state):
6296         super(Sequence, self).__setstate__(state)
6297         self.specs = state.specs
6298         self._value = state.value
6299
6300     def __eq__(self, their):
6301         if not isinstance(their, self.__class__):
6302             return False
6303         return (
6304             self.specs == their.specs and
6305             self.tag == their.tag and
6306             self._expl == their._expl and
6307             self._value == their._value
6308         )
6309
6310     def __call__(
6311             self,
6312             value=None,
6313             impl=None,
6314             expl=None,
6315             default=None,
6316             optional=None,
6317     ):
6318         return self.__class__(
6319             value=value,
6320             schema=self.specs,
6321             impl=self.tag if impl is None else impl,
6322             expl=self._expl if expl is None else expl,
6323             default=self.default if default is None else default,
6324             optional=self.optional if optional is None else optional,
6325         )
6326
6327     def __contains__(self, key):
6328         return key in self._value
6329
6330     def __setitem__(self, key, value):
6331         spec = self.specs.get(key)
6332         if spec is None:
6333             raise ObjUnknown(key)
6334         if value is None:
6335             self._value.pop(key, None)
6336             return
6337         if not isinstance(value, spec.__class__):
6338             raise InvalidValueType((spec.__class__,))
6339         value = spec(value=value)
6340         if spec.default is not None and value == spec.default:
6341             self._value.pop(key, None)
6342             return
6343         self._value[key] = value
6344
6345     def __getitem__(self, key):
6346         value = self._value.get(key)
6347         if value is not None:
6348             return value
6349         spec = self.specs.get(key)
6350         if spec is None:
6351             raise ObjUnknown(key)
6352         if spec.default is not None:
6353             return spec.default
6354         return None
6355
6356     def _values_for_encoding(self):
6357         for name, spec in iteritems(self.specs):
6358             value = self._value.get(name)
6359             if value is None:
6360                 if spec.optional:
6361                     continue
6362                 raise ObjNotReady(name)
6363             yield value
6364
6365     def _encode(self):
6366         v = b"".join(v.encode() for v in self._values_for_encoding())
6367         return b"".join((self.tag, len_encode(len(v)), v))
6368
6369     def _encode2nd(self, writer, state_iter):
6370         write_full(writer, self.tag + len_encode(next(state_iter)))
6371         for v in self._values_for_encoding():
6372             v.encode2nd(writer, state_iter)
6373
6374     def _encode_cer(self, writer):
6375         write_full(writer, self.tag + LENINDEF)
6376         for v in self._values_for_encoding():
6377             v.encode_cer(writer)
6378         write_full(writer, EOC)
6379
6380     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6381         try:
6382             t, tlen, lv = tag_strip(tlv)
6383         except DecodeError as err:
6384             raise err.__class__(
6385                 msg=err.msg,
6386                 klass=self.__class__,
6387                 decode_path=decode_path,
6388                 offset=offset,
6389             )
6390         if t != self.tag:
6391             raise TagMismatch(
6392                 klass=self.__class__,
6393                 decode_path=decode_path,
6394                 offset=offset,
6395             )
6396         if tag_only:  # pragma: no cover
6397             yield None
6398             return
6399         lenindef = False
6400         ctx_bered = ctx.get("bered", False)
6401         try:
6402             l, llen, v = len_decode(lv)
6403         except LenIndefForm as err:
6404             if not ctx_bered:
6405                 raise err.__class__(
6406                     msg=err.msg,
6407                     klass=self.__class__,
6408                     decode_path=decode_path,
6409                     offset=offset,
6410                 )
6411             l, llen, v = 0, 1, lv[1:]
6412             lenindef = True
6413         except DecodeError as err:
6414             raise err.__class__(
6415                 msg=err.msg,
6416                 klass=self.__class__,
6417                 decode_path=decode_path,
6418                 offset=offset,
6419             )
6420         if l > len(v):
6421             raise NotEnoughData(
6422                 "encoded length is longer than data",
6423                 klass=self.__class__,
6424                 decode_path=decode_path,
6425                 offset=offset,
6426             )
6427         if not lenindef:
6428             v, tail = v[:l], v[l:]
6429         vlen = 0
6430         sub_offset = offset + tlen + llen
6431         values = {}
6432         ber_encoded = False
6433         ctx_allow_default_values = ctx.get("allow_default_values", False)
6434         for name, spec in iteritems(self.specs):
6435             if spec.optional and (
6436                     (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6437                     len(v) == 0
6438             ):
6439                 continue
6440             spec_defaulted = spec.default is not None
6441             sub_decode_path = decode_path + (name,)
6442             try:
6443                 if evgen_mode and not spec_defaulted:
6444                     for _decode_path, value, v_tail in spec.decode_evgen(
6445                             v,
6446                             sub_offset,
6447                             leavemm=True,
6448                             decode_path=sub_decode_path,
6449                             ctx=ctx,
6450                             _ctx_immutable=False,
6451                     ):
6452                         yield _decode_path, value, v_tail
6453                 else:
6454                     _, value, v_tail = next(spec.decode_evgen(
6455                         v,
6456                         sub_offset,
6457                         leavemm=True,
6458                         decode_path=sub_decode_path,
6459                         ctx=ctx,
6460                         _ctx_immutable=False,
6461                         _evgen_mode=False,
6462                     ))
6463             except TagMismatch as err:
6464                 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6465                     continue
6466                 raise
6467
6468             defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6469             if not evgen_mode and defined is not None:
6470                 defined_by, defined_spec = defined
6471                 if issubclass(value.__class__, SequenceOf):
6472                     for i, _value in enumerate(value):
6473                         sub_sub_decode_path = sub_decode_path + (
6474                             str(i),
6475                             DecodePathDefBy(defined_by),
6476                         )
6477                         defined_value, defined_tail = defined_spec.decode(
6478                             memoryview(bytes(_value)),
6479                             sub_offset + (
6480                                 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6481                                 if value.expled else (value.tlen + value.llen)
6482                             ),
6483                             leavemm=True,
6484                             decode_path=sub_sub_decode_path,
6485                             ctx=ctx,
6486                             _ctx_immutable=False,
6487                         )
6488                         if len(defined_tail) > 0:
6489                             raise DecodeError(
6490                                 "remaining data",
6491                                 klass=self.__class__,
6492                                 decode_path=sub_sub_decode_path,
6493                                 offset=offset,
6494                             )
6495                         _value.defined = (defined_by, defined_value)
6496                 else:
6497                     defined_value, defined_tail = defined_spec.decode(
6498                         memoryview(bytes(value)),
6499                         sub_offset + (
6500                             (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6501                             if value.expled else (value.tlen + value.llen)
6502                         ),
6503                         leavemm=True,
6504                         decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6505                         ctx=ctx,
6506                         _ctx_immutable=False,
6507                     )
6508                     if len(defined_tail) > 0:
6509                         raise DecodeError(
6510                             "remaining data",
6511                             klass=self.__class__,
6512                             decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6513                             offset=offset,
6514                         )
6515                     value.defined = (defined_by, defined_value)
6516
6517             value_len = value.fulllen
6518             vlen += value_len
6519             sub_offset += value_len
6520             v = v_tail
6521             if spec_defaulted:
6522                 if evgen_mode:
6523                     yield sub_decode_path, value, v_tail
6524                 if value == spec.default:
6525                     if ctx_bered or ctx_allow_default_values:
6526                         ber_encoded = True
6527                     else:
6528                         raise DecodeError(
6529                             "DEFAULT value met",
6530                             klass=self.__class__,
6531                             decode_path=sub_decode_path,
6532                             offset=sub_offset,
6533                         )
6534             if not evgen_mode:
6535                 values[name] = value
6536                 spec_defines = getattr(spec, "defines", ())
6537                 if len(spec_defines) == 0:
6538                     defines_by_path = ctx.get("defines_by_path", ())
6539                     if len(defines_by_path) > 0:
6540                         spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6541                 if spec_defines is not None and len(spec_defines) > 0:
6542                     for rel_path, schema in spec_defines:
6543                         defined = schema.get(value, None)
6544                         if defined is not None:
6545                             ctx.setdefault("_defines", []).append((
6546                                 abs_decode_path(sub_decode_path[:-1], rel_path),
6547                                 (value, defined),
6548                             ))
6549         if lenindef:
6550             if v[:EOC_LEN].tobytes() != EOC:
6551                 raise DecodeError(
6552                     "no EOC",
6553                     klass=self.__class__,
6554                     decode_path=decode_path,
6555                     offset=offset,
6556                 )
6557             tail = v[EOC_LEN:]
6558             vlen += EOC_LEN
6559         elif len(v) > 0:
6560             raise DecodeError(
6561                 "remaining data",
6562                 klass=self.__class__,
6563                 decode_path=decode_path,
6564                 offset=offset,
6565             )
6566         obj = self.__class__(
6567             schema=self.specs,
6568             impl=self.tag,
6569             expl=self._expl,
6570             default=self.default,
6571             optional=self.optional,
6572             _decoded=(offset, llen, vlen),
6573         )
6574         obj._value = values
6575         obj.lenindef = lenindef
6576         obj.ber_encoded = ber_encoded
6577         yield decode_path, obj, tail
6578
6579     def __repr__(self):
6580         value = pp_console_row(next(self.pps()))
6581         cols = []
6582         for name in self.specs:
6583             _value = self._value.get(name)
6584             if _value is None:
6585                 continue
6586             cols.append("%s: %s" % (name, repr(_value)))
6587         return "%s[%s]" % (value, "; ".join(cols))
6588
6589     def pps(self, decode_path=()):
6590         yield _pp(
6591             obj=self,
6592             asn1_type_name=self.asn1_type_name,
6593             obj_name=self.__class__.__name__,
6594             decode_path=decode_path,
6595             optional=self.optional,
6596             default=self == self.default,
6597             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6598             expl=None if self._expl is None else tag_decode(self._expl),
6599             offset=self.offset,
6600             tlen=self.tlen,
6601             llen=self.llen,
6602             vlen=self.vlen,
6603             expl_offset=self.expl_offset if self.expled else None,
6604             expl_tlen=self.expl_tlen if self.expled else None,
6605             expl_llen=self.expl_llen if self.expled else None,
6606             expl_vlen=self.expl_vlen if self.expled else None,
6607             expl_lenindef=self.expl_lenindef,
6608             lenindef=self.lenindef,
6609             ber_encoded=self.ber_encoded,
6610             bered=self.bered,
6611         )
6612         for name in self.specs:
6613             value = self._value.get(name)
6614             if value is None:
6615                 continue
6616             yield value.pps(decode_path=decode_path + (name,))
6617         for pp in self.pps_lenindef(decode_path):
6618             yield pp
6619
6620
6621 class Set(Sequence, SequenceEncode1stMixing):
6622     """``SET`` structure type
6623
6624     Its usage is identical to :py:class:`pyderasn.Sequence`.
6625
6626     .. _allow_unordered_set_ctx:
6627
6628     DER prohibits unordered values encoding and will raise an error
6629     during decode. If :ref:`bered <bered_ctx>` context option is set,
6630     then no error will occur. Also you can disable strict values
6631     ordering check by setting ``"allow_unordered_set": True``
6632     :ref:`context <ctx>` option.
6633     """
6634     __slots__ = ()
6635     tag_default = tag_encode(form=TagFormConstructed, num=17)
6636     asn1_type_name = "SET"
6637
6638     def _values_for_encoding(self):
6639         return sorted(
6640             super(Set, self)._values_for_encoding(),
6641             key=attrgetter("tag_order"),
6642         )
6643
6644     def _encode_cer(self, writer):
6645         write_full(writer, self.tag + LENINDEF)
6646         for v in sorted(
6647                 super(Set, self)._values_for_encoding(),
6648                 key=attrgetter("tag_order_cer"),
6649         ):
6650             v.encode_cer(writer)
6651         write_full(writer, EOC)
6652
6653     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6654         try:
6655             t, tlen, lv = tag_strip(tlv)
6656         except DecodeError as err:
6657             raise err.__class__(
6658                 msg=err.msg,
6659                 klass=self.__class__,
6660                 decode_path=decode_path,
6661                 offset=offset,
6662             )
6663         if t != self.tag:
6664             raise TagMismatch(
6665                 klass=self.__class__,
6666                 decode_path=decode_path,
6667                 offset=offset,
6668             )
6669         if tag_only:
6670             yield None
6671             return
6672         lenindef = False
6673         ctx_bered = ctx.get("bered", False)
6674         try:
6675             l, llen, v = len_decode(lv)
6676         except LenIndefForm as err:
6677             if not ctx_bered:
6678                 raise err.__class__(
6679                     msg=err.msg,
6680                     klass=self.__class__,
6681                     decode_path=decode_path,
6682                     offset=offset,
6683                 )
6684             l, llen, v = 0, 1, lv[1:]
6685             lenindef = True
6686         except DecodeError as err:
6687             raise err.__class__(
6688                 msg=err.msg,
6689                 klass=self.__class__,
6690                 decode_path=decode_path,
6691                 offset=offset,
6692             )
6693         if l > len(v):
6694             raise NotEnoughData(
6695                 "encoded length is longer than data",
6696                 klass=self.__class__,
6697                 offset=offset,
6698             )
6699         if not lenindef:
6700             v, tail = v[:l], v[l:]
6701         vlen = 0
6702         sub_offset = offset + tlen + llen
6703         values = {}
6704         ber_encoded = False
6705         ctx_allow_default_values = ctx.get("allow_default_values", False)
6706         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6707         tag_order_prev = (0, 0)
6708         _specs_items = copy(self.specs)
6709
6710         while len(v) > 0:
6711             if lenindef and v[:EOC_LEN].tobytes() == EOC:
6712                 break
6713             for name, spec in iteritems(_specs_items):
6714                 sub_decode_path = decode_path + (name,)
6715                 try:
6716                     spec.decode(
6717                         v,
6718                         sub_offset,
6719                         leavemm=True,
6720                         decode_path=sub_decode_path,
6721                         ctx=ctx,
6722                         tag_only=True,
6723                         _ctx_immutable=False,
6724                     )
6725                 except TagMismatch:
6726                     continue
6727                 break
6728             else:
6729                 raise TagMismatch(
6730                     klass=self.__class__,
6731                     decode_path=decode_path,
6732                     offset=offset,
6733                 )
6734             spec_defaulted = spec.default is not None
6735             if evgen_mode and not spec_defaulted:
6736                 for _decode_path, value, v_tail in spec.decode_evgen(
6737                         v,
6738                         sub_offset,
6739                         leavemm=True,
6740                         decode_path=sub_decode_path,
6741                         ctx=ctx,
6742                         _ctx_immutable=False,
6743                 ):
6744                     yield _decode_path, value, v_tail
6745             else:
6746                 _, value, v_tail = next(spec.decode_evgen(
6747                     v,
6748                     sub_offset,
6749                     leavemm=True,
6750                     decode_path=sub_decode_path,
6751                     ctx=ctx,
6752                     _ctx_immutable=False,
6753                     _evgen_mode=False,
6754                 ))
6755             value_tag_order = value.tag_order
6756             value_len = value.fulllen
6757             if tag_order_prev >= value_tag_order:
6758                 if ctx_bered or ctx_allow_unordered_set:
6759                     ber_encoded = True
6760                 else:
6761                     raise DecodeError(
6762                         "unordered " + self.asn1_type_name,
6763                         klass=self.__class__,
6764                         decode_path=sub_decode_path,
6765                         offset=sub_offset,
6766                     )
6767             if spec_defaulted:
6768                 if evgen_mode:
6769                     yield sub_decode_path, value, v_tail
6770                 if value != spec.default:
6771                     pass
6772                 elif ctx_bered or ctx_allow_default_values:
6773                     ber_encoded = True
6774                 else:
6775                     raise DecodeError(
6776                         "DEFAULT value met",
6777                         klass=self.__class__,
6778                         decode_path=sub_decode_path,
6779                         offset=sub_offset,
6780                     )
6781             values[name] = value
6782             del _specs_items[name]
6783             tag_order_prev = value_tag_order
6784             sub_offset += value_len
6785             vlen += value_len
6786             v = v_tail
6787
6788         obj = self.__class__(
6789             schema=self.specs,
6790             impl=self.tag,
6791             expl=self._expl,
6792             default=self.default,
6793             optional=self.optional,
6794             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6795         )
6796         if lenindef:
6797             if v[:EOC_LEN].tobytes() != EOC:
6798                 raise DecodeError(
6799                     "no EOC",
6800                     klass=self.__class__,
6801                     decode_path=decode_path,
6802                     offset=offset,
6803                 )
6804             tail = v[EOC_LEN:]
6805             obj.lenindef = True
6806         for name, spec in iteritems(self.specs):
6807             if name not in values and not spec.optional:
6808                 raise DecodeError(
6809                     "%s value is not ready" % name,
6810                     klass=self.__class__,
6811                     decode_path=decode_path,
6812                     offset=offset,
6813                 )
6814         if not evgen_mode:
6815             obj._value = values
6816         obj.ber_encoded = ber_encoded
6817         yield decode_path, obj, tail
6818
6819
6820 SequenceOfState = namedtuple(
6821     "SequenceOfState",
6822     BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6823     **NAMEDTUPLE_KWARGS
6824 )
6825
6826
6827 class SequenceOf(SequenceEncode1stMixing, Obj):
6828     """``SEQUENCE OF`` sequence type
6829
6830     For that kind of type you must specify the object it will carry on
6831     (bounds are for example here, not required)::
6832
6833         class Ints(SequenceOf):
6834             schema = Integer()
6835             bounds = (0, 2)
6836
6837     >>> ints = Ints()
6838     >>> ints.append(Integer(123))
6839     >>> ints.append(Integer(234))
6840     >>> ints
6841     Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6842     >>> [int(i) for i in ints]
6843     [123, 234]
6844     >>> ints.append(Integer(345))
6845     Traceback (most recent call last):
6846     pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6847     >>> ints[1]
6848     INTEGER 234
6849     >>> ints[1] = Integer(345)
6850     >>> ints
6851     Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6852
6853     You can initialize sequence with preinitialized values:
6854
6855     >>> ints = Ints([Integer(123), Integer(234)])
6856
6857     Also you can use iterator as a value:
6858
6859     >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6860
6861     And it won't be iterated until encoding process. Pay attention that
6862     bounds and required schema checks are done only during the encoding
6863     process in that case! After encode was called, then value is zeroed
6864     back to empty list and you have to set it again. That mode is useful
6865     mainly with CER encoding mode, where all objects from the iterable
6866     will be streamed to the buffer, without copying all of them to
6867     memory first.
6868     """
6869     __slots__ = ("spec", "_bound_min", "_bound_max")
6870     tag_default = tag_encode(form=TagFormConstructed, num=16)
6871     asn1_type_name = "SEQUENCE OF"
6872
6873     def __init__(
6874             self,
6875             value=None,
6876             schema=None,
6877             bounds=None,
6878             impl=None,
6879             expl=None,
6880             default=None,
6881             optional=False,
6882             _decoded=(0, 0, 0),
6883     ):
6884         super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6885         if schema is None:
6886             schema = getattr(self, "schema", None)
6887         if schema is None:
6888             raise ValueError("schema must be specified")
6889         self.spec = schema
6890         self._bound_min, self._bound_max = getattr(
6891             self,
6892             "bounds",
6893             (0, float("+inf")),
6894         ) if bounds is None else bounds
6895         self._value = []
6896         if value is not None:
6897             self._value = self._value_sanitize(value)
6898         if default is not None:
6899             default_value = self._value_sanitize(default)
6900             default_obj = self.__class__(
6901                 schema=schema,
6902                 impl=self.tag,
6903                 expl=self._expl,
6904             )
6905             default_obj._value = default_value
6906             self.default = default_obj
6907             if value is None:
6908                 self._value = copy(default_obj._value)
6909
6910     def _value_sanitize(self, value):
6911         iterator = False
6912         if issubclass(value.__class__, SequenceOf):
6913             value = value._value
6914         elif hasattr(value, NEXT_ATTR_NAME):
6915             iterator = True
6916         elif hasattr(value, "__iter__"):
6917             value = list(value)
6918         else:
6919             raise InvalidValueType((self.__class__, iter, "iterator"))
6920         if not iterator:
6921             if not self._bound_min <= len(value) <= self._bound_max:
6922                 raise BoundsError(self._bound_min, len(value), self._bound_max)
6923             class_expected = self.spec.__class__
6924             for v in value:
6925                 if not isinstance(v, class_expected):
6926                     raise InvalidValueType((class_expected,))
6927         return value
6928
6929     @property
6930     def ready(self):
6931         if hasattr(self._value, NEXT_ATTR_NAME):
6932             return True
6933         if self._bound_min > 0 and len(self._value) == 0:
6934             return False
6935         return all(v.ready for v in self._value)
6936
6937     @property
6938     def bered(self):
6939         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6940             return True
6941         return any(v.bered for v in self._value)
6942
6943     def __getstate__(self):
6944         if hasattr(self._value, NEXT_ATTR_NAME):
6945             raise ValueError("can not pickle SequenceOf with iterator")
6946         return SequenceOfState(
6947             __version__,
6948             self.tag,
6949             self._tag_order,
6950             self._expl,
6951             self.default,
6952             self.optional,
6953             self.offset,
6954             self.llen,
6955             self.vlen,
6956             self.expl_lenindef,
6957             self.lenindef,
6958             self.ber_encoded,
6959             self.spec,
6960             [copy(v) for v in self._value],
6961             self._bound_min,
6962             self._bound_max,
6963         )
6964
6965     def __setstate__(self, state):
6966         super(SequenceOf, self).__setstate__(state)
6967         self.spec = state.spec
6968         self._value = state.value
6969         self._bound_min = state.bound_min
6970         self._bound_max = state.bound_max
6971
6972     def __eq__(self, their):
6973         if isinstance(their, self.__class__):
6974             return (
6975                 self.spec == their.spec and
6976                 self.tag == their.tag and
6977                 self._expl == their._expl and
6978                 self._value == their._value
6979             )
6980         if hasattr(their, "__iter__"):
6981             return self._value == list(their)
6982         return False
6983
6984     def __call__(
6985             self,
6986             value=None,
6987             bounds=None,
6988             impl=None,
6989             expl=None,
6990             default=None,
6991             optional=None,
6992     ):
6993         return self.__class__(
6994             value=value,
6995             schema=self.spec,
6996             bounds=(
6997                 (self._bound_min, self._bound_max)
6998                 if bounds is None else bounds
6999             ),
7000             impl=self.tag if impl is None else impl,
7001             expl=self._expl if expl is None else expl,
7002             default=self.default if default is None else default,
7003             optional=self.optional if optional is None else optional,
7004         )
7005
7006     def __contains__(self, key):
7007         return key in self._value
7008
7009     def append(self, value):
7010         if not isinstance(value, self.spec.__class__):
7011             raise InvalidValueType((self.spec.__class__,))
7012         if len(self._value) + 1 > self._bound_max:
7013             raise BoundsError(
7014                 self._bound_min,
7015                 len(self._value) + 1,
7016                 self._bound_max,
7017             )
7018         self._value.append(value)
7019
7020     def __iter__(self):
7021         return iter(self._value)
7022
7023     def __len__(self):
7024         return len(self._value)
7025
7026     def __setitem__(self, key, value):
7027         if not isinstance(value, self.spec.__class__):
7028             raise InvalidValueType((self.spec.__class__,))
7029         self._value[key] = self.spec(value=value)
7030
7031     def __getitem__(self, key):
7032         return self._value[key]
7033
7034     def _values_for_encoding(self):
7035         return iter(self._value)
7036
7037     def _encode(self):
7038         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7039         if iterator:
7040             values = []
7041             values_append = values.append
7042             class_expected = self.spec.__class__
7043             values_for_encoding = self._values_for_encoding()
7044             self._value = []
7045             for v in values_for_encoding:
7046                 if not isinstance(v, class_expected):
7047                     raise InvalidValueType((class_expected,))
7048                 values_append(v.encode())
7049             if not self._bound_min <= len(values) <= self._bound_max:
7050                 raise BoundsError(self._bound_min, len(values), self._bound_max)
7051             value = b"".join(values)
7052         else:
7053             value = b"".join(v.encode() for v in self._values_for_encoding())
7054         return b"".join((self.tag, len_encode(len(value)), value))
7055
7056     def _encode1st(self, state):
7057         state = super(SequenceOf, self)._encode1st(state)
7058         if hasattr(self._value, NEXT_ATTR_NAME):
7059             self._value = []
7060         return state
7061
7062     def _encode2nd(self, writer, state_iter):
7063         write_full(writer, self.tag + len_encode(next(state_iter)))
7064         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7065         if iterator:
7066             values_count = 0
7067             class_expected = self.spec.__class__
7068             values_for_encoding = self._values_for_encoding()
7069             self._value = []
7070             for v in values_for_encoding:
7071                 if not isinstance(v, class_expected):
7072                     raise InvalidValueType((class_expected,))
7073                 v.encode2nd(writer, state_iter)
7074                 values_count += 1
7075             if not self._bound_min <= values_count <= self._bound_max:
7076                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7077         else:
7078             for v in self._values_for_encoding():
7079                 v.encode2nd(writer, state_iter)
7080
7081     def _encode_cer(self, writer):
7082         write_full(writer, self.tag + LENINDEF)
7083         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7084         if iterator:
7085             class_expected = self.spec.__class__
7086             values_count = 0
7087             values_for_encoding = self._values_for_encoding()
7088             self._value = []
7089             for v in values_for_encoding:
7090                 if not isinstance(v, class_expected):
7091                     raise InvalidValueType((class_expected,))
7092                 v.encode_cer(writer)
7093                 values_count += 1
7094             if not self._bound_min <= values_count <= self._bound_max:
7095                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7096         else:
7097             for v in self._values_for_encoding():
7098                 v.encode_cer(writer)
7099         write_full(writer, EOC)
7100
7101     def _decode(
7102             self,
7103             tlv,
7104             offset,
7105             decode_path,
7106             ctx,
7107             tag_only,
7108             evgen_mode,
7109             ordering_check=False,
7110     ):
7111         try:
7112             t, tlen, lv = tag_strip(tlv)
7113         except DecodeError as err:
7114             raise err.__class__(
7115                 msg=err.msg,
7116                 klass=self.__class__,
7117                 decode_path=decode_path,
7118                 offset=offset,
7119             )
7120         if t != self.tag:
7121             raise TagMismatch(
7122                 klass=self.__class__,
7123                 decode_path=decode_path,
7124                 offset=offset,
7125             )
7126         if tag_only:
7127             yield None
7128             return
7129         lenindef = False
7130         ctx_bered = ctx.get("bered", False)
7131         try:
7132             l, llen, v = len_decode(lv)
7133         except LenIndefForm as err:
7134             if not ctx_bered:
7135                 raise err.__class__(
7136                     msg=err.msg,
7137                     klass=self.__class__,
7138                     decode_path=decode_path,
7139                     offset=offset,
7140                 )
7141             l, llen, v = 0, 1, lv[1:]
7142             lenindef = True
7143         except DecodeError as err:
7144             raise err.__class__(
7145                 msg=err.msg,
7146                 klass=self.__class__,
7147                 decode_path=decode_path,
7148                 offset=offset,
7149             )
7150         if l > len(v):
7151             raise NotEnoughData(
7152                 "encoded length is longer than data",
7153                 klass=self.__class__,
7154                 decode_path=decode_path,
7155                 offset=offset,
7156             )
7157         if not lenindef:
7158             v, tail = v[:l], v[l:]
7159         vlen = 0
7160         sub_offset = offset + tlen + llen
7161         _value = []
7162         _value_count = 0
7163         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7164         value_prev = memoryview(v[:0])
7165         ber_encoded = False
7166         spec = self.spec
7167         while len(v) > 0:
7168             if lenindef and v[:EOC_LEN].tobytes() == EOC:
7169                 break
7170             sub_decode_path = decode_path + (str(_value_count),)
7171             if evgen_mode:
7172                 for _decode_path, value, v_tail in spec.decode_evgen(
7173                         v,
7174                         sub_offset,
7175                         leavemm=True,
7176                         decode_path=sub_decode_path,
7177                         ctx=ctx,
7178                         _ctx_immutable=False,
7179                 ):
7180                     yield _decode_path, value, v_tail
7181             else:
7182                 _, value, v_tail = next(spec.decode_evgen(
7183                     v,
7184                     sub_offset,
7185                     leavemm=True,
7186                     decode_path=sub_decode_path,
7187                     ctx=ctx,
7188                     _ctx_immutable=False,
7189                     _evgen_mode=False,
7190                 ))
7191             value_len = value.fulllen
7192             if ordering_check:
7193                 if value_prev.tobytes() > v[:value_len].tobytes():
7194                     if ctx_bered or ctx_allow_unordered_set:
7195                         ber_encoded = True
7196                     else:
7197                         raise DecodeError(
7198                             "unordered " + self.asn1_type_name,
7199                             klass=self.__class__,
7200                             decode_path=sub_decode_path,
7201                             offset=sub_offset,
7202                         )
7203                 value_prev = v[:value_len]
7204             _value_count += 1
7205             if not evgen_mode:
7206                 _value.append(value)
7207             sub_offset += value_len
7208             vlen += value_len
7209             v = v_tail
7210         if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7211             raise DecodeError(
7212                 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7213                 klass=self.__class__,
7214                 decode_path=decode_path,
7215                 offset=offset,
7216             )
7217         try:
7218             obj = self.__class__(
7219                 value=None if evgen_mode else _value,
7220                 schema=spec,
7221                 bounds=(self._bound_min, self._bound_max),
7222                 impl=self.tag,
7223                 expl=self._expl,
7224                 default=self.default,
7225                 optional=self.optional,
7226                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7227             )
7228         except BoundsError as err:
7229             raise DecodeError(
7230                 msg=str(err),
7231                 klass=self.__class__,
7232                 decode_path=decode_path,
7233                 offset=offset,
7234             )
7235         if lenindef:
7236             if v[:EOC_LEN].tobytes() != EOC:
7237                 raise DecodeError(
7238                     "no EOC",
7239                     klass=self.__class__,
7240                     decode_path=decode_path,
7241                     offset=offset,
7242                 )
7243             obj.lenindef = True
7244             tail = v[EOC_LEN:]
7245         obj.ber_encoded = ber_encoded
7246         yield decode_path, obj, tail
7247
7248     def __repr__(self):
7249         return "%s[%s]" % (
7250             pp_console_row(next(self.pps())),
7251             ", ".join(repr(v) for v in self._value),
7252         )
7253
7254     def pps(self, decode_path=()):
7255         yield _pp(
7256             obj=self,
7257             asn1_type_name=self.asn1_type_name,
7258             obj_name=self.__class__.__name__,
7259             decode_path=decode_path,
7260             optional=self.optional,
7261             default=self == self.default,
7262             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7263             expl=None if self._expl is None else tag_decode(self._expl),
7264             offset=self.offset,
7265             tlen=self.tlen,
7266             llen=self.llen,
7267             vlen=self.vlen,
7268             expl_offset=self.expl_offset if self.expled else None,
7269             expl_tlen=self.expl_tlen if self.expled else None,
7270             expl_llen=self.expl_llen if self.expled else None,
7271             expl_vlen=self.expl_vlen if self.expled else None,
7272             expl_lenindef=self.expl_lenindef,
7273             lenindef=self.lenindef,
7274             ber_encoded=self.ber_encoded,
7275             bered=self.bered,
7276         )
7277         for i, value in enumerate(self._value):
7278             yield value.pps(decode_path=decode_path + (str(i),))
7279         for pp in self.pps_lenindef(decode_path):
7280             yield pp
7281
7282
7283 class SetOf(SequenceOf):
7284     """``SET OF`` sequence type
7285
7286     Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7287     """
7288     __slots__ = ()
7289     tag_default = tag_encode(form=TagFormConstructed, num=17)
7290     asn1_type_name = "SET OF"
7291
7292     def _value_sanitize(self, value):
7293         value = super(SetOf, self)._value_sanitize(value)
7294         if hasattr(value, NEXT_ATTR_NAME):
7295             raise ValueError(
7296                 "SetOf does not support iterator values, as no sense in them"
7297             )
7298         return value
7299
7300     def _encode(self):
7301         v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7302         return b"".join((self.tag, len_encode(len(v)), v))
7303
7304     def _encode2nd(self, writer, state_iter):
7305         write_full(writer, self.tag + len_encode(next(state_iter)))
7306         values = []
7307         for v in self._values_for_encoding():
7308             buf = BytesIO()
7309             v.encode2nd(buf.write, state_iter)
7310             values.append(buf.getvalue())
7311         values.sort()
7312         for v in values:
7313             write_full(writer, v)
7314
7315     def _encode_cer(self, writer):
7316         write_full(writer, self.tag + LENINDEF)
7317         for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7318             write_full(writer, v)
7319         write_full(writer, EOC)
7320
7321     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7322         return super(SetOf, self)._decode(
7323             tlv,
7324             offset,
7325             decode_path,
7326             ctx,
7327             tag_only,
7328             evgen_mode,
7329             ordering_check=True,
7330         )
7331
7332
7333 def obj_by_path(pypath):  # pragma: no cover
7334     """Import object specified as string Python path
7335
7336     Modules must be separated from classes/functions with ``:``.
7337
7338     >>> obj_by_path("foo.bar:Baz")
7339     <class 'foo.bar.Baz'>
7340     >>> obj_by_path("foo.bar:Baz.boo")
7341     <classmethod 'foo.bar.Baz.boo'>
7342     """
7343     mod, objs = pypath.rsplit(":", 1)
7344     from importlib import import_module
7345     obj = import_module(mod)
7346     for obj_name in objs.split("."):
7347         obj = getattr(obj, obj_name)
7348     return obj
7349
7350
7351 def generic_decoder():  # pragma: no cover
7352     # All of this below is a big hack with self references
7353     choice = PrimitiveTypes()
7354     choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7355     choice.specs["SetOf"] = SetOf(schema=choice)
7356     for i in six_xrange(31):
7357         choice.specs["SequenceOf%d" % i] = SequenceOf(
7358             schema=choice,
7359             expl=tag_ctxc(i),
7360         )
7361     choice.specs["Any"] = Any()
7362
7363     # Class name equals to type name, to omit it from output
7364     class SEQUENCEOF(SequenceOf):
7365         __slots__ = ()
7366         schema = choice
7367
7368     def pprint_any(
7369             obj,
7370             oid_maps=(),
7371             with_colours=False,
7372             with_decode_path=False,
7373             decode_path_only=(),
7374             decode_path=(),
7375     ):
7376         def _pprint_pps(pps):
7377             for pp in pps:
7378                 if hasattr(pp, "_fields"):
7379                     if (
7380                             decode_path_only != () and
7381                             pp.decode_path[:len(decode_path_only)] != decode_path_only
7382                     ):
7383                         continue
7384                     if pp.asn1_type_name == Choice.asn1_type_name:
7385                         continue
7386                     pp_kwargs = pp._asdict()
7387                     pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7388                     pp = _pp(**pp_kwargs)
7389                     yield pp_console_row(
7390                         pp,
7391                         oid_maps=oid_maps,
7392                         with_offsets=True,
7393                         with_blob=False,
7394                         with_colours=with_colours,
7395                         with_decode_path=with_decode_path,
7396                         decode_path_len_decrease=len(decode_path_only),
7397                     )
7398                     for row in pp_console_blob(
7399                             pp,
7400                             decode_path_len_decrease=len(decode_path_only),
7401                     ):
7402                         yield row
7403                 else:
7404                     for row in _pprint_pps(pp):
7405                         yield row
7406         return "\n".join(_pprint_pps(obj.pps(decode_path)))
7407     return SEQUENCEOF(), pprint_any
7408
7409
7410 def ascii_visualize(ba):
7411     """Output only ASCII printable characters, like in hexdump -C
7412
7413     Example output for given binary string (right part)::
7414
7415         92 2b 39 20 65 91 e6 8e  95 93 1a 58 df 02 78 ea  |.+9 e......X..x.|
7416                                                            ^^^^^^^^^^^^^^^^
7417     """
7418     return "".join((chr(b) if 0x20 <= b <= 0x7E else ".") for b in ba)
7419
7420
7421 def hexdump(raw):
7422     """Generate ``hexdump -C`` like output
7423
7424     Rendered example::
7425
7426         00000000  30 80 30 80 a0 80 02 01  02 00 00 02 14 54 a5 18  |0.0..........T..|
7427         00000010  69 ef 8b 3f 15 fd ea ad  bd 47 e0 94 81 6b 06 6a  |i..?.....G...k.j|
7428
7429     Result of that function is a generator of lines, where each line is
7430     a list of columns::
7431
7432         [
7433             [...],
7434             ["00000010 ", " 69", " ef", " 8b", " 3f", " 15", " fd", " ea", " ad ",
7435                           " bd", " 47", " e0", " 94", " 81", " 6b", " 06", " 6a ",
7436                           " |i..?.....G...k.j|"]
7437             [...],
7438         ]
7439     """
7440     hexed = hexenc(raw).upper()
7441     addr, cols = 0, ["%08x " % 0]
7442     for i in six_xrange(0, len(hexed), 2):
7443         if i != 0 and i // 2 % 8 == 0:
7444             cols[-1] += " "
7445         if i != 0 and i // 2 % 16 == 0:
7446             cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:addr + 16])))
7447             yield cols
7448             addr += 16
7449             cols = ["%08x " % addr]
7450         cols.append(" " + hexed[i:i + 2])
7451     if len(cols) > 0:
7452         cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:])))
7453         yield cols
7454
7455
7456 def browse(raw, obj, oid_maps=()):
7457     """Interactive browser
7458
7459     :param bytes raw: binary data you decoded
7460     :param obj: decoded :py:class:`pyderasn.Obj`
7461     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
7462                      Its human readable form is printed when OID is met
7463
7464     .. note:: `urwid <http://urwid.org/>`__ dependency required
7465
7466     This browser is an interactive terminal application for browsing
7467     structures of your decoded ASN.1 objects. You can quit it with **q**
7468     key. It consists of three windows:
7469
7470     :tree:
7471      View of ASN.1 elements hierarchy. You can navigate it using **Up**,
7472      **Down**, **PageUp**, **PageDown**, **Home**, **End** keys.
7473      **Left** key goes to constructed element above. **Plus**/**Minus**
7474      keys collapse/uncollapse constructed elements. **Space** toggles it
7475     :info:
7476      window with various information about element. You can scroll it
7477      with **h**/**l** (down, up) (**H**/**L** for triple speed) keys
7478     :hexdump:
7479      window with raw data hexdump and highlighted current element's
7480      contents. It automatically focuses on element's data. You can
7481      scroll it with **j**/**k** (down, up) (**J**/**K** for triple
7482      speed) keys. If element has explicit tag, then it also will be
7483      highlighted with different colour
7484
7485     Window's header contains current decode path and progress bars with
7486     position in *info* and *hexdump* windows.
7487
7488     If you press **d**, then current element will be saved in the
7489     current directory under its decode path name (adding ".0", ".1", etc
7490     suffix if such file already exists). **D** will save it with explicit tag.
7491
7492     You can also invoke it with ``--browse`` command line argument.
7493     """
7494     from copy import deepcopy
7495     from os.path import exists as path_exists
7496     import urwid
7497
7498     class TW(urwid.TreeWidget):
7499         def __init__(self, state, *args, **kwargs):
7500             self.state = state
7501             self.scrolled = {"info": False, "hexdump": False}
7502             super(TW, self).__init__(*args, **kwargs)
7503
7504         def _get_pp(self):
7505             pp = self.get_node().get_value()
7506             constructed = len(pp) > 1
7507             return (pp if hasattr(pp, "_fields") else pp[0]), constructed
7508
7509         def _state_update(self):
7510             pp, _ = self._get_pp()
7511             self.state["decode_path"].set_text(
7512                 ":".join(str(p) for p in pp.decode_path)
7513             )
7514             lines = deepcopy(self.state["hexed"])
7515
7516             def attr_set(i, attr):
7517                 line = lines[i // 16]
7518                 idx = 1 + (i - 16 * (i // 16))
7519                 line[idx] = (attr, line[idx])
7520
7521             if pp.expl_offset is not None:
7522                 for i in six_xrange(
7523                         pp.expl_offset,
7524                         pp.expl_offset + pp.expl_tlen + pp.expl_llen,
7525                 ):
7526                     attr_set(i, "select-expl")
7527             for i in six_xrange(pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen):
7528                 attr_set(i, "select-value")
7529             self.state["hexdump"]._set_body([urwid.Text(line) for line in lines])
7530             self.state["hexdump"].set_focus(pp.offset // 16)
7531             self.state["hexdump"].set_focus_valign("middle")
7532             self.state["hexdump_bar"].set_completion(
7533                 (100 * pp.offset // 16) //
7534                 len(self.state["hexdump"]._body.positions())
7535             )
7536
7537             lines = [
7538                 [("header", "Name: "), pp.obj_name],
7539                 [("header", "Type: "), pp.asn1_type_name],
7540                 [("header", "Offset: "), "%d (0x%x)" % (pp.offset, pp.offset)],
7541                 [("header", "[TLV]len: "), "%d/%d/%d" % (
7542                     pp.tlen, pp.llen, pp.vlen,
7543                 )],
7544                 [("header", "TLVlen: "), "%d" % sum((
7545                     pp.tlen, pp.llen, pp.vlen,
7546                 ))],
7547                 [("header", "Slice: "), "[%d:%d]" % (
7548                     pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen,
7549                 )],
7550             ]
7551             if pp.lenindef:
7552                 lines.append([("warning", "LENINDEF")])
7553             if pp.ber_encoded:
7554                 lines.append([("warning", "BER encoded")])
7555             if pp.bered:
7556                 lines.append([("warning", "BERed")])
7557             if pp.expl is not None:
7558                 lines.append([("header", "EXPLICIT")])
7559                 klass, _, num = pp.expl
7560                 lines.append(["  Tag: %s%d" % (TagClassReprs[klass], num)])
7561                 if pp.expl_offset is not None:
7562                     lines.append(["  Offset: %d" % pp.expl_offset])
7563                     lines.append(["  [TLV]len: %d/%d/%d" % (
7564                         pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7565                     )])
7566                     lines.append(["  TLVlen: %d" % sum((
7567                         pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7568                     ))])
7569                     lines.append(["  Slice: [%d:%d]" % (
7570                         pp.expl_offset,
7571                         pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen,
7572                     )])
7573             if pp.impl is not None:
7574                 klass, _, num = pp.impl
7575                 lines.append([
7576                     ("header", "IMPLICIT: "), "%s%d" % (TagClassReprs[klass], num),
7577                 ])
7578             if pp.optional:
7579                 lines.append(["OPTIONAL"])
7580             if pp.default:
7581                 lines.append(["DEFAULT"])
7582             if len(pp.decode_path) > 0:
7583                 ent = pp.decode_path[-1]
7584                 if isinstance(ent, DecodePathDefBy):
7585                     lines.append([""])
7586                     value = str(ent.defined_by)
7587                     oid_name = find_oid_name(
7588                         ent.defined_by.asn1_type_name, oid_maps, value,
7589                     )
7590                     lines.append([("header", "DEFINED BY: "), "%s" % (
7591                         value if oid_name is None
7592                         else "%s (%s)" % (oid_name, value)
7593                     )])
7594             lines.append([""])
7595             if pp.value is not None:
7596                 lines.append([("header", "Value: "), pp.value])
7597                 if (
7598                         len(oid_maps) > 0 and
7599                         pp.asn1_type_name == ObjectIdentifier.asn1_type_name
7600                 ):
7601                     for oid_map in oid_maps:
7602                         oid_name = oid_map.get(pp.value)
7603                         if oid_name is not None:
7604                             lines.append([("header", "Human: "), oid_name])
7605                             break
7606                 if pp.asn1_type_name == Integer.asn1_type_name:
7607                     lines.append([
7608                         ("header", "Decimal: "), "%d" % int(pp.obj),
7609                     ])
7610                     lines.append([
7611                         ("header", "Hexadecimal: "), colonize_hex(pp.obj.tohex()),
7612                     ])
7613             if pp.blob.__class__ == binary_type:
7614                 blob = hexenc(pp.blob).upper()
7615                 for i in six_xrange(0, len(blob), 32):
7616                     lines.append([colonize_hex(blob[i:i + 32])])
7617             elif pp.blob.__class__ == tuple:
7618                 lines.append([", ".join(pp.blob)])
7619             self.state["info"]._set_body([urwid.Text(line) for line in lines])
7620             self.state["info_bar"].set_completion(0)
7621
7622         def selectable(self):
7623             if self.state["widget_current"] != self:
7624                 self.state["widget_current"] = self
7625                 self.scrolled["info"] = False
7626                 self.scrolled["hexdump"] = False
7627                 self._state_update()
7628             return super(TW, self).selectable()
7629
7630         def get_display_text(self):
7631             pp, constructed = self._get_pp()
7632             style = "constructed" if constructed else ""
7633             if len(pp.decode_path) == 0:
7634                 return (style, pp.obj_name)
7635             if pp.asn1_type_name == "EOC":
7636                 return ("eoc", "EOC")
7637             ent = pp.decode_path[-1]
7638             if isinstance(ent, DecodePathDefBy):
7639                 value = str(ent.defined_by)
7640                 oid_name = find_oid_name(
7641                     ent.defined_by.asn1_type_name, oid_maps, value,
7642                 )
7643                 return ("defby", "DEFBY:" + (
7644                     value if oid_name is None else oid_name
7645                 ))
7646             return (style, ent)
7647
7648         def _scroll(self, what, step):
7649             self.state[what]._invalidate()
7650             pos = self.state[what].focus_position
7651             if not self.scrolled[what]:
7652                 self.scrolled[what] = True
7653                 pos -= 2
7654             pos = max(0, pos + step)
7655             pos = min(pos, len(self.state[what]._body.positions()) - 1)
7656             self.state[what].set_focus(pos)
7657             self.state[what].set_focus_valign("top")
7658             self.state[what + "_bar"].set_completion(
7659                 (100 * pos) // len(self.state[what]._body.positions())
7660             )
7661
7662         def keypress(self, size, key):
7663             if key == "q":
7664                 raise urwid.ExitMainLoop()
7665
7666             if key == " ":
7667                 self.expanded = not self.expanded
7668                 self.update_expanded_icon()
7669                 return None
7670
7671             hexdump_steps = {"j": 1, "k": -1, "J": 5, "K": -5}
7672             if key in hexdump_steps:
7673                 self._scroll("hexdump", hexdump_steps[key])
7674                 return None
7675
7676             info_steps = {"h": 1, "l": -1, "H": 5, "L": -5}
7677             if key in info_steps:
7678                 self._scroll("info", info_steps[key])
7679                 return None
7680
7681             if key in ("d", "D"):
7682                 pp, _ = self._get_pp()
7683                 dp = ":".join(str(p) for p in pp.decode_path)
7684                 dp = dp.replace(" ", "_")
7685                 if dp == "":
7686                     dp = "root"
7687                 if key == "d" or pp.expl_offset is None:
7688                     data = self.state["raw"][pp.offset:(
7689                         pp.offset + pp.tlen + pp.llen + pp.vlen
7690                     )]
7691                 else:
7692                     data = self.state["raw"][pp.expl_offset:(
7693                         pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen
7694                     )]
7695                 ctr = 0
7696
7697                 def duplicate_path(dp, ctr):
7698                     if ctr == 0:
7699                         return dp
7700                     return "%s.%d" % (dp, ctr)
7701
7702                 while True:
7703                     if not path_exists(duplicate_path(dp, ctr)):
7704                         break
7705                     ctr += 1
7706                 dp = duplicate_path(dp, ctr)
7707                 with open(dp, "wb") as fd:
7708                     fd.write(data)
7709                 self.state["decode_path"].set_text(
7710                     ("warning", "Saved to: " + dp)
7711                 )
7712                 return None
7713             return super(TW, self).keypress(size, key)
7714
7715     class PN(urwid.ParentNode):
7716         def __init__(self, state, value, *args, **kwargs):
7717             self.state = state
7718             if not hasattr(value, "_fields"):
7719                 value = list(value)
7720             super(PN, self).__init__(value, *args, **kwargs)
7721
7722         def load_widget(self):
7723             return TW(self.state, self)
7724
7725         def load_child_keys(self):
7726             value = self.get_value()
7727             if hasattr(value, "_fields"):
7728                 return []
7729             return range(len(value[1:]))
7730
7731         def load_child_node(self, key):
7732             return PN(
7733                 self.state,
7734                 self.get_value()[key + 1],
7735                 parent=self,
7736                 key=key,
7737                 depth=self.get_depth() + 1,
7738             )
7739
7740     class LabeledPG(urwid.ProgressBar):
7741         def __init__(self, label, *args, **kwargs):
7742             self.label = label
7743             super(LabeledPG, self).__init__(*args, **kwargs)
7744
7745         def get_text(self):
7746             return "%s: %s" % (self.label, super(LabeledPG, self).get_text())
7747
7748     WinHexdump = urwid.ListBox([urwid.Text("")])
7749     WinInfo = urwid.ListBox([urwid.Text("")])
7750     WinDecodePath = urwid.Text("", "center")
7751     WinInfoBar = LabeledPG("info", "pg-normal", "pg-complete")
7752     WinHexdumpBar = LabeledPG("hexdump", "pg-normal", "pg-complete")
7753     WinTree = urwid.TreeListBox(urwid.TreeWalker(PN(
7754         {
7755             "raw": raw,
7756             "hexed": list(hexdump(raw)),
7757             "widget_current": None,
7758             "info": WinInfo,
7759             "info_bar": WinInfoBar,
7760             "hexdump": WinHexdump,
7761             "hexdump_bar": WinHexdumpBar,
7762             "decode_path": WinDecodePath,
7763         },
7764         list(obj.pps()),
7765     )))
7766     help_text = " ".join((
7767         "q:quit",
7768         "space:(un)collapse",
7769         "(pg)up/down/home/end:nav",
7770         "jkJK:hexdump hlHL:info",
7771         "dD:dump",
7772     ))
7773     urwid.MainLoop(
7774         urwid.Frame(
7775             urwid.Columns([
7776                 ("weight", 1, WinTree),
7777                 ("weight", 2, urwid.Pile([
7778                     urwid.LineBox(WinInfo),
7779                     urwid.LineBox(WinHexdump),
7780                 ])),
7781             ]),
7782             header=urwid.Columns([
7783                 ("weight", 2, urwid.AttrWrap(WinDecodePath, "header")),
7784                 ("weight", 1, WinInfoBar),
7785                 ("weight", 1, WinHexdumpBar),
7786             ]),
7787             footer=urwid.AttrWrap(urwid.Text(help_text), "help")
7788         ),
7789         [
7790             ("header", "bold", ""),
7791             ("constructed", "bold", ""),
7792             ("help", "light magenta", ""),
7793             ("warning", "light red", ""),
7794             ("defby", "light red", ""),
7795             ("eoc", "dark red", ""),
7796             ("select-value", "light green", ""),
7797             ("select-expl", "light red", ""),
7798             ("pg-normal", "", "light blue"),
7799             ("pg-complete", "black", "yellow"),
7800         ],
7801     ).run()
7802
7803
7804 def main():  # pragma: no cover
7805     import argparse
7806     parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7807     parser.add_argument(
7808         "--skip",
7809         type=int,
7810         default=0,
7811         help="Skip that number of bytes from the beginning",
7812     )
7813     parser.add_argument(
7814         "--oids",
7815         help="Python paths to dictionary with OIDs, comma separated",
7816     )
7817     parser.add_argument(
7818         "--schema",
7819         help="Python path to schema definition to use",
7820     )
7821     parser.add_argument(
7822         "--defines-by-path",
7823         help="Python path to decoder's defines_by_path",
7824     )
7825     parser.add_argument(
7826         "--nobered",
7827         action="store_true",
7828         help="Disallow BER encoding",
7829     )
7830     parser.add_argument(
7831         "--print-decode-path",
7832         action="store_true",
7833         help="Print decode paths",
7834     )
7835     parser.add_argument(
7836         "--decode-path-only",
7837         help="Print only specified decode path",
7838     )
7839     parser.add_argument(
7840         "--allow-expl-oob",
7841         action="store_true",
7842         help="Allow explicit tag out-of-bound",
7843     )
7844     parser.add_argument(
7845         "--evgen",
7846         action="store_true",
7847         help="Turn on event generation mode",
7848     )
7849     parser.add_argument(
7850         "--browse",
7851         action="store_true",
7852         help="Start ASN.1 browser",
7853     )
7854     parser.add_argument(
7855         "RAWFile",
7856         type=argparse.FileType("rb"),
7857         help="Path to BER/CER/DER file you want to decode",
7858     )
7859     args = parser.parse_args()
7860     if PY2:
7861         args.RAWFile.seek(args.skip)
7862         raw = memoryview(args.RAWFile.read())
7863         args.RAWFile.close()
7864     else:
7865         raw = file_mmaped(args.RAWFile)[args.skip:]
7866     oid_maps = (
7867         [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7868         if args.oids else ()
7869     )
7870     from functools import partial
7871     if args.schema:
7872         schema = obj_by_path(args.schema)
7873         pprinter = partial(pprint, big_blobs=True)
7874     else:
7875         schema, pprinter = generic_decoder()
7876     ctx = {
7877         "bered": not args.nobered,
7878         "allow_expl_oob": args.allow_expl_oob,
7879     }
7880     if args.defines_by_path is not None:
7881         ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7882     if args.browse:
7883         obj, _ = schema().decode(raw, ctx=ctx)
7884         browse(raw, obj, oid_maps)
7885         from sys import exit as sys_exit
7886         sys_exit(0)
7887     from os import environ
7888     pprinter = partial(
7889         pprinter,
7890         oid_maps=oid_maps,
7891         with_colours=environ.get("NO_COLOR") is None,
7892         with_decode_path=args.print_decode_path,
7893         decode_path_only=(
7894             () if args.decode_path_only is None else
7895             tuple(args.decode_path_only.split(":"))
7896         ),
7897     )
7898     if args.evgen:
7899         for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7900             print(pprinter(obj, decode_path=decode_path))
7901     else:
7902         obj, tail = schema().decode(raw, ctx=ctx)
7903         print(pprinter(obj))
7904     if tail != b"":
7905         print("\nTrailing data: %s" % hexenc(tail))
7906
7907
7908 if __name__ == "__main__":
7909     from pyderasn import *
7910     main()