]> Cypherpunks.ru repositories - pyderasn.git/blob - pyderasn.py
Substitute class name in exception
[pyderasn.git] / pyderasn.py
1 #!/usr/bin/env python
2 # coding: utf-8
3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU Lesser General Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
21
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal BER/CER/DER ones.
24
25     >>> i = Integer(123)
26     >>> raw = i.encode()
27     >>> Integer().decod(raw) == i
28     True
29
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
62
63 Common for most types
64 ---------------------
65
66 Tags
67 ____
68
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
75 tags simultaneously.
76
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
80 number.
81
82 .. note::
83
84    EXPLICIT tags always have **constructed** tag. PyDERASN does not
85    explicitly check correctness of schema input here.
86
87 .. note::
88
89    Implicit tags have **primitive** (``tag_ctxp``) encoding for
90    primitive values.
91
92 ::
93
94     >>> Integer(impl=tag_ctxp(1))
95     [1] INTEGER
96     >>> Integer(expl=tag_ctxc(2))
97     [2] EXPLICIT INTEGER
98
99 Implicit tag is not explicitly shown.
100
101 Two objects of the same type, but with different implicit/explicit tags
102 are **not** equal.
103
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
106 function::
107
108     >>> tag_decode(tag_ctxc(123))
109     (128, 32, 123)
110     >>> klass, form, num = tag_decode(tag_ctxc(123))
111     >>> klass == TagClassContext
112     True
113     >>> form == TagFormConstructed
114     True
115
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
118
119 Default/optional
120 ________________
121
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
125
126     >>> Integer(optional=True, default=123)
127     INTEGER 123 OPTIONAL DEFAULT
128
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
132 ``version`` field::
133
134     class Version(Integer):
135         schema = (
136             ("v1", 0),
137             ("v2", 1),
138             ("v3", 2),
139         )
140     class TBSCertificate(Sequence):
141         schema = (
142             ("version", Version(expl=tag_ctxc(0), default="v1")),
143         [...]
144
145 When default argument is used and value is not specified, then it equals
146 to default one.
147
148 .. _bounds:
149
150 Size constraints
151 ________________
152
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
156
157     class X(...):
158         bounds = (MIN, MAX)
159
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
161
162 For simplicity you can also set bounds the following way::
163
164     bounded_x = X(bounds=(MIN, MAX))
165
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
167 raised.
168
169 Common methods
170 ______________
171
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
175
176 All objects are friendly to ``copy.copy()`` and copied objects can be
177 safely mutated.
178
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
181
182 .. _decoding:
183
184 Decoding
185 --------
186
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
193
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
196
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
199
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
205
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
208
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
211 * ``expl_tlen``,
212 * ``expl_llen``
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
215   ``offset`` otherwise
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
217   ``tlvlen`` otherwise
218
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
220
221 .. _ctx:
222
223 Context
224 _______
225
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
229
230 Currently available context options:
231
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
238
239 .. _pprinting:
240
241 Pretty printing
242 ---------------
243
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
248
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
252
253     >>> from pyderasn import pprint
254     >>> encoded = Integer(-12345).encode()
255     >>> obj, tail = Integer().decode(encoded)
256     >>> print(pprint(obj))
257         0   [1,1,   2] INTEGER -12345
258
259 .. _pprint_example:
260
261 Example certificate::
262
263     >>> print(pprint(crt))
264         0   [1,3,1604] Certificate SEQUENCE
265         4   [1,3,1453]  . tbsCertificate: TBSCertificate SEQUENCE
266        10-2 [1,1,   1]  . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267        13   [1,1,   3]  . . serialNumber: CertificateSerialNumber INTEGER 61595
268        18   [1,1,  13]  . . signature: AlgorithmIdentifier SEQUENCE
269        20   [1,1,   9]  . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270        31   [0,0,   2]  . . . parameters: [UNIV 5] ANY OPTIONAL
271                         . . . . 05:00
272        33   [0,0, 278]  . . issuer: Name CHOICE rdnSequence
273        33   [1,3, 274]  . . . rdnSequence: RDNSequence SEQUENCE OF
274        37   [1,1,  11]  . . . . 0: RelativeDistinguishedName SET OF
275        39   [1,1,   9]  . . . . . 0: AttributeTypeAndValue SEQUENCE
276        41   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277        46   [0,0,   4]  . . . . . . value: [UNIV 19] AttributeValue ANY
278                         . . . . . . . 13:02:45:53
279     [...]
280      1461   [1,1,  13]  . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281      1463   [1,1,   9]  . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282      1474   [0,0,   2]  . . parameters: [UNIV 5] ANY OPTIONAL
283                         . . . 05:00
284      1476   [1,2, 129]  . signatureValue: BIT STRING 1024 bits
285                         . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286                         . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
287      [...]
288
289     Trailing data: 0a
290
291 Let's parse that output, human::
292
293        10-2 [1,1,   1]    . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294        ^  ^  ^ ^    ^     ^   ^        ^            ^       ^       ^  ^
295        0  1  2 3    4     5   6        7            8       9       10 11
296
297 ::
298
299        20   [1,1,   9]    . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
300        ^     ^ ^    ^     ^     ^          ^                 ^
301        0     2 3    4     5     6          9                 10
302
303 ::
304
305        33   [0,0, 278]    . . issuer: Name CHOICE rdnSequence
306        ^     ^ ^    ^     ^   ^       ^    ^      ^
307        0     2 3    4     5   6       8    9      10
308
309 ::
310
311        52-2∞ B [1,1,1054]∞  . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
312              ^           ^                                 ^   ^            ^
313             12          13                                14   9            10
314
315 :0:
316  Offset of the object, where its DER/BER encoding begins.
317  Pay attention that it does **not** include explicit tag.
318 :1:
319  If explicit tag exists, then this is its length (tag + encoded length).
320 :2:
321  Length of object's tag. For example CHOICE does not have its own tag,
322  so it is zero.
323 :3:
324  Length of encoded length.
325 :4:
326  Length of encoded value.
327 :5:
328  Visual indentation to show the depth of object in the hierarchy.
329 :6:
330  Object's name inside SEQUENCE/CHOICE.
331 :7:
332  If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333  here. "IMPLICIT" is omitted.
334 :8:
335  Object's class name, if set. Omitted if it is just an ordinary simple
336  value (like with ``algorithm`` in example above).
337 :9:
338  Object's ASN.1 type.
339 :10:
340  Object's value, if set. Can consist of multiple words (like OCTET/BIT
341  STRINGs above). We see ``v3`` value in Version, because it is named.
342  ``rdnSequence`` is the choice of CHOICE type.
343 :11:
344  Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345  default one, specified in the schema.
346 :12:
347  Shows does object contains any kind of BER encoded data (possibly
348  Sequence holding BER-encoded underlying value).
349 :13:
350  Only applicable to BER encoded data. Indefinite length encoding mark.
351 :14:
352  Only applicable to BER encoded data. If object has BER-specific
353  encoding, then ``BER`` will be shown. It does not depend on indefinite
354  length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355  (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
356  could be BERed.
357
358 .. _definedby:
359
360 DEFINED BY
361 ----------
362
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
367
368 .. _defines:
369
370 defines kwarg
371 _____________
372
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
376 container::
377
378     class ContentInfo(Sequence):
379         schema = (
380             ("contentType", ContentType(defines=((("content",), {
381                 id_digestedData: DigestedData(),
382                 id_signedData: SignedData(),
383             }),))),
384             ("content", Any(expl=tag_ctxc(0))),
385         )
386
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
391 done.
392
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
398
399         (
400             (("parameters",), {
401                 id_ecPublicKey: ECParameters(),
402                 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
403             }),
404             (("..", "subjectPublicKey"), {
405                 id_rsaEncryption: RSAPublicKey(),
406                 id_GostR3410_2001: OctetString(),
407             }),
408         ),
409
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
412 itself.
413
414 Following types can be automatically decoded (DEFINED BY):
415
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420   ``Any``/``BitString``/``OctetString``-s
421
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
426
427 .. _defines_by_path_ctx:
428
429 defines_by_path context option
430 ______________________________
431
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
435
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
438
439     (decode_path, defines)
440
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
445
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
449 of ``PKIResponse``::
450
451     content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
452         (
453             ("contentType",),
454             ((("content",), {id_signedData: SignedData()}),),
455         ),
456         (
457             (
458                 "content",
459                 DecodePathDefBy(id_signedData),
460                 "encapContentInfo",
461                 "eContentType",
462             ),
463             ((("eContent",), {
464                 id_cct_PKIData: PKIData(),
465                 id_cct_PKIResponse: PKIResponse(),
466             })),
467         ),
468         (
469             (
470                 "content",
471                 DecodePathDefBy(id_signedData),
472                 "encapContentInfo",
473                 "eContent",
474                 DecodePathDefBy(id_cct_PKIResponse),
475                 "controlSequence",
476                 any,
477                 "attrType",
478             ),
479             ((("attrValues",), {
480                 id_cmc_recipientNonce: RecipientNonce(),
481                 id_cmc_senderNonce: SenderNonce(),
482                 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483                 id_cmc_transactionId: TransactionId(),
484             })),
485         ),
486     )})
487
488 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 First function is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meet --
491 useful for SEQUENCE/SET OF-s.
492
493 .. _bered_ctx:
494
495 BER encoding
496 ------------
497
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
502
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504   attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505   STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506   ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508   attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509   ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
510   contain it.
511 * If object has an indefinite length encoded explicit tag, then
512   ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514   indefinite length, object's indefinite length, BER-encoding) or any
515   underlying component has that kind of encoding, then ``bered``
516   attribute is set to True. For example SignedData CMS can have
517   ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518   ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
519
520 EOC (end-of-contents) token's length is taken in advance in object's
521 value length.
522
523 .. _allow_expl_oob_ctx:
524
525 Allow explicit tag out-of-bound
526 -------------------------------
527
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
533
534 .. warning::
535
536    This option should be used only for skipping some decode errors, just
537    to see the decoded structure somehow.
538
539 .. _streaming:
540
541 Streaming and dealing with huge structures
542 ------------------------------------------
543
544 .. _evgen_mode:
545
546 evgen mode
547 __________
548
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
553 structure.
554
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
558
559     class TBSCertListFast(Sequence):
560         schema = (
561             [...]
562             ("revokedCertificates", OctetString(
563                 impl=SequenceOf.tag_default,
564                 optional=True,
565             )),
566             [...]
567         )
568
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
571
572 But how can you get all certificate's serial number from it, after you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
576
577     $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578          10     [1,1,   1]   . . version: Version INTEGER v2 (01) OPTIONAL
579          15     [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580          26     [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL
581          13     [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE
582          34     [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583          39     [0,0,   9]   . . . . . . value: [UNIV 19] AttributeValue ANY
584          32     [1,1,  14]   . . . . . 0: AttributeTypeAndValue SEQUENCE
585          30     [1,1,  16]   . . . . 0: RelativeDistinguishedName SET OF
586     [...]
587         188     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589         191     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
590         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591         186     [1,1,  18]   . . . 0: RevokedCertificate SEQUENCE
592         208     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594         211     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
595         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596         206     [1,1,  18]   . . . 1: RevokedCertificate SEQUENCE
597     [...]
598     9144992     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
599     9144992     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600     9144985     [1,1,  20]   . . . 415755: RevokedCertificate SEQUENCE
601       181     [1,4,9144821]   . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602         5     [1,4,9144997]   . tbsCertList: TBSCertList SEQUENCE
603     9145009     [1,1,   9]   . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604     9145020     [0,0,   2]   . . parameters: [UNIV 5] ANY OPTIONAL
605     9145007     [1,1,  13]   . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606     9145022     [1,3, 513]   . signatureValue: BIT STRING 4096 bits
607         0     [1,4,9145534]  CertificateList SEQUENCE
608
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as a successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
618 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
619
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
626
627     for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
628         if (
629             len(decode_path) == 4 and
630             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
631             decode_path[3] == "userCertificate"
632         ):
633             print("serial number:", int(obj))
634
635 Virtually it does not take any memory except at least needed for single
636 object storage. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
639 ``.tlvlen``.
640
641 .. _evgen_mode_upto_ctx:
642
643 evgen_mode_upto
644 _______________
645
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 do not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
655
656     for decode_path, obj, _ in CertificateList().decode_evgen(
657         crl_raw,
658         ctx={"evgen_mode_upto": (
659             (("tbsCertList", "revokedCertificates", any), True),
660         )},
661     ):
662         if (
663             len(decode_path) == 3 and
664             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
665         ):
666             print("serial number:", int(obj["userCertificate"]))
667
668 .. note::
669
670    SEQUENCE/SET values with DEFAULT specified are automatically decoded
671    without evgen mode.
672
673 .. _mmap:
674
675 mmap-ed file
676 ____________
677
678 POSIX compliant systems have ``mmap`` syscall, giving ability to work
679 the memory mapped file. You can deal with the file like it was an
680 ordinary binary string, allowing you not to load it to the memory first.
681 Also you can use them as an input for OCTET STRING, taking no Python
682 memory for their storage.
683
684 There is convenient :py:func:`pyderasn.file_mmaped` function that
685 creates read-only memoryview on the file contents::
686
687     with open("huge", "rb") as fd:
688         raw = file_mmaped(fd)
689         obj = Something.decode(raw)
690
691 .. warning::
692
693    mmap-ed files in Python2.7 does not implement buffer protocol, so
694    memoryview won't work on them.
695
696 .. warning::
697
698    mmap maps the **whole** file. So it plays no role if you seek-ed it
699    before. Take the slice of the resulting memoryview with required
700    offset instead.
701
702 .. note::
703
704    If you use ZFS as underlying storage, then pay attention that
705    currently most platforms does not deal good with ZFS ARC and ordinary
706    page cache used for mmaps. It can take twice the necessary size in
707    the memory: both in page cache and ZFS ARC.
708
709 CER encoding
710 ____________
711
712 We can parse any kind of data now, but how can we produce files
713 streamingly, without storing their encoded representation in memory?
714 SEQUENCE by default encodes in memory all its values, joins them in huge
715 binary string, just to know the exact size of SEQUENCE's value for
716 encoding it in TLV. DER requires you to know all exact sizes of the
717 objects.
718
719 You can use CER encoding mode, that slightly differs from the DER, but
720 does not require exact sizes knowledge, allowing streaming encoding
721 directly to some writer/buffer. Just use
722 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
723 encoded data will flow::
724
725     opener = io.open if PY2 else open
726     with opener("result", "wb") as fd:
727         obj.encode_cer(fd.write)
728
729 ::
730
731     buf = io.BytesIO()
732     obj.encode_cer(buf.write)
733
734 If you do not want to create in-memory buffer every time, then you can
735 use :py:func:`pyderasn.encode_cer` function::
736
737     data = encode_cer(obj)
738
739 Remember that CER is **not valid** DER in most cases, so you **have to**
740 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
741 decoding. Also currently there is **no** validation that provided CER is
742 valid one -- you are sure that it has only valid BER encoding.
743
744 .. warning::
745
746    SET OF values can not be streamingly encoded, because they are
747    required to be sorted byte-by-byte. Big SET OF values still will take
748    much memory. Use neither SET nor SET OF values, as modern ASN.1
749    also recommends too.
750
751 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
752 OCTET STRINGs! They will be streamingly copied from underlying file to
753 the buffer using 1 KB chunks.
754
755 Some structures require that some of the elements have to be forcefully
756 DER encoded. For example ``SignedData`` CMS requires you to encode
757 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
758 encode everything else in BER. You can tell any of the structures to be
759 forcefully encoded in DER during CER encoding, by specifying
760 ``der_forced=True`` attribute::
761
762     class Certificate(Sequence):
763         schema = (...)
764         der_forced = True
765
766     class SignedAttributes(SetOf):
767         schema = Attribute()
768         bounds = (1, 32)
769         der_forced = True
770
771 .. _agg_octet_string:
772
773 agg_octet_string
774 ________________
775
776 In most cases, huge quantity of binary data is stored as OCTET STRING.
777 CER encoding splits it on 1 KB chunks. BER allows splitting on various
778 levels of chunks inclusion::
779
780     SOME STRING[CONSTRUCTED]
781         OCTET STRING[CONSTRUCTED]
782             OCTET STRING[PRIMITIVE]
783                 DATA CHUNK
784             OCTET STRING[PRIMITIVE]
785                 DATA CHUNK
786             OCTET STRING[PRIMITIVE]
787                 DATA CHUNK
788         OCTET STRING[PRIMITIVE]
789             DATA CHUNK
790         OCTET STRING[CONSTRUCTED]
791             OCTET STRING[PRIMITIVE]
792                 DATA CHUNK
793             OCTET STRING[PRIMITIVE]
794                 DATA CHUNK
795         OCTET STRING[CONSTRUCTED]
796             OCTET STRING[CONSTRUCTED]
797                 OCTET STRING[PRIMITIVE]
798                     DATA CHUNK
799
800 You can not just take the offset and some ``.vlen`` of the STRING and
801 treat it as the payload. If you decode it without
802 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
803 and ``bytes()`` will give the whole payload contents.
804
805 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
806 small memory footprint. There is convenient
807 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
808 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
809 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
810 SHA512 digest of its ``eContent``::
811
812     fd = open("data.p7m", "rb")
813     raw = file_mmaped(fd)
814     ctx = {"bered": True}
815     for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
816         if decode_path == ("content",):
817             content = obj
818             break
819     else:
820         raise ValueError("no content found")
821     hasher_state = sha512()
822     def hasher(data):
823         hasher_state.update(data)
824         return len(data)
825     evgens = SignedData().decode_evgen(
826         raw[content.offset:],
827         offset=content.offset,
828         ctx=ctx,
829     )
830     agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
831     fd.close()
832     digest = hasher_state.digest()
833
834 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
835 copy the payload (without BER/CER encoding interleaved overhead) in it.
836 Virtually it won't take memory more than for keeping small structures
837 and 1 KB binary chunks.
838
839 .. _seqof-iterators:
840
841 SEQUENCE OF iterators
842 _____________________
843
844 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
845 classes. The only difference with providing the full list of objects, is
846 that type and bounds checking is done during encoding process. Also
847 sequence's value will be emptied after encoding, forcing you to set its
848 value again.
849
850 This is very useful when you have to create some huge objects, like
851 CRLs, with thousands and millions of entities inside. You can write the
852 generator taking necessary data from the database and giving the
853 ``RevokedCertificate`` objects. Only binary representation of that
854 objects will take memory during DER encoding.
855
856 2-pass DER encoding
857 -------------------
858
859 There is ability to do 2-pass encoding to DER, writing results directly
860 to specified writer (buffer, file, whatever). It could be 1.5+ times
861 slower than ordinary encoding, but it takes little memory for 1st pass
862 state storing. For example, 1st pass state for CACert.org's CRL with
863 ~416K of certificate entries takes nearly 3.5 MB of memory.
864 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
865 nearly 0.5 KB of memory.
866
867 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
868 iterators <seqof-iterators>` and write directly to opened file, then
869 there is very small memory footprint.
870
871 1st pass traverses through all the objects of the structure and returns
872 the size of DER encoded structure, together with 1st pass state object.
873 That state contains precalculated lengths for various objects inside the
874 structure.
875
876 ::
877
878     fulllen, state = obj.encode1st()
879
880 2nd pass takes the writer and 1st pass state. It traverses through all
881 the objects again, but writes their encoded representation to the writer.
882
883 ::
884
885     opener = io.open if PY2 else open
886     with opener("result", "wb") as fd:
887         obj.encode2nd(fd.write, iter(state))
888
889 .. warning::
890
891    You **MUST NOT** use 1st pass state if anything is changed in the
892    objects. It is intended to be used immediately after 1st pass is
893    done!
894
895 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
896 have to reinitialize the values after the 1st pass. And you **have to**
897 be sure that the iterator gives exactly the same values as previously.
898 Yes, you have to run your iterator twice -- because this is two pass
899 encoding mode.
900
901 If you want to encode to the memory, then you can use convenient
902 :py:func:`pyderasn.encode2pass` helper.
903
904 .. _browser:
905
906 ASN.1 browser
907 -------------
908 .. autofunction:: pyderasn.browse
909
910 Base Obj
911 --------
912 .. autoclass:: pyderasn.Obj
913    :members:
914
915 Primitive types
916 ---------------
917
918 Boolean
919 _______
920 .. autoclass:: pyderasn.Boolean
921    :members: __init__
922
923 Integer
924 _______
925 .. autoclass:: pyderasn.Integer
926    :members: __init__, named, tohex
927
928 BitString
929 _________
930 .. autoclass:: pyderasn.BitString
931    :members: __init__, bit_len, named
932
933 OctetString
934 ___________
935 .. autoclass:: pyderasn.OctetString
936    :members: __init__
937
938 Null
939 ____
940 .. autoclass:: pyderasn.Null
941    :members: __init__
942
943 ObjectIdentifier
944 ________________
945 .. autoclass:: pyderasn.ObjectIdentifier
946    :members: __init__
947
948 Enumerated
949 __________
950 .. autoclass:: pyderasn.Enumerated
951
952 CommonString
953 ____________
954 .. autoclass:: pyderasn.CommonString
955
956 NumericString
957 _____________
958 .. autoclass:: pyderasn.NumericString
959
960 PrintableString
961 _______________
962 .. autoclass:: pyderasn.PrintableString
963    :members: __init__, allow_asterisk, allow_ampersand
964
965 IA5String
966 _________
967 .. autoclass:: pyderasn.IA5String
968
969 VisibleString
970 _____________
971 .. autoclass:: pyderasn.VisibleString
972
973 UTCTime
974 _______
975 .. autoclass:: pyderasn.UTCTime
976    :members: __init__, todatetime
977
978 GeneralizedTime
979 _______________
980 .. autoclass:: pyderasn.GeneralizedTime
981    :members: __init__, todatetime
982
983 Special types
984 -------------
985
986 Choice
987 ______
988 .. autoclass:: pyderasn.Choice
989    :members: __init__, choice, value
990
991 PrimitiveTypes
992 ______________
993 .. autoclass:: PrimitiveTypes
994
995 Any
996 ___
997 .. autoclass:: pyderasn.Any
998    :members: __init__
999
1000 Constructed types
1001 -----------------
1002
1003 Sequence
1004 ________
1005 .. autoclass:: pyderasn.Sequence
1006    :members: __init__
1007
1008 Set
1009 ___
1010 .. autoclass:: pyderasn.Set
1011    :members: __init__
1012
1013 SequenceOf
1014 __________
1015 .. autoclass:: pyderasn.SequenceOf
1016    :members: __init__
1017
1018 SetOf
1019 _____
1020 .. autoclass:: pyderasn.SetOf
1021    :members: __init__
1022
1023 Various
1024 -------
1025
1026 .. autofunction:: pyderasn.abs_decode_path
1027 .. autofunction:: pyderasn.agg_octet_string
1028 .. autofunction:: pyderasn.ascii_visualize
1029 .. autofunction:: pyderasn.colonize_hex
1030 .. autofunction:: pyderasn.encode2pass
1031 .. autofunction:: pyderasn.encode_cer
1032 .. autofunction:: pyderasn.file_mmaped
1033 .. autofunction:: pyderasn.hexenc
1034 .. autofunction:: pyderasn.hexdec
1035 .. autofunction:: pyderasn.hexdump
1036 .. autofunction:: pyderasn.tag_encode
1037 .. autofunction:: pyderasn.tag_decode
1038 .. autofunction:: pyderasn.tag_ctxp
1039 .. autofunction:: pyderasn.tag_ctxc
1040 .. autoclass:: pyderasn.DecodeError
1041    :members: __init__
1042 .. autoclass:: pyderasn.NotEnoughData
1043 .. autoclass:: pyderasn.ExceedingData
1044 .. autoclass:: pyderasn.LenIndefForm
1045 .. autoclass:: pyderasn.TagMismatch
1046 .. autoclass:: pyderasn.InvalidLength
1047 .. autoclass:: pyderasn.InvalidOID
1048 .. autoclass:: pyderasn.ObjUnknown
1049 .. autoclass:: pyderasn.ObjNotReady
1050 .. autoclass:: pyderasn.InvalidValueType
1051 .. autoclass:: pyderasn.BoundsError
1052
1053 .. _cmdline:
1054
1055 Command-line usage
1056 ------------------
1057
1058 You can decode DER/BER files using command line abilities::
1059
1060     $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1061
1062 If there is no schema for your file, then you can try parsing it without,
1063 but of course IMPLICIT tags will often make it impossible. But result is
1064 good enough for the certificate above::
1065
1066     $ python -m pyderasn path/to/file
1067         0   [1,3,1604]  . >: SEQUENCE OF
1068         4   [1,3,1453]  . . >: SEQUENCE OF
1069         8   [0,0,   5]  . . . . >: [0] ANY
1070                         . . . . . A0:03:02:01:02
1071        13   [1,1,   3]  . . . . >: INTEGER 61595
1072        18   [1,1,  13]  . . . . >: SEQUENCE OF
1073        20   [1,1,   9]  . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1074        31   [1,1,   0]  . . . . . . >: NULL
1075        33   [1,3, 274]  . . . . >: SEQUENCE OF
1076        37   [1,1,  11]  . . . . . . >: SET OF
1077        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1078        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1079        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1080     [...]
1081      1409   [1,1,  50]  . . . . . . >: SEQUENCE OF
1082      1411   [1,1,   8]  . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1083      1421   [1,1,  38]  . . . . . . . . >: OCTET STRING 38 bytes
1084                         . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1085                         . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1086                         . . . . . . . . . 61:2E:63:6F:6D:2F
1087      1461   [1,1,  13]  . . >: SEQUENCE OF
1088      1463   [1,1,   9]  . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1089      1474   [1,1,   0]  . . . . >: NULL
1090      1476   [1,2, 129]  . . >: BIT STRING 1024 bits
1091                         . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1092                         . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1093     [...]
1094
1095 Human readable OIDs
1096 ___________________
1097
1098 If you have got dictionaries with ObjectIdentifiers, like example one
1099 from ``tests/test_crts.py``::
1100
1101     stroid2name = {
1102         "1.2.840.113549.1.1.1": "id-rsaEncryption",
1103         "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1104         [...]
1105         "2.5.4.10": "id-at-organizationName",
1106         "2.5.4.11": "id-at-organizationalUnitName",
1107     }
1108
1109 then you can pass it to pretty printer to see human readable OIDs::
1110
1111     $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1112     [...]
1113        37   [1,1,  11]  . . . . . . >: SET OF
1114        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1115        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1116        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1117        50   [1,1,  18]  . . . . . . >: SET OF
1118        52   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1119        54   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1120        59   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1121        70   [1,1,  18]  . . . . . . >: SET OF
1122        72   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1123        74   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1124        79   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1125     [...]
1126
1127 Decode paths
1128 ____________
1129
1130 Each decoded element has so-called decode path: sequence of structure
1131 names it is passing during the decode process. Each element has its own
1132 unique path inside the whole ASN.1 tree. You can print it out with
1133 ``--print-decode-path`` option::
1134
1135     $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1136        0    [1,3,1604]  Certificate SEQUENCE []
1137        4    [1,3,1453]   . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1138       10-2  [1,1,   1]   . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1139       13    [1,1,   3]   . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1140       18    [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1141       20    [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1142       31    [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1143                          . . . . 05:00
1144       33    [0,0, 278]   . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1145       33    [1,3, 274]   . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1146       37    [1,1,  11]   . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1147       39    [1,1,   9]   . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1148       41    [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1149       46    [0,0,   4]   . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1150                          . . . . . . . 13:02:45:53
1151       46    [1,1,   2]   . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1152     [...]
1153
1154 Now you can print only the specified tree, for example signature algorithm::
1155
1156     $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1157       18    [1,1,  13]  AlgorithmIdentifier SEQUENCE
1158       20    [1,1,   9]   . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1159       31    [0,0,   2]   . parameters: [UNIV 5] ANY OPTIONAL
1160                          . . 05:00
1161 """
1162
1163 from array import array
1164 from codecs import getdecoder
1165 from codecs import getencoder
1166 from collections import namedtuple
1167 from collections import OrderedDict
1168 from copy import copy
1169 from datetime import datetime
1170 from datetime import timedelta
1171 from io import BytesIO
1172 from math import ceil
1173 from mmap import mmap
1174 from mmap import PROT_READ
1175 from operator import attrgetter
1176 from string import ascii_letters
1177 from string import digits
1178 from sys import maxsize as sys_maxsize
1179 from sys import version_info
1180 from unicodedata import category as unicat
1181
1182 from six import add_metaclass
1183 from six import binary_type
1184 from six import byte2int
1185 from six import indexbytes
1186 from six import int2byte
1187 from six import integer_types
1188 from six import iterbytes
1189 from six import iteritems
1190 from six import itervalues
1191 from six import PY2
1192 from six import string_types
1193 from six import text_type
1194 from six import unichr as six_unichr
1195 from six.moves import xrange as six_xrange
1196
1197
1198 try:
1199     from termcolor import colored
1200 except ImportError:  # pragma: no cover
1201     def colored(what, *args, **kwargs):
1202         return what
1203
1204 __version__ = "7.6"
1205
1206 __all__ = (
1207     "agg_octet_string",
1208     "Any",
1209     "BitString",
1210     "BMPString",
1211     "Boolean",
1212     "BoundsError",
1213     "Choice",
1214     "colonize_hex",
1215     "DecodeError",
1216     "DecodePathDefBy",
1217     "encode2pass",
1218     "encode_cer",
1219     "Enumerated",
1220     "ExceedingData",
1221     "file_mmaped",
1222     "GeneralizedTime",
1223     "GeneralString",
1224     "GraphicString",
1225     "hexdec",
1226     "hexenc",
1227     "IA5String",
1228     "Integer",
1229     "InvalidLength",
1230     "InvalidOID",
1231     "InvalidValueType",
1232     "ISO646String",
1233     "LenIndefForm",
1234     "NotEnoughData",
1235     "Null",
1236     "NumericString",
1237     "obj_by_path",
1238     "ObjectIdentifier",
1239     "ObjNotReady",
1240     "ObjUnknown",
1241     "OctetString",
1242     "PrimitiveTypes",
1243     "PrintableString",
1244     "Sequence",
1245     "SequenceOf",
1246     "Set",
1247     "SetOf",
1248     "T61String",
1249     "tag_ctxc",
1250     "tag_ctxp",
1251     "tag_decode",
1252     "TagClassApplication",
1253     "TagClassContext",
1254     "TagClassPrivate",
1255     "TagClassUniversal",
1256     "TagFormConstructed",
1257     "TagFormPrimitive",
1258     "TagMismatch",
1259     "TeletexString",
1260     "UniversalString",
1261     "UTCTime",
1262     "UTF8String",
1263     "VideotexString",
1264     "VisibleString",
1265 )
1266
1267 TagClassUniversal = 0
1268 TagClassApplication = 1 << 6
1269 TagClassContext = 1 << 7
1270 TagClassPrivate = 1 << 6 | 1 << 7
1271 TagFormPrimitive = 0
1272 TagFormConstructed = 1 << 5
1273 TagClassReprs = {
1274     TagClassContext: "",
1275     TagClassApplication: "APPLICATION ",
1276     TagClassPrivate: "PRIVATE ",
1277     TagClassUniversal: "UNIV ",
1278 }
1279 EOC = b"\x00\x00"
1280 EOC_LEN = len(EOC)
1281 LENINDEF = b"\x80"  # length indefinite mark
1282 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1283 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1284 SET01 = frozenset("01")
1285 DECIMALS = frozenset(digits)
1286 DECIMAL_SIGNS = ".,"
1287 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1288
1289
1290 def file_mmaped(fd):
1291     """Make mmap-ed memoryview for reading from file
1292
1293     :param fd: file object
1294     :returns: memoryview over read-only mmap-ing of the whole file
1295     """
1296     return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
1297
1298
1299 def pureint(value):
1300     if not set(value) <= DECIMALS:
1301         raise ValueError("non-pure integer")
1302     return int(value)
1303
1304
1305 def fractions2float(fractions_raw):
1306     pureint(fractions_raw)
1307     return float("0." + fractions_raw)
1308
1309
1310 def get_def_by_path(defines_by_path, sub_decode_path):
1311     """Get define by decode path
1312     """
1313     for path, define in defines_by_path:
1314         if len(path) != len(sub_decode_path):
1315             continue
1316         for p1, p2 in zip(path, sub_decode_path):
1317             if (p1 is not any) and (p1 != p2):
1318                 break
1319         else:
1320             return define
1321
1322
1323 ########################################################################
1324 # Errors
1325 ########################################################################
1326
1327 class ASN1Error(ValueError):
1328     pass
1329
1330
1331 class DecodeError(ASN1Error):
1332     def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1333         """
1334         :param str msg: reason of decode failing
1335         :param klass: optional exact DecodeError inherited class (like
1336                       :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1337                       :py:exc:`InvalidLength`)
1338         :param decode_path: tuple of strings. It contains human
1339                             readable names of the fields through which
1340                             decoding process has passed
1341         :param int offset: binary offset where failure happened
1342         """
1343         super(DecodeError, self).__init__()
1344         self.msg = msg
1345         self.klass = klass
1346         self.decode_path = decode_path
1347         self.offset = offset
1348
1349     def __str__(self):
1350         return " ".join(
1351             c for c in (
1352                 "" if self.klass is None else self.klass.__name__,
1353                 (
1354                     ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1355                     if len(self.decode_path) > 0 else ""
1356                 ),
1357                 ("(at %d)" % self.offset) if self.offset > 0 else "",
1358                 self.msg,
1359             ) if c != ""
1360         )
1361
1362     def __repr__(self):
1363         return "%s(%s)" % (self.__class__.__name__, self)
1364
1365
1366 class NotEnoughData(DecodeError):
1367     pass
1368
1369
1370 class ExceedingData(ASN1Error):
1371     def __init__(self, nbytes):
1372         super(ExceedingData, self).__init__()
1373         self.nbytes = nbytes
1374
1375     def __str__(self):
1376         return "%d trailing bytes" % self.nbytes
1377
1378     def __repr__(self):
1379         return "%s(%s)" % (self.__class__.__name__, self)
1380
1381
1382 class LenIndefForm(DecodeError):
1383     pass
1384
1385
1386 class TagMismatch(DecodeError):
1387     pass
1388
1389
1390 class InvalidLength(DecodeError):
1391     pass
1392
1393
1394 class InvalidOID(DecodeError):
1395     pass
1396
1397
1398 class ObjUnknown(ASN1Error):
1399     def __init__(self, name):
1400         super(ObjUnknown, self).__init__()
1401         self.name = name
1402
1403     def __str__(self):
1404         return "object is unknown: %s" % self.name
1405
1406     def __repr__(self):
1407         return "%s(%s)" % (self.__class__.__name__, self)
1408
1409
1410 class ObjNotReady(ASN1Error):
1411     def __init__(self, name):
1412         super(ObjNotReady, self).__init__()
1413         self.name = name
1414
1415     def __str__(self):
1416         return "object is not ready: %s" % self.name
1417
1418     def __repr__(self):
1419         return "%s(%s)" % (self.__class__.__name__, self)
1420
1421
1422 class InvalidValueType(ASN1Error):
1423     def __init__(self, expected_types):
1424         super(InvalidValueType, self).__init__()
1425         self.expected_types = expected_types
1426
1427     def __str__(self):
1428         return "invalid value type, expected: %s" % ", ".join(
1429             [repr(t) for t in self.expected_types]
1430         )
1431
1432     def __repr__(self):
1433         return "%s(%s)" % (self.__class__.__name__, self)
1434
1435
1436 class BoundsError(ASN1Error):
1437     def __init__(self, bound_min, value, bound_max):
1438         super(BoundsError, self).__init__()
1439         self.bound_min = bound_min
1440         self.value = value
1441         self.bound_max = bound_max
1442
1443     def __str__(self):
1444         return "unsatisfied bounds: %s <= %s <= %s" % (
1445             self.bound_min,
1446             self.value,
1447             self.bound_max,
1448         )
1449
1450     def __repr__(self):
1451         return "%s(%s)" % (self.__class__.__name__, self)
1452
1453
1454 ########################################################################
1455 # Basic coders
1456 ########################################################################
1457
1458 _hexdecoder = getdecoder("hex")
1459 _hexencoder = getencoder("hex")
1460
1461
1462 def hexdec(data):
1463     """Binary data to hexadecimal string convert
1464     """
1465     return _hexdecoder(data)[0]
1466
1467
1468 def hexenc(data):
1469     """Hexadecimal string to binary data convert
1470     """
1471     return _hexencoder(data)[0].decode("ascii")
1472
1473
1474 def int_bytes_len(num, byte_len=8):
1475     if num == 0:
1476         return 1
1477     return int(ceil(float(num.bit_length()) / byte_len))
1478
1479
1480 def zero_ended_encode(num):
1481     octets = bytearray(int_bytes_len(num, 7))
1482     i = len(octets) - 1
1483     octets[i] = num & 0x7F
1484     num >>= 7
1485     i -= 1
1486     while num > 0:
1487         octets[i] = 0x80 | (num & 0x7F)
1488         num >>= 7
1489         i -= 1
1490     return bytes(octets)
1491
1492
1493 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1494     """Encode tag to binary form
1495
1496     :param int num: tag's number
1497     :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1498                       :py:data:`pyderasn.TagClassContext`,
1499                       :py:data:`pyderasn.TagClassApplication`,
1500                       :py:data:`pyderasn.TagClassPrivate`)
1501     :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1502                      :py:data:`pyderasn.TagFormConstructed`)
1503     """
1504     if num < 31:
1505         # [XX|X|.....]
1506         return int2byte(klass | form | num)
1507     # [XX|X|11111][1.......][1.......] ... [0.......]
1508     return int2byte(klass | form | 31) + zero_ended_encode(num)
1509
1510
1511 def tag_decode(tag):
1512     """Decode tag from binary form
1513
1514     .. warning::
1515
1516        No validation is performed, assuming that it has already passed.
1517
1518     It returns tuple with three integers, as
1519     :py:func:`pyderasn.tag_encode` accepts.
1520     """
1521     first_octet = byte2int(tag)
1522     klass = first_octet & 0xC0
1523     form = first_octet & 0x20
1524     if first_octet & 0x1F < 0x1F:
1525         return (klass, form, first_octet & 0x1F)
1526     num = 0
1527     for octet in iterbytes(tag[1:]):
1528         num <<= 7
1529         num |= octet & 0x7F
1530     return (klass, form, num)
1531
1532
1533 def tag_ctxp(num):
1534     """Create CONTEXT PRIMITIVE tag
1535     """
1536     return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1537
1538
1539 def tag_ctxc(num):
1540     """Create CONTEXT CONSTRUCTED tag
1541     """
1542     return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1543
1544
1545 def tag_strip(data):
1546     """Take off tag from the data
1547
1548     :returns: (encoded tag, tag length, remaining data)
1549     """
1550     if len(data) == 0:
1551         raise NotEnoughData("no data at all")
1552     if byte2int(data) & 0x1F < 31:
1553         return data[:1], 1, data[1:]
1554     i = 0
1555     while True:
1556         i += 1
1557         if i == len(data):
1558             raise DecodeError("unfinished tag")
1559         if indexbytes(data, i) & 0x80 == 0:
1560             break
1561     i += 1
1562     return data[:i], i, data[i:]
1563
1564
1565 def len_encode(l):
1566     if l < 0x80:
1567         return int2byte(l)
1568     octets = bytearray(int_bytes_len(l) + 1)
1569     octets[0] = 0x80 | (len(octets) - 1)
1570     for i in six_xrange(len(octets) - 1, 0, -1):
1571         octets[i] = l & 0xFF
1572         l >>= 8
1573     return bytes(octets)
1574
1575
1576 def len_decode(data):
1577     """Decode length
1578
1579     :returns: (decoded length, length's length, remaining data)
1580     :raises LenIndefForm: if indefinite form encoding is met
1581     """
1582     if len(data) == 0:
1583         raise NotEnoughData("no data at all")
1584     first_octet = byte2int(data)
1585     if first_octet & 0x80 == 0:
1586         return first_octet, 1, data[1:]
1587     octets_num = first_octet & 0x7F
1588     if octets_num + 1 > len(data):
1589         raise NotEnoughData("encoded length is longer than data")
1590     if octets_num == 0:
1591         raise LenIndefForm()
1592     if byte2int(data[1:]) == 0:
1593         raise DecodeError("leading zeros")
1594     l = 0
1595     for v in iterbytes(data[1:1 + octets_num]):
1596         l = (l << 8) | v
1597     if l <= 127:
1598         raise DecodeError("long form instead of short one")
1599     return l, 1 + octets_num, data[1 + octets_num:]
1600
1601
1602 LEN0 = len_encode(0)
1603 LEN1 = len_encode(1)
1604 LEN1K = len_encode(1000)
1605
1606
1607 def len_size(l):
1608     """How many bytes length field will take
1609     """
1610     if l < 128:
1611         return 1
1612     if l < 256:  # 1 << 8
1613         return 2
1614     if l < 65536:  # 1 << 16
1615         return 3
1616     if l < 16777216:  # 1 << 24
1617         return 4
1618     if l < 4294967296:  # 1 << 32
1619         return 5
1620     if l < 1099511627776:  # 1 << 40
1621         return 6
1622     if l < 281474976710656:  # 1 << 48
1623         return 7
1624     if l < 72057594037927936:  # 1 << 56
1625         return 8
1626     raise OverflowError("too big length")
1627
1628
1629 def write_full(writer, data):
1630     """Fully write provided data
1631
1632     :param writer: must comply with ``io.RawIOBase.write`` behaviour
1633
1634     BytesIO does not guarantee that the whole data will be written at
1635     once. That function write everything provided, raising an error if
1636     ``writer`` returns None.
1637     """
1638     data = memoryview(data)
1639     written = 0
1640     while written != len(data):
1641         n = writer(data[written:])
1642         if n is None:
1643             raise ValueError("can not write to buf")
1644         written += n
1645
1646
1647 # If it is 64-bit system, then use compact 64-bit array of unsigned
1648 # longs. Use an ordinary list with universal integers otherwise, that
1649 # is slower.
1650 if sys_maxsize > 2 ** 32:
1651     def state_2pass_new():
1652         return array("L")
1653 else:
1654     def state_2pass_new():
1655         return []
1656
1657
1658 ########################################################################
1659 # Base class
1660 ########################################################################
1661
1662 class AutoAddSlots(type):
1663     def __new__(cls, name, bases, _dict):
1664         _dict["__slots__"] = _dict.get("__slots__", ())
1665         return type.__new__(cls, name, bases, _dict)
1666
1667
1668 BasicState = namedtuple("BasicState", (
1669     "version",
1670     "tag",
1671     "tag_order",
1672     "expl",
1673     "default",
1674     "optional",
1675     "offset",
1676     "llen",
1677     "vlen",
1678     "expl_lenindef",
1679     "lenindef",
1680     "ber_encoded",
1681 ), **NAMEDTUPLE_KWARGS)
1682
1683
1684 @add_metaclass(AutoAddSlots)
1685 class Obj(object):
1686     """Common ASN.1 object class
1687
1688     All ASN.1 types are inherited from it. It has metaclass that
1689     automatically adds ``__slots__`` to all inherited classes.
1690     """
1691     __slots__ = (
1692         "tag",
1693         "_tag_order",
1694         "_value",
1695         "_expl",
1696         "default",
1697         "optional",
1698         "offset",
1699         "llen",
1700         "vlen",
1701         "expl_lenindef",
1702         "lenindef",
1703         "ber_encoded",
1704     )
1705
1706     def __init__(
1707             self,
1708             impl=None,
1709             expl=None,
1710             default=None,
1711             optional=False,
1712             _decoded=(0, 0, 0),
1713     ):
1714         self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1715         self._expl = getattr(self, "expl", None) if expl is None else expl
1716         if self.tag != self.tag_default and self._expl is not None:
1717             raise ValueError("implicit and explicit tags can not be set simultaneously")
1718         if self.tag is None:
1719             self._tag_order = None
1720         else:
1721             tag_class, _, tag_num = tag_decode(
1722                 self.tag if self._expl is None else self._expl
1723             )
1724             self._tag_order = (tag_class, tag_num)
1725         if default is not None:
1726             optional = True
1727         self.optional = optional
1728         self.offset, self.llen, self.vlen = _decoded
1729         self.default = None
1730         self.expl_lenindef = False
1731         self.lenindef = False
1732         self.ber_encoded = False
1733
1734     @property
1735     def ready(self):  # pragma: no cover
1736         """Is object ready to be encoded?
1737         """
1738         raise NotImplementedError()
1739
1740     def _assert_ready(self):
1741         if not self.ready:
1742             raise ObjNotReady(self.__class__.__name__)
1743
1744     @property
1745     def bered(self):
1746         """Is either object or any elements inside is BER encoded?
1747         """
1748         return self.expl_lenindef or self.lenindef or self.ber_encoded
1749
1750     @property
1751     def decoded(self):
1752         """Is object decoded?
1753         """
1754         return (self.llen + self.vlen) > 0
1755
1756     def __getstate__(self):  # pragma: no cover
1757         """Used for making safe to be mutable pickleable copies
1758         """
1759         raise NotImplementedError()
1760
1761     def __setstate__(self, state):
1762         if state.version != __version__:
1763             raise ValueError("data is pickled by different PyDERASN version")
1764         self.tag = state.tag
1765         self._tag_order = state.tag_order
1766         self._expl = state.expl
1767         self.default = state.default
1768         self.optional = state.optional
1769         self.offset = state.offset
1770         self.llen = state.llen
1771         self.vlen = state.vlen
1772         self.expl_lenindef = state.expl_lenindef
1773         self.lenindef = state.lenindef
1774         self.ber_encoded = state.ber_encoded
1775
1776     @property
1777     def tag_order(self):
1778         """Tag's (class, number) used for DER/CER sorting
1779         """
1780         return self._tag_order
1781
1782     @property
1783     def tag_order_cer(self):
1784         return self.tag_order
1785
1786     @property
1787     def tlen(self):
1788         """.. seealso:: :ref:`decoding`
1789         """
1790         return len(self.tag)
1791
1792     @property
1793     def tlvlen(self):
1794         """.. seealso:: :ref:`decoding`
1795         """
1796         return self.tlen + self.llen + self.vlen
1797
1798     def __str__(self):  # pragma: no cover
1799         return self.__bytes__() if PY2 else self.__unicode__()
1800
1801     def __ne__(self, their):
1802         return not(self == their)
1803
1804     def __gt__(self, their):  # pragma: no cover
1805         return not(self < their)
1806
1807     def __le__(self, their):  # pragma: no cover
1808         return (self == their) or (self < their)
1809
1810     def __ge__(self, their):  # pragma: no cover
1811         return (self == their) or (self > their)
1812
1813     def _encode(self):  # pragma: no cover
1814         raise NotImplementedError()
1815
1816     def _encode_cer(self, writer):
1817         write_full(writer, self._encode())
1818
1819     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):  # pragma: no cover
1820         yield NotImplemented
1821
1822     def _encode1st(self, state):
1823         raise NotImplementedError()
1824
1825     def _encode2nd(self, writer, state_iter):
1826         raise NotImplementedError()
1827
1828     def encode(self):
1829         """DER encode the structure
1830
1831         :returns: DER representation
1832         """
1833         raw = self._encode()
1834         if self._expl is None:
1835             return raw
1836         return b"".join((self._expl, len_encode(len(raw)), raw))
1837
1838     def encode1st(self, state=None):
1839         """Do the 1st pass of 2-pass encoding
1840
1841         :rtype: (int, array("L"))
1842         :returns: full length of encoded data and precalculated various
1843                   objects lengths
1844         """
1845         if state is None:
1846             state = state_2pass_new()
1847         if self._expl is None:
1848             return self._encode1st(state)
1849         state.append(0)
1850         idx = len(state) - 1
1851         vlen, _ = self._encode1st(state)
1852         state[idx] = vlen
1853         fulllen = len(self._expl) + len_size(vlen) + vlen
1854         return fulllen, state
1855
1856     def encode2nd(self, writer, state_iter):
1857         """Do the 2nd pass of 2-pass encoding
1858
1859         :param writer: must comply with ``io.RawIOBase.write`` behaviour
1860         :param state_iter: iterator over the 1st pass state (``iter(state)``)
1861         """
1862         if self._expl is None:
1863             self._encode2nd(writer, state_iter)
1864         else:
1865             write_full(writer, self._expl + len_encode(next(state_iter)))
1866             self._encode2nd(writer, state_iter)
1867
1868     def encode_cer(self, writer):
1869         """CER encode the structure to specified writer
1870
1871         :param writer: must comply with ``io.RawIOBase.write``
1872                        behaviour. It takes slice to be written and
1873                        returns number of bytes processed. If it returns
1874                        None, then exception will be raised
1875         """
1876         if self._expl is not None:
1877             write_full(writer, self._expl + LENINDEF)
1878         if getattr(self, "der_forced", False):
1879             write_full(writer, self._encode())
1880         else:
1881             self._encode_cer(writer)
1882         if self._expl is not None:
1883             write_full(writer, EOC)
1884
1885     def hexencode(self):
1886         """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1887         """
1888         return hexenc(self.encode())
1889
1890     def decode(
1891             self,
1892             data,
1893             offset=0,
1894             leavemm=False,
1895             decode_path=(),
1896             ctx=None,
1897             tag_only=False,
1898             _ctx_immutable=True,
1899     ):
1900         """Decode the data
1901
1902         :param data: either binary or memoryview
1903         :param int offset: initial data's offset
1904         :param bool leavemm: do we need to leave memoryview of remaining
1905                     data as is, or convert it to bytes otherwise
1906         :param decode_path: current decode path (tuples of strings,
1907                             possibly with DecodePathDefBy) with will be
1908                             the root for all underlying objects
1909         :param ctx: optional :ref:`context <ctx>` governing decoding process
1910         :param bool tag_only: decode only the tag, without length and
1911                               contents (used only in Choice and Set
1912                               structures, trying to determine if tag satisfies
1913                               the schema)
1914         :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1915                                     before using it?
1916         :returns: (Obj, remaining data)
1917
1918         .. seealso:: :ref:`decoding`
1919         """
1920         result = next(self.decode_evgen(
1921             data,
1922             offset,
1923             leavemm,
1924             decode_path,
1925             ctx,
1926             tag_only,
1927             _ctx_immutable,
1928             _evgen_mode=False,
1929         ))
1930         if result is None:
1931             return None
1932         _, obj, tail = result
1933         return obj, tail
1934
1935     def decode_evgen(
1936             self,
1937             data,
1938             offset=0,
1939             leavemm=False,
1940             decode_path=(),
1941             ctx=None,
1942             tag_only=False,
1943             _ctx_immutable=True,
1944             _evgen_mode=True,
1945     ):
1946         """Decode with evgen mode on
1947
1948         That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1949         it returns the generator producing ``(decode_path, obj, tail)``
1950         values.
1951         .. seealso:: :ref:`evgen mode <evgen_mode>`.
1952         """
1953         if ctx is None:
1954             ctx = {}
1955         elif _ctx_immutable:
1956             ctx = copy(ctx)
1957         tlv = memoryview(data)
1958         if (
1959                 _evgen_mode and
1960                 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1961         ):
1962             _evgen_mode = False
1963         if self._expl is None:
1964             for result in self._decode(
1965                     tlv,
1966                     offset=offset,
1967                     decode_path=decode_path,
1968                     ctx=ctx,
1969                     tag_only=tag_only,
1970                     evgen_mode=_evgen_mode,
1971             ):
1972                 if tag_only:
1973                     yield None
1974                     return
1975                 _decode_path, obj, tail = result
1976                 if _decode_path is not decode_path:
1977                     yield result
1978         else:
1979             try:
1980                 t, tlen, lv = tag_strip(tlv)
1981             except DecodeError as err:
1982                 raise err.__class__(
1983                     msg=err.msg,
1984                     klass=self.__class__,
1985                     decode_path=decode_path,
1986                     offset=offset,
1987                 )
1988             if t != self._expl:
1989                 raise TagMismatch(
1990                     klass=self.__class__,
1991                     decode_path=decode_path,
1992                     offset=offset,
1993                 )
1994             try:
1995                 l, llen, v = len_decode(lv)
1996             except LenIndefForm as err:
1997                 if not ctx.get("bered", False):
1998                     raise err.__class__(
1999                         msg=err.msg,
2000                         klass=self.__class__,
2001                         decode_path=decode_path,
2002                         offset=offset,
2003                     )
2004                 llen, v = 1, lv[1:]
2005                 offset += tlen + llen
2006                 for result in self._decode(
2007                         v,
2008                         offset=offset,
2009                         decode_path=decode_path,
2010                         ctx=ctx,
2011                         tag_only=tag_only,
2012                         evgen_mode=_evgen_mode,
2013                 ):
2014                     if tag_only:  # pragma: no cover
2015                         yield None
2016                         return
2017                     _decode_path, obj, tail = result
2018                     if _decode_path is not decode_path:
2019                         yield result
2020                 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2021                 if eoc_expected.tobytes() != EOC:
2022                     raise DecodeError(
2023                         "no EOC",
2024                         klass=self.__class__,
2025                         decode_path=decode_path,
2026                         offset=offset,
2027                     )
2028                 obj.vlen += EOC_LEN
2029                 obj.expl_lenindef = True
2030             except DecodeError as err:
2031                 raise err.__class__(
2032                     msg=err.msg,
2033                     klass=self.__class__,
2034                     decode_path=decode_path,
2035                     offset=offset,
2036                 )
2037             else:
2038                 if l > len(v):
2039                     raise NotEnoughData(
2040                         "encoded length is longer than data",
2041                         klass=self.__class__,
2042                         decode_path=decode_path,
2043                         offset=offset,
2044                     )
2045                 for result in self._decode(
2046                         v,
2047                         offset=offset + tlen + llen,
2048                         decode_path=decode_path,
2049                         ctx=ctx,
2050                         tag_only=tag_only,
2051                         evgen_mode=_evgen_mode,
2052                 ):
2053                     if tag_only:  # pragma: no cover
2054                         yield None
2055                         return
2056                     _decode_path, obj, tail = result
2057                     if _decode_path is not decode_path:
2058                         yield result
2059                 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2060                     raise DecodeError(
2061                         "explicit tag out-of-bound, longer than data",
2062                         klass=self.__class__,
2063                         decode_path=decode_path,
2064                         offset=offset,
2065                     )
2066         yield decode_path, obj, (tail if leavemm else tail.tobytes())
2067
2068     def decod(self, data, offset=0, decode_path=(), ctx=None):
2069         """Decode the data, check that tail is empty
2070
2071         :raises ExceedingData: if tail is not empty
2072
2073         This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2074         (decode without tail) that also checks that there is no
2075         trailing data left.
2076         """
2077         obj, tail = self.decode(
2078             data,
2079             offset=offset,
2080             decode_path=decode_path,
2081             ctx=ctx,
2082             leavemm=True,
2083         )
2084         if len(tail) > 0:
2085             raise ExceedingData(len(tail))
2086         return obj
2087
2088     def hexdecode(self, data, *args, **kwargs):
2089         """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2090         """
2091         return self.decode(hexdec(data), *args, **kwargs)
2092
2093     def hexdecod(self, data, *args, **kwargs):
2094         """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2095         """
2096         return self.decod(hexdec(data), *args, **kwargs)
2097
2098     @property
2099     def expled(self):
2100         """.. seealso:: :ref:`decoding`
2101         """
2102         return self._expl is not None
2103
2104     @property
2105     def expl_tag(self):
2106         """.. seealso:: :ref:`decoding`
2107         """
2108         return self._expl
2109
2110     @property
2111     def expl_tlen(self):
2112         """.. seealso:: :ref:`decoding`
2113         """
2114         return len(self._expl)
2115
2116     @property
2117     def expl_llen(self):
2118         """.. seealso:: :ref:`decoding`
2119         """
2120         if self.expl_lenindef:
2121             return 1
2122         return len(len_encode(self.tlvlen))
2123
2124     @property
2125     def expl_offset(self):
2126         """.. seealso:: :ref:`decoding`
2127         """
2128         return self.offset - self.expl_tlen - self.expl_llen
2129
2130     @property
2131     def expl_vlen(self):
2132         """.. seealso:: :ref:`decoding`
2133         """
2134         return self.tlvlen
2135
2136     @property
2137     def expl_tlvlen(self):
2138         """.. seealso:: :ref:`decoding`
2139         """
2140         return self.expl_tlen + self.expl_llen + self.expl_vlen
2141
2142     @property
2143     def fulloffset(self):
2144         """.. seealso:: :ref:`decoding`
2145         """
2146         return self.expl_offset if self.expled else self.offset
2147
2148     @property
2149     def fulllen(self):
2150         """.. seealso:: :ref:`decoding`
2151         """
2152         return self.expl_tlvlen if self.expled else self.tlvlen
2153
2154     def pps_lenindef(self, decode_path):
2155         if self.lenindef and not (
2156                 getattr(self, "defined", None) is not None and
2157                 self.defined[1].lenindef
2158         ):
2159             yield _pp(
2160                 asn1_type_name="EOC",
2161                 obj_name="",
2162                 decode_path=decode_path,
2163                 offset=(
2164                     self.offset + self.tlvlen -
2165                     (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2166                 ),
2167                 tlen=1,
2168                 llen=1,
2169                 vlen=0,
2170                 ber_encoded=True,
2171                 bered=True,
2172             )
2173         if self.expl_lenindef:
2174             yield _pp(
2175                 asn1_type_name="EOC",
2176                 obj_name="EXPLICIT",
2177                 decode_path=decode_path,
2178                 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2179                 tlen=1,
2180                 llen=1,
2181                 vlen=0,
2182                 ber_encoded=True,
2183                 bered=True,
2184             )
2185
2186
2187 def encode_cer(obj):
2188     """Encode to CER in memory buffer
2189
2190     :returns bytes: memory buffer contents
2191     """
2192     buf = BytesIO()
2193     obj.encode_cer(buf.write)
2194     return buf.getvalue()
2195
2196
2197 def encode2pass(obj):
2198     """Encode (2-pass mode) to DER in memory buffer
2199
2200     :returns bytes: memory buffer contents
2201     """
2202     buf = BytesIO()
2203     _, state = obj.encode1st()
2204     obj.encode2nd(buf.write, iter(state))
2205     return buf.getvalue()
2206
2207
2208 class DecodePathDefBy(object):
2209     """DEFINED BY representation inside decode path
2210     """
2211     __slots__ = ("defined_by",)
2212
2213     def __init__(self, defined_by):
2214         self.defined_by = defined_by
2215
2216     def __ne__(self, their):
2217         return not(self == their)
2218
2219     def __eq__(self, their):
2220         if not isinstance(their, self.__class__):
2221             return False
2222         return self.defined_by == their.defined_by
2223
2224     def __str__(self):
2225         return "DEFINED BY " + str(self.defined_by)
2226
2227     def __repr__(self):
2228         return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2229
2230
2231 ########################################################################
2232 # Pretty printing
2233 ########################################################################
2234
2235 PP = namedtuple("PP", (
2236     "obj",
2237     "asn1_type_name",
2238     "obj_name",
2239     "decode_path",
2240     "value",
2241     "blob",
2242     "optional",
2243     "default",
2244     "impl",
2245     "expl",
2246     "offset",
2247     "tlen",
2248     "llen",
2249     "vlen",
2250     "expl_offset",
2251     "expl_tlen",
2252     "expl_llen",
2253     "expl_vlen",
2254     "expl_lenindef",
2255     "lenindef",
2256     "ber_encoded",
2257     "bered",
2258 ), **NAMEDTUPLE_KWARGS)
2259
2260
2261 def _pp(
2262         obj=None,
2263         asn1_type_name="unknown",
2264         obj_name="unknown",
2265         decode_path=(),
2266         value=None,
2267         blob=None,
2268         optional=False,
2269         default=False,
2270         impl=None,
2271         expl=None,
2272         offset=0,
2273         tlen=0,
2274         llen=0,
2275         vlen=0,
2276         expl_offset=None,
2277         expl_tlen=None,
2278         expl_llen=None,
2279         expl_vlen=None,
2280         expl_lenindef=False,
2281         lenindef=False,
2282         ber_encoded=False,
2283         bered=False,
2284 ):
2285     return PP(
2286         obj,
2287         asn1_type_name,
2288         obj_name,
2289         decode_path,
2290         value,
2291         blob,
2292         optional,
2293         default,
2294         impl,
2295         expl,
2296         offset,
2297         tlen,
2298         llen,
2299         vlen,
2300         expl_offset,
2301         expl_tlen,
2302         expl_llen,
2303         expl_vlen,
2304         expl_lenindef,
2305         lenindef,
2306         ber_encoded,
2307         bered,
2308     )
2309
2310
2311 def _colourize(what, colour, with_colours, attrs=("bold",)):
2312     return colored(what, colour, attrs=attrs) if with_colours else what
2313
2314
2315 def colonize_hex(hexed):
2316     """Separate hexadecimal string with colons
2317     """
2318     return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2319
2320
2321 def find_oid_name(asn1_type_name, oid_maps, value):
2322     if len(oid_maps) > 0 and asn1_type_name == ObjectIdentifier.asn1_type_name:
2323         for oid_map in oid_maps:
2324             oid_name = oid_map.get(value)
2325             if oid_name is not None:
2326                 return oid_name
2327     return None
2328
2329
2330 def pp_console_row(
2331         pp,
2332         oid_maps=(),
2333         with_offsets=False,
2334         with_blob=True,
2335         with_colours=False,
2336         with_decode_path=False,
2337         decode_path_len_decrease=0,
2338 ):
2339     cols = []
2340     if with_offsets:
2341         col = "%5d%s%s" % (
2342             pp.offset,
2343             (
2344                 "  " if pp.expl_offset is None else
2345                 ("-%d" % (pp.offset - pp.expl_offset))
2346             ),
2347             LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2348         )
2349         col = _colourize(col, "red", with_colours, ())
2350         col += _colourize("B", "red", with_colours) if pp.bered else " "
2351         cols.append(col)
2352         col = "[%d,%d,%4d]%s" % (
2353             pp.tlen, pp.llen, pp.vlen,
2354             LENINDEF_PP_CHAR if pp.lenindef else " "
2355         )
2356         col = _colourize(col, "green", with_colours, ())
2357         cols.append(col)
2358     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2359     if decode_path_len > 0:
2360         cols.append(" ." * decode_path_len)
2361         ent = pp.decode_path[-1]
2362         if isinstance(ent, DecodePathDefBy):
2363             cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2364             value = str(ent.defined_by)
2365             oid_name = find_oid_name(ent.defined_by.asn1_type_name, oid_maps, value)
2366             if oid_name is None:
2367                 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2368             else:
2369                 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2370         else:
2371             cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2372     if pp.expl is not None:
2373         klass, _, num = pp.expl
2374         col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2375         cols.append(_colourize(col, "blue", with_colours))
2376     if pp.impl is not None:
2377         klass, _, num = pp.impl
2378         col = "[%s%d]" % (TagClassReprs[klass], num)
2379         cols.append(_colourize(col, "blue", with_colours))
2380     if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2381         cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2382     if pp.ber_encoded:
2383         cols.append(_colourize("BER", "red", with_colours))
2384     cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2385     if pp.value is not None:
2386         value = pp.value
2387         cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2388         oid_name = find_oid_name(pp.asn1_type_name, oid_maps, pp.value)
2389         if oid_name is not None:
2390             cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2391         if pp.asn1_type_name == Integer.asn1_type_name:
2392             cols.append(_colourize(
2393                 "(%s)" % colonize_hex(pp.obj.tohex()), "green", with_colours,
2394             ))
2395     if with_blob:
2396         if pp.blob.__class__ == binary_type:
2397             cols.append(hexenc(pp.blob))
2398         elif pp.blob.__class__ == tuple:
2399             cols.append(", ".join(pp.blob))
2400     if pp.optional:
2401         cols.append(_colourize("OPTIONAL", "red", with_colours))
2402     if pp.default:
2403         cols.append(_colourize("DEFAULT", "red", with_colours))
2404     if with_decode_path:
2405         cols.append(_colourize(
2406             "[%s]" % ":".join(str(p) for p in pp.decode_path),
2407             "grey",
2408             with_colours,
2409         ))
2410     return " ".join(cols)
2411
2412
2413 def pp_console_blob(pp, decode_path_len_decrease=0):
2414     cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2415     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2416     if decode_path_len > 0:
2417         cols.append(" ." * (decode_path_len + 1))
2418     if pp.blob.__class__ == binary_type:
2419         blob = hexenc(pp.blob).upper()
2420         for i in six_xrange(0, len(blob), 32):
2421             chunk = blob[i:i + 32]
2422             yield " ".join(cols + [colonize_hex(chunk)])
2423     elif pp.blob.__class__ == tuple:
2424         yield " ".join(cols + [", ".join(pp.blob)])
2425
2426
2427 def pprint(
2428         obj,
2429         oid_maps=(),
2430         big_blobs=False,
2431         with_colours=False,
2432         with_decode_path=False,
2433         decode_path_only=(),
2434         decode_path=(),
2435 ):
2436     """Pretty print object
2437
2438     :param Obj obj: object you want to pretty print
2439     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
2440                      Its human readable form is printed when OID is met
2441     :param big_blobs: if large binary objects are met (like OctetString
2442                       values), do we need to print them too, on separate
2443                       lines
2444     :param with_colours: colourize output, if ``termcolor`` library
2445                          is available
2446     :param with_decode_path: print decode path
2447     :param decode_path_only: print only that specified decode path
2448     """
2449     def _pprint_pps(pps):
2450         for pp in pps:
2451             if hasattr(pp, "_fields"):
2452                 if (
2453                         decode_path_only != () and
2454                         tuple(
2455                             str(p) for p in pp.decode_path[:len(decode_path_only)]
2456                         ) != decode_path_only
2457                 ):
2458                     continue
2459                 if big_blobs:
2460                     yield pp_console_row(
2461                         pp,
2462                         oid_maps=oid_maps,
2463                         with_offsets=True,
2464                         with_blob=False,
2465                         with_colours=with_colours,
2466                         with_decode_path=with_decode_path,
2467                         decode_path_len_decrease=len(decode_path_only),
2468                     )
2469                     for row in pp_console_blob(
2470                             pp,
2471                             decode_path_len_decrease=len(decode_path_only),
2472                     ):
2473                         yield row
2474                 else:
2475                     yield pp_console_row(
2476                         pp,
2477                         oid_maps=oid_maps,
2478                         with_offsets=True,
2479                         with_blob=True,
2480                         with_colours=with_colours,
2481                         with_decode_path=with_decode_path,
2482                         decode_path_len_decrease=len(decode_path_only),
2483                     )
2484             else:
2485                 for row in _pprint_pps(pp):
2486                     yield row
2487     return "\n".join(_pprint_pps(obj.pps(decode_path)))
2488
2489
2490 ########################################################################
2491 # ASN.1 primitive types
2492 ########################################################################
2493
2494 BooleanState = namedtuple(
2495     "BooleanState",
2496     BasicState._fields + ("value",),
2497     **NAMEDTUPLE_KWARGS
2498 )
2499
2500
2501 class Boolean(Obj):
2502     """``BOOLEAN`` boolean type
2503
2504     >>> b = Boolean(True)
2505     BOOLEAN True
2506     >>> b == Boolean(True)
2507     True
2508     >>> bool(b)
2509     True
2510     """
2511     __slots__ = ()
2512     tag_default = tag_encode(1)
2513     asn1_type_name = "BOOLEAN"
2514
2515     def __init__(
2516             self,
2517             value=None,
2518             impl=None,
2519             expl=None,
2520             default=None,
2521             optional=False,
2522             _decoded=(0, 0, 0),
2523     ):
2524         """
2525         :param value: set the value. Either boolean type, or
2526                       :py:class:`pyderasn.Boolean` object
2527         :param bytes impl: override default tag with ``IMPLICIT`` one
2528         :param bytes expl: override default tag with ``EXPLICIT`` one
2529         :param default: set default value. Type same as in ``value``
2530         :param bool optional: is object ``OPTIONAL`` in sequence
2531         """
2532         super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2533         self._value = None if value is None else self._value_sanitize(value)
2534         if default is not None:
2535             default = self._value_sanitize(default)
2536             self.default = self.__class__(
2537                 value=default,
2538                 impl=self.tag,
2539                 expl=self._expl,
2540             )
2541             if value is None:
2542                 self._value = default
2543
2544     def _value_sanitize(self, value):
2545         if value.__class__ == bool:
2546             return value
2547         if issubclass(value.__class__, Boolean):
2548             return value._value
2549         raise InvalidValueType((self.__class__, bool))
2550
2551     @property
2552     def ready(self):
2553         return self._value is not None
2554
2555     def __getstate__(self):
2556         return BooleanState(
2557             __version__,
2558             self.tag,
2559             self._tag_order,
2560             self._expl,
2561             self.default,
2562             self.optional,
2563             self.offset,
2564             self.llen,
2565             self.vlen,
2566             self.expl_lenindef,
2567             self.lenindef,
2568             self.ber_encoded,
2569             self._value,
2570         )
2571
2572     def __setstate__(self, state):
2573         super(Boolean, self).__setstate__(state)
2574         self._value = state.value
2575
2576     def __nonzero__(self):
2577         self._assert_ready()
2578         return self._value
2579
2580     def __bool__(self):
2581         self._assert_ready()
2582         return self._value
2583
2584     def __eq__(self, their):
2585         if their.__class__ == bool:
2586             return self._value == their
2587         if not issubclass(their.__class__, Boolean):
2588             return False
2589         return (
2590             self._value == their._value and
2591             self.tag == their.tag and
2592             self._expl == their._expl
2593         )
2594
2595     def __call__(
2596             self,
2597             value=None,
2598             impl=None,
2599             expl=None,
2600             default=None,
2601             optional=None,
2602     ):
2603         return self.__class__(
2604             value=value,
2605             impl=self.tag if impl is None else impl,
2606             expl=self._expl if expl is None else expl,
2607             default=self.default if default is None else default,
2608             optional=self.optional if optional is None else optional,
2609         )
2610
2611     def _encode(self):
2612         self._assert_ready()
2613         return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2614
2615     def _encode1st(self, state):
2616         return len(self.tag) + 2, state
2617
2618     def _encode2nd(self, writer, state_iter):
2619         self._assert_ready()
2620         write_full(writer, self._encode())
2621
2622     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2623         try:
2624             t, _, lv = tag_strip(tlv)
2625         except DecodeError as err:
2626             raise err.__class__(
2627                 msg=err.msg,
2628                 klass=self.__class__,
2629                 decode_path=decode_path,
2630                 offset=offset,
2631             )
2632         if t != self.tag:
2633             raise TagMismatch(
2634                 klass=self.__class__,
2635                 decode_path=decode_path,
2636                 offset=offset,
2637             )
2638         if tag_only:
2639             yield None
2640             return
2641         try:
2642             l, _, v = len_decode(lv)
2643         except DecodeError as err:
2644             raise err.__class__(
2645                 msg=err.msg,
2646                 klass=self.__class__,
2647                 decode_path=decode_path,
2648                 offset=offset,
2649             )
2650         if l != 1:
2651             raise InvalidLength(
2652                 "Boolean's length must be equal to 1",
2653                 klass=self.__class__,
2654                 decode_path=decode_path,
2655                 offset=offset,
2656             )
2657         if l > len(v):
2658             raise NotEnoughData(
2659                 "encoded length is longer than data",
2660                 klass=self.__class__,
2661                 decode_path=decode_path,
2662                 offset=offset,
2663             )
2664         first_octet = byte2int(v)
2665         ber_encoded = False
2666         if first_octet == 0:
2667             value = False
2668         elif first_octet == 0xFF:
2669             value = True
2670         elif ctx.get("bered", False):
2671             value = True
2672             ber_encoded = True
2673         else:
2674             raise DecodeError(
2675                 "unacceptable Boolean value",
2676                 klass=self.__class__,
2677                 decode_path=decode_path,
2678                 offset=offset,
2679             )
2680         obj = self.__class__(
2681             value=value,
2682             impl=self.tag,
2683             expl=self._expl,
2684             default=self.default,
2685             optional=self.optional,
2686             _decoded=(offset, 1, 1),
2687         )
2688         obj.ber_encoded = ber_encoded
2689         yield decode_path, obj, v[1:]
2690
2691     def __repr__(self):
2692         return pp_console_row(next(self.pps()))
2693
2694     def pps(self, decode_path=()):
2695         yield _pp(
2696             obj=self,
2697             asn1_type_name=self.asn1_type_name,
2698             obj_name=self.__class__.__name__,
2699             decode_path=decode_path,
2700             value=str(self._value) if self.ready else None,
2701             optional=self.optional,
2702             default=self == self.default,
2703             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2704             expl=None if self._expl is None else tag_decode(self._expl),
2705             offset=self.offset,
2706             tlen=self.tlen,
2707             llen=self.llen,
2708             vlen=self.vlen,
2709             expl_offset=self.expl_offset if self.expled else None,
2710             expl_tlen=self.expl_tlen if self.expled else None,
2711             expl_llen=self.expl_llen if self.expled else None,
2712             expl_vlen=self.expl_vlen if self.expled else None,
2713             expl_lenindef=self.expl_lenindef,
2714             ber_encoded=self.ber_encoded,
2715             bered=self.bered,
2716         )
2717         for pp in self.pps_lenindef(decode_path):
2718             yield pp
2719
2720
2721 IntegerState = namedtuple(
2722     "IntegerState",
2723     BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2724     **NAMEDTUPLE_KWARGS
2725 )
2726
2727
2728 class Integer(Obj):
2729     """``INTEGER`` integer type
2730
2731     >>> b = Integer(-123)
2732     INTEGER -123
2733     >>> b == Integer(-123)
2734     True
2735     >>> int(b)
2736     -123
2737
2738     >>> Integer(2, bounds=(1, 3))
2739     INTEGER 2
2740     >>> Integer(5, bounds=(1, 3))
2741     Traceback (most recent call last):
2742     pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2743
2744     ::
2745
2746         class Version(Integer):
2747             schema = (
2748                 ("v1", 0),
2749                 ("v2", 1),
2750                 ("v3", 2),
2751             )
2752
2753     >>> v = Version("v1")
2754     Version INTEGER v1
2755     >>> int(v)
2756     0
2757     >>> v.named
2758     'v1'
2759     >>> v.specs
2760     {'v3': 2, 'v1': 0, 'v2': 1}
2761     """
2762     __slots__ = ("specs", "_bound_min", "_bound_max")
2763     tag_default = tag_encode(2)
2764     asn1_type_name = "INTEGER"
2765
2766     def __init__(
2767             self,
2768             value=None,
2769             bounds=None,
2770             impl=None,
2771             expl=None,
2772             default=None,
2773             optional=False,
2774             _specs=None,
2775             _decoded=(0, 0, 0),
2776     ):
2777         """
2778         :param value: set the value. Either integer type, named value
2779                       (if ``schema`` is specified in the class), or
2780                       :py:class:`pyderasn.Integer` object
2781         :param bounds: set ``(MIN, MAX)`` value constraint.
2782                        (-inf, +inf) by default
2783         :param bytes impl: override default tag with ``IMPLICIT`` one
2784         :param bytes expl: override default tag with ``EXPLICIT`` one
2785         :param default: set default value. Type same as in ``value``
2786         :param bool optional: is object ``OPTIONAL`` in sequence
2787         """
2788         super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2789         self._value = value
2790         specs = getattr(self, "schema", {}) if _specs is None else _specs
2791         self.specs = specs if specs.__class__ == dict else dict(specs)
2792         self._bound_min, self._bound_max = getattr(
2793             self,
2794             "bounds",
2795             (float("-inf"), float("+inf")),
2796         ) if bounds is None else bounds
2797         if value is not None:
2798             self._value = self._value_sanitize(value)
2799         if default is not None:
2800             default = self._value_sanitize(default)
2801             self.default = self.__class__(
2802                 value=default,
2803                 impl=self.tag,
2804                 expl=self._expl,
2805                 _specs=self.specs,
2806             )
2807             if self._value is None:
2808                 self._value = default
2809
2810     def _value_sanitize(self, value):
2811         if isinstance(value, integer_types):
2812             pass
2813         elif issubclass(value.__class__, Integer):
2814             value = value._value
2815         elif value.__class__ == str:
2816             value = self.specs.get(value)
2817             if value is None:
2818                 raise ObjUnknown("integer value: %s" % value)
2819         else:
2820             raise InvalidValueType((self.__class__, int, str))
2821         if not self._bound_min <= value <= self._bound_max:
2822             raise BoundsError(self._bound_min, value, self._bound_max)
2823         return value
2824
2825     @property
2826     def ready(self):
2827         return self._value is not None
2828
2829     def __getstate__(self):
2830         return IntegerState(
2831             __version__,
2832             self.tag,
2833             self._tag_order,
2834             self._expl,
2835             self.default,
2836             self.optional,
2837             self.offset,
2838             self.llen,
2839             self.vlen,
2840             self.expl_lenindef,
2841             self.lenindef,
2842             self.ber_encoded,
2843             self.specs,
2844             self._value,
2845             self._bound_min,
2846             self._bound_max,
2847         )
2848
2849     def __setstate__(self, state):
2850         super(Integer, self).__setstate__(state)
2851         self.specs = state.specs
2852         self._value = state.value
2853         self._bound_min = state.bound_min
2854         self._bound_max = state.bound_max
2855
2856     def __int__(self):
2857         self._assert_ready()
2858         return int(self._value)
2859
2860     def tohex(self):
2861         """Hexadecimal representation
2862
2863         Use :py:func:`pyderasn.colonize_hex` for colonizing it.
2864         """
2865         hex_repr = hex(int(self))[2:].upper()
2866         if len(hex_repr) % 2 != 0:
2867             hex_repr = "0" + hex_repr
2868         return hex_repr
2869
2870     def __hash__(self):
2871         self._assert_ready()
2872         return hash(b"".join((
2873             self.tag,
2874             bytes(self._expl or b""),
2875             str(self._value).encode("ascii"),
2876         )))
2877
2878     def __eq__(self, their):
2879         if isinstance(their, integer_types):
2880             return self._value == their
2881         if not issubclass(their.__class__, Integer):
2882             return False
2883         return (
2884             self._value == their._value and
2885             self.tag == their.tag and
2886             self._expl == their._expl
2887         )
2888
2889     def __lt__(self, their):
2890         return self._value < their._value
2891
2892     @property
2893     def named(self):
2894         """Return named representation (if exists) of the value
2895         """
2896         for name, value in iteritems(self.specs):
2897             if value == self._value:
2898                 return name
2899         return None
2900
2901     def __call__(
2902             self,
2903             value=None,
2904             bounds=None,
2905             impl=None,
2906             expl=None,
2907             default=None,
2908             optional=None,
2909     ):
2910         return self.__class__(
2911             value=value,
2912             bounds=(
2913                 (self._bound_min, self._bound_max)
2914                 if bounds is None else bounds
2915             ),
2916             impl=self.tag if impl is None else impl,
2917             expl=self._expl if expl is None else expl,
2918             default=self.default if default is None else default,
2919             optional=self.optional if optional is None else optional,
2920             _specs=self.specs,
2921         )
2922
2923     def _encode_payload(self):
2924         self._assert_ready()
2925         value = self._value
2926         if PY2:
2927             if value == 0:
2928                 octets = bytearray([0])
2929             elif value < 0:
2930                 value = -value
2931                 value -= 1
2932                 octets = bytearray()
2933                 while value > 0:
2934                     octets.append((value & 0xFF) ^ 0xFF)
2935                     value >>= 8
2936                 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2937                     octets.append(0xFF)
2938             else:
2939                 octets = bytearray()
2940                 while value > 0:
2941                     octets.append(value & 0xFF)
2942                     value >>= 8
2943                 if octets[-1] & 0x80 > 0:
2944                     octets.append(0x00)
2945             octets.reverse()
2946             octets = bytes(octets)
2947         else:
2948             bytes_len = ceil(value.bit_length() / 8) or 1
2949             while True:
2950                 try:
2951                     octets = value.to_bytes(
2952                         bytes_len,
2953                         byteorder="big",
2954                         signed=True,
2955                     )
2956                 except OverflowError:
2957                     bytes_len += 1
2958                 else:
2959                     break
2960         return octets
2961
2962     def _encode(self):
2963         octets = self._encode_payload()
2964         return b"".join((self.tag, len_encode(len(octets)), octets))
2965
2966     def _encode1st(self, state):
2967         l = len(self._encode_payload())
2968         return len(self.tag) + len_size(l) + l, state
2969
2970     def _encode2nd(self, writer, state_iter):
2971         write_full(writer, self._encode())
2972
2973     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2974         try:
2975             t, _, lv = tag_strip(tlv)
2976         except DecodeError as err:
2977             raise err.__class__(
2978                 msg=err.msg,
2979                 klass=self.__class__,
2980                 decode_path=decode_path,
2981                 offset=offset,
2982             )
2983         if t != self.tag:
2984             raise TagMismatch(
2985                 klass=self.__class__,
2986                 decode_path=decode_path,
2987                 offset=offset,
2988             )
2989         if tag_only:
2990             yield None
2991             return
2992         try:
2993             l, llen, v = len_decode(lv)
2994         except DecodeError as err:
2995             raise err.__class__(
2996                 msg=err.msg,
2997                 klass=self.__class__,
2998                 decode_path=decode_path,
2999                 offset=offset,
3000             )
3001         if l > len(v):
3002             raise NotEnoughData(
3003                 "encoded length is longer than data",
3004                 klass=self.__class__,
3005                 decode_path=decode_path,
3006                 offset=offset,
3007             )
3008         if l == 0:
3009             raise NotEnoughData(
3010                 "zero length",
3011                 klass=self.__class__,
3012                 decode_path=decode_path,
3013                 offset=offset,
3014             )
3015         v, tail = v[:l], v[l:]
3016         first_octet = byte2int(v)
3017         if l > 1:
3018             second_octet = byte2int(v[1:])
3019             if (
3020                     ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3021                     ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3022             ):
3023                 raise DecodeError(
3024                     "non normalized integer",
3025                     klass=self.__class__,
3026                     decode_path=decode_path,
3027                     offset=offset,
3028                 )
3029         if PY2:
3030             value = 0
3031             if first_octet & 0x80 > 0:
3032                 octets = bytearray()
3033                 for octet in bytearray(v):
3034                     octets.append(octet ^ 0xFF)
3035                 for octet in octets:
3036                     value = (value << 8) | octet
3037                 value += 1
3038                 value = -value
3039             else:
3040                 for octet in bytearray(v):
3041                     value = (value << 8) | octet
3042         else:
3043             value = int.from_bytes(v, byteorder="big", signed=True)
3044         try:
3045             obj = self.__class__(
3046                 value=value,
3047                 bounds=(self._bound_min, self._bound_max),
3048                 impl=self.tag,
3049                 expl=self._expl,
3050                 default=self.default,
3051                 optional=self.optional,
3052                 _specs=self.specs,
3053                 _decoded=(offset, llen, l),
3054             )
3055         except BoundsError as err:
3056             raise DecodeError(
3057                 msg=str(err),
3058                 klass=self.__class__,
3059                 decode_path=decode_path,
3060                 offset=offset,
3061             )
3062         yield decode_path, obj, tail
3063
3064     def __repr__(self):
3065         return pp_console_row(next(self.pps()))
3066
3067     def pps(self, decode_path=()):
3068         yield _pp(
3069             obj=self,
3070             asn1_type_name=self.asn1_type_name,
3071             obj_name=self.__class__.__name__,
3072             decode_path=decode_path,
3073             value=(self.named or str(self._value)) if self.ready else None,
3074             optional=self.optional,
3075             default=self == self.default,
3076             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3077             expl=None if self._expl is None else tag_decode(self._expl),
3078             offset=self.offset,
3079             tlen=self.tlen,
3080             llen=self.llen,
3081             vlen=self.vlen,
3082             expl_offset=self.expl_offset if self.expled else None,
3083             expl_tlen=self.expl_tlen if self.expled else None,
3084             expl_llen=self.expl_llen if self.expled else None,
3085             expl_vlen=self.expl_vlen if self.expled else None,
3086             expl_lenindef=self.expl_lenindef,
3087             bered=self.bered,
3088         )
3089         for pp in self.pps_lenindef(decode_path):
3090             yield pp
3091
3092
3093 BitStringState = namedtuple(
3094     "BitStringState",
3095     BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3096     **NAMEDTUPLE_KWARGS
3097 )
3098
3099
3100 class BitString(Obj):
3101     """``BIT STRING`` bit string type
3102
3103     >>> BitString(b"hello world")
3104     BIT STRING 88 bits 68656c6c6f20776f726c64
3105     >>> bytes(b)
3106     b'hello world'
3107     >>> b == b"hello world"
3108     True
3109     >>> b.bit_len
3110     88
3111
3112     >>> BitString("'0A3B5F291CD'H")
3113     BIT STRING 44 bits 0a3b5f291cd0
3114     >>> b = BitString("'010110000000'B")
3115     BIT STRING 12 bits 5800
3116     >>> b.bit_len
3117     12
3118     >>> b[0], b[1], b[2], b[3]
3119     (False, True, False, True)
3120     >>> b[1000]
3121     False
3122     >>> [v for v in b]
3123     [False, True, False, True, True, False, False, False, False, False, False, False]
3124
3125     ::
3126
3127         class KeyUsage(BitString):
3128             schema = (
3129                 ("digitalSignature", 0),
3130                 ("nonRepudiation", 1),
3131                 ("keyEncipherment", 2),
3132             )
3133
3134     >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3135     KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3136     >>> b.named
3137     ['nonRepudiation', 'keyEncipherment']
3138     >>> b.specs
3139     {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3140
3141     .. note::
3142
3143        Pay attention that BIT STRING can be encoded both in primitive
3144        and constructed forms. Decoder always checks constructed form tag
3145        additionally to specified primitive one. If BER decoding is
3146        :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3147        of DER restrictions.
3148     """
3149     __slots__ = ("tag_constructed", "specs", "defined")
3150     tag_default = tag_encode(3)
3151     asn1_type_name = "BIT STRING"
3152
3153     def __init__(
3154             self,
3155             value=None,
3156             impl=None,
3157             expl=None,
3158             default=None,
3159             optional=False,
3160             _specs=None,
3161             _decoded=(0, 0, 0),
3162     ):
3163         """
3164         :param value: set the value. Either binary type, tuple of named
3165                       values (if ``schema`` is specified in the class),
3166                       string in ``'XXX...'B`` form, or
3167                       :py:class:`pyderasn.BitString` object
3168         :param bytes impl: override default tag with ``IMPLICIT`` one
3169         :param bytes expl: override default tag with ``EXPLICIT`` one
3170         :param default: set default value. Type same as in ``value``
3171         :param bool optional: is object ``OPTIONAL`` in sequence
3172         """
3173         super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3174         specs = getattr(self, "schema", {}) if _specs is None else _specs
3175         self.specs = specs if specs.__class__ == dict else dict(specs)
3176         self._value = None if value is None else self._value_sanitize(value)
3177         if default is not None:
3178             default = self._value_sanitize(default)
3179             self.default = self.__class__(
3180                 value=default,
3181                 impl=self.tag,
3182                 expl=self._expl,
3183             )
3184             if value is None:
3185                 self._value = default
3186         self.defined = None
3187         tag_klass, _, tag_num = tag_decode(self.tag)
3188         self.tag_constructed = tag_encode(
3189             klass=tag_klass,
3190             form=TagFormConstructed,
3191             num=tag_num,
3192         )
3193
3194     def _bits2octets(self, bits):
3195         if len(self.specs) > 0:
3196             bits = bits.rstrip("0")
3197         bit_len = len(bits)
3198         bits += "0" * ((8 - (bit_len % 8)) % 8)
3199         octets = bytearray(len(bits) // 8)
3200         for i in six_xrange(len(octets)):
3201             octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3202         return bit_len, bytes(octets)
3203
3204     def _value_sanitize(self, value):
3205         if isinstance(value, (string_types, binary_type)):
3206             if (
3207                     isinstance(value, string_types) and
3208                     value.startswith("'")
3209             ):
3210                 if value.endswith("'B"):
3211                     value = value[1:-2]
3212                     if not frozenset(value) <= SET01:
3213                         raise ValueError("B's coding contains unacceptable chars")
3214                     return self._bits2octets(value)
3215                 if value.endswith("'H"):
3216                     value = value[1:-2]
3217                     return (
3218                         len(value) * 4,
3219                         hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3220                     )
3221             if value.__class__ == binary_type:
3222                 return (len(value) * 8, value)
3223             raise InvalidValueType((self.__class__, string_types, binary_type))
3224         if value.__class__ == tuple:
3225             if (
3226                     len(value) == 2 and
3227                     isinstance(value[0], integer_types) and
3228                     value[1].__class__ == binary_type
3229             ):
3230                 return value
3231             bits = []
3232             for name in value:
3233                 bit = self.specs.get(name)
3234                 if bit is None:
3235                     raise ObjUnknown("BitString value: %s" % name)
3236                 bits.append(bit)
3237             if len(bits) == 0:
3238                 return self._bits2octets("")
3239             bits = frozenset(bits)
3240             return self._bits2octets("".join(
3241                 ("1" if bit in bits else "0")
3242                 for bit in six_xrange(max(bits) + 1)
3243             ))
3244         if issubclass(value.__class__, BitString):
3245             return value._value
3246         raise InvalidValueType((self.__class__, binary_type, string_types))
3247
3248     @property
3249     def ready(self):
3250         return self._value is not None
3251
3252     def __getstate__(self):
3253         return BitStringState(
3254             __version__,
3255             self.tag,
3256             self._tag_order,
3257             self._expl,
3258             self.default,
3259             self.optional,
3260             self.offset,
3261             self.llen,
3262             self.vlen,
3263             self.expl_lenindef,
3264             self.lenindef,
3265             self.ber_encoded,
3266             self.specs,
3267             self._value,
3268             self.tag_constructed,
3269             self.defined,
3270         )
3271
3272     def __setstate__(self, state):
3273         super(BitString, self).__setstate__(state)
3274         self.specs = state.specs
3275         self._value = state.value
3276         self.tag_constructed = state.tag_constructed
3277         self.defined = state.defined
3278
3279     def __iter__(self):
3280         self._assert_ready()
3281         for i in six_xrange(self._value[0]):
3282             yield self[i]
3283
3284     @property
3285     def bit_len(self):
3286         """Returns number of bits in the string
3287         """
3288         self._assert_ready()
3289         return self._value[0]
3290
3291     def __bytes__(self):
3292         self._assert_ready()
3293         return self._value[1]
3294
3295     def __eq__(self, their):
3296         if their.__class__ == bytes:
3297             return self._value[1] == their
3298         if not issubclass(their.__class__, BitString):
3299             return False
3300         return (
3301             self._value == their._value and
3302             self.tag == their.tag and
3303             self._expl == their._expl
3304         )
3305
3306     @property
3307     def named(self):
3308         """Named representation (if exists) of the bits
3309
3310         :returns: [str(name), ...]
3311         """
3312         return [name for name, bit in iteritems(self.specs) if self[bit]]
3313
3314     def __call__(
3315             self,
3316             value=None,
3317             impl=None,
3318             expl=None,
3319             default=None,
3320             optional=None,
3321     ):
3322         return self.__class__(
3323             value=value,
3324             impl=self.tag if impl is None else impl,
3325             expl=self._expl if expl is None else expl,
3326             default=self.default if default is None else default,
3327             optional=self.optional if optional is None else optional,
3328             _specs=self.specs,
3329         )
3330
3331     def __getitem__(self, key):
3332         if key.__class__ == int:
3333             bit_len, octets = self._value
3334             if key >= bit_len:
3335                 return False
3336             return (
3337                 byte2int(memoryview(octets)[key // 8:]) >>
3338                 (7 - (key % 8))
3339             ) & 1 == 1
3340         if isinstance(key, string_types):
3341             value = self.specs.get(key)
3342             if value is None:
3343                 raise ObjUnknown("BitString value: %s" % key)
3344             return self[value]
3345         raise InvalidValueType((int, str))
3346
3347     def _encode(self):
3348         self._assert_ready()
3349         bit_len, octets = self._value
3350         return b"".join((
3351             self.tag,
3352             len_encode(len(octets) + 1),
3353             int2byte((8 - bit_len % 8) % 8),
3354             octets,
3355         ))
3356
3357     def _encode1st(self, state):
3358         self._assert_ready()
3359         _, octets = self._value
3360         l = len(octets) + 1
3361         return len(self.tag) + len_size(l) + l, state
3362
3363     def _encode2nd(self, writer, state_iter):
3364         bit_len, octets = self._value
3365         write_full(writer, b"".join((
3366             self.tag,
3367             len_encode(len(octets) + 1),
3368             int2byte((8 - bit_len % 8) % 8),
3369         )))
3370         write_full(writer, octets)
3371
3372     def _encode_cer(self, writer):
3373         bit_len, octets = self._value
3374         if len(octets) + 1 <= 1000:
3375             write_full(writer, self._encode())
3376             return
3377         write_full(writer, self.tag_constructed)
3378         write_full(writer, LENINDEF)
3379         for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3380             write_full(writer, b"".join((
3381                 BitString.tag_default,
3382                 LEN1K,
3383                 int2byte(0),
3384                 octets[offset:offset + 999],
3385             )))
3386         tail = octets[offset + 999:]
3387         if len(tail) > 0:
3388             tail = int2byte((8 - bit_len % 8) % 8) + tail
3389             write_full(writer, b"".join((
3390                 BitString.tag_default,
3391                 len_encode(len(tail)),
3392                 tail,
3393             )))
3394         write_full(writer, EOC)
3395
3396     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3397         try:
3398             t, tlen, lv = tag_strip(tlv)
3399         except DecodeError as err:
3400             raise err.__class__(
3401                 msg=err.msg,
3402                 klass=self.__class__,
3403                 decode_path=decode_path,
3404                 offset=offset,
3405             )
3406         if t == self.tag:
3407             if tag_only:  # pragma: no cover
3408                 yield None
3409                 return
3410             try:
3411                 l, llen, v = len_decode(lv)
3412             except DecodeError as err:
3413                 raise err.__class__(
3414                     msg=err.msg,
3415                     klass=self.__class__,
3416                     decode_path=decode_path,
3417                     offset=offset,
3418                 )
3419             if l > len(v):
3420                 raise NotEnoughData(
3421                     "encoded length is longer than data",
3422                     klass=self.__class__,
3423                     decode_path=decode_path,
3424                     offset=offset,
3425                 )
3426             if l == 0:
3427                 raise NotEnoughData(
3428                     "zero length",
3429                     klass=self.__class__,
3430                     decode_path=decode_path,
3431                     offset=offset,
3432                 )
3433             pad_size = byte2int(v)
3434             if l == 1 and pad_size != 0:
3435                 raise DecodeError(
3436                     "invalid empty value",
3437                     klass=self.__class__,
3438                     decode_path=decode_path,
3439                     offset=offset,
3440                 )
3441             if pad_size > 7:
3442                 raise DecodeError(
3443                     "too big pad",
3444                     klass=self.__class__,
3445                     decode_path=decode_path,
3446                     offset=offset,
3447                 )
3448             if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3449                 raise DecodeError(
3450                     "invalid pad",
3451                     klass=self.__class__,
3452                     decode_path=decode_path,
3453                     offset=offset,
3454                 )
3455             v, tail = v[:l], v[l:]
3456             bit_len = (len(v) - 1) * 8 - pad_size
3457             obj = self.__class__(
3458                 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3459                 impl=self.tag,
3460                 expl=self._expl,
3461                 default=self.default,
3462                 optional=self.optional,
3463                 _specs=self.specs,
3464                 _decoded=(offset, llen, l),
3465             )
3466             if evgen_mode:
3467                 obj._value = (bit_len, None)
3468             yield decode_path, obj, tail
3469             return
3470         if t != self.tag_constructed:
3471             raise TagMismatch(
3472                 klass=self.__class__,
3473                 decode_path=decode_path,
3474                 offset=offset,
3475             )
3476         if not ctx.get("bered", False):
3477             raise DecodeError(
3478                 "unallowed BER constructed encoding",
3479                 klass=self.__class__,
3480                 decode_path=decode_path,
3481                 offset=offset,
3482             )
3483         if tag_only:  # pragma: no cover
3484             yield None
3485             return
3486         lenindef = False
3487         try:
3488             l, llen, v = len_decode(lv)
3489         except LenIndefForm:
3490             llen, l, v = 1, 0, lv[1:]
3491             lenindef = True
3492         except DecodeError as err:
3493             raise err.__class__(
3494                 msg=err.msg,
3495                 klass=self.__class__,
3496                 decode_path=decode_path,
3497                 offset=offset,
3498             )
3499         if l > len(v):
3500             raise NotEnoughData(
3501                 "encoded length is longer than data",
3502                 klass=self.__class__,
3503                 decode_path=decode_path,
3504                 offset=offset,
3505             )
3506         if not lenindef and l == 0:
3507             raise NotEnoughData(
3508                 "zero length",
3509                 klass=self.__class__,
3510                 decode_path=decode_path,
3511                 offset=offset,
3512             )
3513         chunks = []
3514         sub_offset = offset + tlen + llen
3515         vlen = 0
3516         while True:
3517             if lenindef:
3518                 if v[:EOC_LEN].tobytes() == EOC:
3519                     break
3520             else:
3521                 if vlen == l:
3522                     break
3523                 if vlen > l:
3524                     raise DecodeError(
3525                         "chunk out of bounds",
3526                         klass=self.__class__,
3527                         decode_path=decode_path + (str(len(chunks) - 1),),
3528                         offset=chunks[-1].offset,
3529                     )
3530             sub_decode_path = decode_path + (str(len(chunks)),)
3531             try:
3532                 if evgen_mode:
3533                     for _decode_path, chunk, v_tail in BitString().decode_evgen(
3534                             v,
3535                             offset=sub_offset,
3536                             decode_path=sub_decode_path,
3537                             leavemm=True,
3538                             ctx=ctx,
3539                             _ctx_immutable=False,
3540                     ):
3541                         yield _decode_path, chunk, v_tail
3542                 else:
3543                     _, chunk, v_tail = next(BitString().decode_evgen(
3544                         v,
3545                         offset=sub_offset,
3546                         decode_path=sub_decode_path,
3547                         leavemm=True,
3548                         ctx=ctx,
3549                         _ctx_immutable=False,
3550                         _evgen_mode=False,
3551                     ))
3552             except TagMismatch:
3553                 raise DecodeError(
3554                     "expected BitString encoded chunk",
3555                     klass=self.__class__,
3556                     decode_path=sub_decode_path,
3557                     offset=sub_offset,
3558                 )
3559             chunks.append(chunk)
3560             sub_offset += chunk.tlvlen
3561             vlen += chunk.tlvlen
3562             v = v_tail
3563         if len(chunks) == 0:
3564             raise DecodeError(
3565                 "no chunks",
3566                 klass=self.__class__,
3567                 decode_path=decode_path,
3568                 offset=offset,
3569             )
3570         values = []
3571         bit_len = 0
3572         for chunk_i, chunk in enumerate(chunks[:-1]):
3573             if chunk.bit_len % 8 != 0:
3574                 raise DecodeError(
3575                     "BitString chunk is not multiple of 8 bits",
3576                     klass=self.__class__,
3577                     decode_path=decode_path + (str(chunk_i),),
3578                     offset=chunk.offset,
3579                 )
3580             if not evgen_mode:
3581                 values.append(bytes(chunk))
3582             bit_len += chunk.bit_len
3583         chunk_last = chunks[-1]
3584         if not evgen_mode:
3585             values.append(bytes(chunk_last))
3586         bit_len += chunk_last.bit_len
3587         obj = self.__class__(
3588             value=None if evgen_mode else (bit_len, b"".join(values)),
3589             impl=self.tag,
3590             expl=self._expl,
3591             default=self.default,
3592             optional=self.optional,
3593             _specs=self.specs,
3594             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3595         )
3596         if evgen_mode:
3597             obj._value = (bit_len, None)
3598         obj.lenindef = lenindef
3599         obj.ber_encoded = True
3600         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3601
3602     def __repr__(self):
3603         return pp_console_row(next(self.pps()))
3604
3605     def pps(self, decode_path=()):
3606         value = None
3607         blob = None
3608         if self.ready:
3609             bit_len, blob = self._value
3610             value = "%d bits" % bit_len
3611             if len(self.specs) > 0 and blob is not None:
3612                 blob = tuple(self.named)
3613         yield _pp(
3614             obj=self,
3615             asn1_type_name=self.asn1_type_name,
3616             obj_name=self.__class__.__name__,
3617             decode_path=decode_path,
3618             value=value,
3619             blob=blob,
3620             optional=self.optional,
3621             default=self == self.default,
3622             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3623             expl=None if self._expl is None else tag_decode(self._expl),
3624             offset=self.offset,
3625             tlen=self.tlen,
3626             llen=self.llen,
3627             vlen=self.vlen,
3628             expl_offset=self.expl_offset if self.expled else None,
3629             expl_tlen=self.expl_tlen if self.expled else None,
3630             expl_llen=self.expl_llen if self.expled else None,
3631             expl_vlen=self.expl_vlen if self.expled else None,
3632             expl_lenindef=self.expl_lenindef,
3633             lenindef=self.lenindef,
3634             ber_encoded=self.ber_encoded,
3635             bered=self.bered,
3636         )
3637         defined_by, defined = self.defined or (None, None)
3638         if defined_by is not None:
3639             yield defined.pps(
3640                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3641             )
3642         for pp in self.pps_lenindef(decode_path):
3643             yield pp
3644
3645
3646 OctetStringState = namedtuple(
3647     "OctetStringState",
3648     BasicState._fields + (
3649         "value",
3650         "bound_min",
3651         "bound_max",
3652         "tag_constructed",
3653         "defined",
3654     ),
3655     **NAMEDTUPLE_KWARGS
3656 )
3657
3658
3659 class OctetString(Obj):
3660     """``OCTET STRING`` binary string type
3661
3662     >>> s = OctetString(b"hello world")
3663     OCTET STRING 11 bytes 68656c6c6f20776f726c64
3664     >>> s == OctetString(b"hello world")
3665     True
3666     >>> bytes(s)
3667     b'hello world'
3668
3669     >>> OctetString(b"hello", bounds=(4, 4))
3670     Traceback (most recent call last):
3671     pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3672     >>> OctetString(b"hell", bounds=(4, 4))
3673     OCTET STRING 4 bytes 68656c6c
3674
3675     Memoryviews can be used as a values. If memoryview is made on
3676     mmap-ed file, then it does not take storage inside OctetString
3677     itself. In CER encoding mode it will be streamed to the specified
3678     writer, copying 1 KB chunks.
3679     """
3680     __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3681     tag_default = tag_encode(4)
3682     asn1_type_name = "OCTET STRING"
3683     evgen_mode_skip_value = True
3684
3685     def __init__(
3686             self,
3687             value=None,
3688             bounds=None,
3689             impl=None,
3690             expl=None,
3691             default=None,
3692             optional=False,
3693             _decoded=(0, 0, 0),
3694             ctx=None,
3695     ):
3696         """
3697         :param value: set the value. Either binary type, or
3698                       :py:class:`pyderasn.OctetString` object
3699         :param bounds: set ``(MIN, MAX)`` value size constraint.
3700                        (-inf, +inf) by default
3701         :param bytes impl: override default tag with ``IMPLICIT`` one
3702         :param bytes expl: override default tag with ``EXPLICIT`` one
3703         :param default: set default value. Type same as in ``value``
3704         :param bool optional: is object ``OPTIONAL`` in sequence
3705         """
3706         super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3707         self._value = value
3708         self._bound_min, self._bound_max = getattr(
3709             self,
3710             "bounds",
3711             (0, float("+inf")),
3712         ) if bounds is None else bounds
3713         if value is not None:
3714             self._value = self._value_sanitize(value)
3715         if default is not None:
3716             default = self._value_sanitize(default)
3717             self.default = self.__class__(
3718                 value=default,
3719                 impl=self.tag,
3720                 expl=self._expl,
3721             )
3722             if self._value is None:
3723                 self._value = default
3724         self.defined = None
3725         tag_klass, _, tag_num = tag_decode(self.tag)
3726         self.tag_constructed = tag_encode(
3727             klass=tag_klass,
3728             form=TagFormConstructed,
3729             num=tag_num,
3730         )
3731
3732     def _value_sanitize(self, value):
3733         if value.__class__ == binary_type or value.__class__ == memoryview:
3734             pass
3735         elif issubclass(value.__class__, OctetString):
3736             value = value._value
3737         else:
3738             raise InvalidValueType((self.__class__, bytes, memoryview))
3739         if not self._bound_min <= len(value) <= self._bound_max:
3740             raise BoundsError(self._bound_min, len(value), self._bound_max)
3741         return value
3742
3743     @property
3744     def ready(self):
3745         return self._value is not None
3746
3747     def __getstate__(self):
3748         return OctetStringState(
3749             __version__,
3750             self.tag,
3751             self._tag_order,
3752             self._expl,
3753             self.default,
3754             self.optional,
3755             self.offset,
3756             self.llen,
3757             self.vlen,
3758             self.expl_lenindef,
3759             self.lenindef,
3760             self.ber_encoded,
3761             self._value,
3762             self._bound_min,
3763             self._bound_max,
3764             self.tag_constructed,
3765             self.defined,
3766         )
3767
3768     def __setstate__(self, state):
3769         super(OctetString, self).__setstate__(state)
3770         self._value = state.value
3771         self._bound_min = state.bound_min
3772         self._bound_max = state.bound_max
3773         self.tag_constructed = state.tag_constructed
3774         self.defined = state.defined
3775
3776     def __bytes__(self):
3777         self._assert_ready()
3778         return bytes(self._value)
3779
3780     def __eq__(self, their):
3781         if their.__class__ == binary_type:
3782             return self._value == their
3783         if not issubclass(their.__class__, OctetString):
3784             return False
3785         return (
3786             self._value == their._value and
3787             self.tag == their.tag and
3788             self._expl == their._expl
3789         )
3790
3791     def __lt__(self, their):
3792         return self._value < their._value
3793
3794     def __call__(
3795             self,
3796             value=None,
3797             bounds=None,
3798             impl=None,
3799             expl=None,
3800             default=None,
3801             optional=None,
3802     ):
3803         return self.__class__(
3804             value=value,
3805             bounds=(
3806                 (self._bound_min, self._bound_max)
3807                 if bounds is None else bounds
3808             ),
3809             impl=self.tag if impl is None else impl,
3810             expl=self._expl if expl is None else expl,
3811             default=self.default if default is None else default,
3812             optional=self.optional if optional is None else optional,
3813         )
3814
3815     def _encode(self):
3816         self._assert_ready()
3817         return b"".join((
3818             self.tag,
3819             len_encode(len(self._value)),
3820             self._value,
3821         ))
3822
3823     def _encode1st(self, state):
3824         self._assert_ready()
3825         l = len(self._value)
3826         return len(self.tag) + len_size(l) + l, state
3827
3828     def _encode2nd(self, writer, state_iter):
3829         value = self._value
3830         write_full(writer, self.tag + len_encode(len(value)))
3831         write_full(writer, value)
3832
3833     def _encode_cer(self, writer):
3834         octets = self._value
3835         if len(octets) <= 1000:
3836             write_full(writer, self._encode())
3837             return
3838         write_full(writer, self.tag_constructed)
3839         write_full(writer, LENINDEF)
3840         for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3841             write_full(writer, b"".join((
3842                 OctetString.tag_default,
3843                 LEN1K,
3844                 octets[offset:offset + 1000],
3845             )))
3846         tail = octets[offset + 1000:]
3847         if len(tail) > 0:
3848             write_full(writer, b"".join((
3849                 OctetString.tag_default,
3850                 len_encode(len(tail)),
3851                 tail,
3852             )))
3853         write_full(writer, EOC)
3854
3855     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3856         try:
3857             t, tlen, lv = tag_strip(tlv)
3858         except DecodeError as err:
3859             raise err.__class__(
3860                 msg=err.msg,
3861                 klass=self.__class__,
3862                 decode_path=decode_path,
3863                 offset=offset,
3864             )
3865         if t == self.tag:
3866             if tag_only:
3867                 yield None
3868                 return
3869             try:
3870                 l, llen, v = len_decode(lv)
3871             except DecodeError as err:
3872                 raise err.__class__(
3873                     msg=err.msg,
3874                     klass=self.__class__,
3875                     decode_path=decode_path,
3876                     offset=offset,
3877                 )
3878             if l > len(v):
3879                 raise NotEnoughData(
3880                     "encoded length is longer than data",
3881                     klass=self.__class__,
3882                     decode_path=decode_path,
3883                     offset=offset,
3884                 )
3885             v, tail = v[:l], v[l:]
3886             if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3887                 raise DecodeError(
3888                     msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3889                     klass=self.__class__,
3890                     decode_path=decode_path,
3891                     offset=offset,
3892                 )
3893             try:
3894                 obj = self.__class__(
3895                     value=(
3896                         None if (evgen_mode and self.evgen_mode_skip_value)
3897                         else v.tobytes()
3898                     ),
3899                     bounds=(self._bound_min, self._bound_max),
3900                     impl=self.tag,
3901                     expl=self._expl,
3902                     default=self.default,
3903                     optional=self.optional,
3904                     _decoded=(offset, llen, l),
3905                     ctx=ctx,
3906                 )
3907             except DecodeError as err:
3908                 raise DecodeError(
3909                     msg=err.msg,
3910                     klass=self.__class__,
3911                     decode_path=decode_path,
3912                     offset=offset,
3913                 )
3914             except BoundsError as err:
3915                 raise DecodeError(
3916                     msg=str(err),
3917                     klass=self.__class__,
3918                     decode_path=decode_path,
3919                     offset=offset,
3920                 )
3921             yield decode_path, obj, tail
3922             return
3923         if t != self.tag_constructed:
3924             raise TagMismatch(
3925                 klass=self.__class__,
3926                 decode_path=decode_path,
3927                 offset=offset,
3928             )
3929         if not ctx.get("bered", False):
3930             raise DecodeError(
3931                 "unallowed BER constructed encoding",
3932                 klass=self.__class__,
3933                 decode_path=decode_path,
3934                 offset=offset,
3935             )
3936         if tag_only:
3937             yield None
3938             return
3939         lenindef = False
3940         try:
3941             l, llen, v = len_decode(lv)
3942         except LenIndefForm:
3943             llen, l, v = 1, 0, lv[1:]
3944             lenindef = True
3945         except DecodeError as err:
3946             raise err.__class__(
3947                 msg=err.msg,
3948                 klass=self.__class__,
3949                 decode_path=decode_path,
3950                 offset=offset,
3951             )
3952         if l > len(v):
3953             raise NotEnoughData(
3954                 "encoded length is longer than data",
3955                 klass=self.__class__,
3956                 decode_path=decode_path,
3957                 offset=offset,
3958             )
3959         chunks = []
3960         chunks_count = 0
3961         sub_offset = offset + tlen + llen
3962         vlen = 0
3963         payload_len = 0
3964         while True:
3965             if lenindef:
3966                 if v[:EOC_LEN].tobytes() == EOC:
3967                     break
3968             else:
3969                 if vlen == l:
3970                     break
3971                 if vlen > l:
3972                     raise DecodeError(
3973                         "chunk out of bounds",
3974                         klass=self.__class__,
3975                         decode_path=decode_path + (str(len(chunks) - 1),),
3976                         offset=chunks[-1].offset,
3977                     )
3978             try:
3979                 if evgen_mode:
3980                     sub_decode_path = decode_path + (str(chunks_count),)
3981                     for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3982                             v,
3983                             offset=sub_offset,
3984                             decode_path=sub_decode_path,
3985                             leavemm=True,
3986                             ctx=ctx,
3987                             _ctx_immutable=False,
3988                     ):
3989                         yield _decode_path, chunk, v_tail
3990                         if not chunk.ber_encoded:
3991                             payload_len += chunk.vlen
3992                     chunks_count += 1
3993                 else:
3994                     sub_decode_path = decode_path + (str(len(chunks)),)
3995                     _, chunk, v_tail = next(OctetString().decode_evgen(
3996                         v,
3997                         offset=sub_offset,
3998                         decode_path=sub_decode_path,
3999                         leavemm=True,
4000                         ctx=ctx,
4001                         _ctx_immutable=False,
4002                         _evgen_mode=False,
4003                     ))
4004                     chunks.append(chunk)
4005             except TagMismatch:
4006                 raise DecodeError(
4007                     "expected OctetString encoded chunk",
4008                     klass=self.__class__,
4009                     decode_path=sub_decode_path,
4010                     offset=sub_offset,
4011                 )
4012             sub_offset += chunk.tlvlen
4013             vlen += chunk.tlvlen
4014             v = v_tail
4015         if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
4016             raise DecodeError(
4017                 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
4018                 klass=self.__class__,
4019                 decode_path=decode_path,
4020                 offset=offset,
4021             )
4022         try:
4023             obj = self.__class__(
4024                 value=(
4025                     None if evgen_mode else
4026                     b"".join(bytes(chunk) for chunk in chunks)
4027                 ),
4028                 bounds=(self._bound_min, self._bound_max),
4029                 impl=self.tag,
4030                 expl=self._expl,
4031                 default=self.default,
4032                 optional=self.optional,
4033                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4034                 ctx=ctx,
4035             )
4036         except DecodeError as err:
4037             raise DecodeError(
4038                 msg=err.msg,
4039                 klass=self.__class__,
4040                 decode_path=decode_path,
4041                 offset=offset,
4042             )
4043         except BoundsError as err:
4044             raise DecodeError(
4045                 msg=str(err),
4046                 klass=self.__class__,
4047                 decode_path=decode_path,
4048                 offset=offset,
4049             )
4050         obj.lenindef = lenindef
4051         obj.ber_encoded = True
4052         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4053
4054     def __repr__(self):
4055         return pp_console_row(next(self.pps()))
4056
4057     def pps(self, decode_path=()):
4058         yield _pp(
4059             obj=self,
4060             asn1_type_name=self.asn1_type_name,
4061             obj_name=self.__class__.__name__,
4062             decode_path=decode_path,
4063             value=("%d bytes" % len(self._value)) if self.ready else None,
4064             blob=self._value if self.ready else None,
4065             optional=self.optional,
4066             default=self == self.default,
4067             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4068             expl=None if self._expl is None else tag_decode(self._expl),
4069             offset=self.offset,
4070             tlen=self.tlen,
4071             llen=self.llen,
4072             vlen=self.vlen,
4073             expl_offset=self.expl_offset if self.expled else None,
4074             expl_tlen=self.expl_tlen if self.expled else None,
4075             expl_llen=self.expl_llen if self.expled else None,
4076             expl_vlen=self.expl_vlen if self.expled else None,
4077             expl_lenindef=self.expl_lenindef,
4078             lenindef=self.lenindef,
4079             ber_encoded=self.ber_encoded,
4080             bered=self.bered,
4081         )
4082         defined_by, defined = self.defined or (None, None)
4083         if defined_by is not None:
4084             yield defined.pps(
4085                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4086             )
4087         for pp in self.pps_lenindef(decode_path):
4088             yield pp
4089
4090
4091 def agg_octet_string(evgens, decode_path, raw, writer):
4092     """Aggregate constructed string (OctetString and its derivatives)
4093
4094     :param evgens: iterator of generated events
4095     :param decode_path: points to the string we want to decode
4096     :param raw: slicebable (memoryview, bytearray, etc) with
4097                 the data evgens are generated on
4098     :param writer: buffer.write where string is going to be saved
4099     :param writer: where string is going to be saved. Must comply
4100                    with ``io.RawIOBase.write`` behaviour
4101
4102     .. seealso:: :ref:`agg_octet_string`
4103     """
4104     decode_path_len = len(decode_path)
4105     for dp, obj, _ in evgens:
4106         if dp[:decode_path_len] != decode_path:
4107             continue
4108         if not obj.ber_encoded:
4109             write_full(writer, raw[
4110                 obj.offset + obj.tlen + obj.llen:
4111                 obj.offset + obj.tlen + obj.llen + obj.vlen -
4112                 (EOC_LEN if obj.expl_lenindef else 0)
4113             ])
4114         if len(dp) == decode_path_len:
4115             break
4116
4117
4118 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4119
4120
4121 class Null(Obj):
4122     """``NULL`` null object
4123
4124     >>> n = Null()
4125     NULL
4126     >>> n.ready
4127     True
4128     """
4129     __slots__ = ()
4130     tag_default = tag_encode(5)
4131     asn1_type_name = "NULL"
4132
4133     def __init__(
4134             self,
4135             value=None,  # unused, but Sequence passes it
4136             impl=None,
4137             expl=None,
4138             optional=False,
4139             _decoded=(0, 0, 0),
4140     ):
4141         """
4142         :param bytes impl: override default tag with ``IMPLICIT`` one
4143         :param bytes expl: override default tag with ``EXPLICIT`` one
4144         :param bool optional: is object ``OPTIONAL`` in sequence
4145         """
4146         super(Null, self).__init__(impl, expl, None, optional, _decoded)
4147         self.default = None
4148
4149     @property
4150     def ready(self):
4151         return True
4152
4153     def __getstate__(self):
4154         return NullState(
4155             __version__,
4156             self.tag,
4157             self._tag_order,
4158             self._expl,
4159             self.default,
4160             self.optional,
4161             self.offset,
4162             self.llen,
4163             self.vlen,
4164             self.expl_lenindef,
4165             self.lenindef,
4166             self.ber_encoded,
4167         )
4168
4169     def __eq__(self, their):
4170         if not issubclass(their.__class__, Null):
4171             return False
4172         return (
4173             self.tag == their.tag and
4174             self._expl == their._expl
4175         )
4176
4177     def __call__(
4178             self,
4179             value=None,
4180             impl=None,
4181             expl=None,
4182             optional=None,
4183     ):
4184         return self.__class__(
4185             impl=self.tag if impl is None else impl,
4186             expl=self._expl if expl is None else expl,
4187             optional=self.optional if optional is None else optional,
4188         )
4189
4190     def _encode(self):
4191         return self.tag + LEN0
4192
4193     def _encode1st(self, state):
4194         return len(self.tag) + 1, state
4195
4196     def _encode2nd(self, writer, state_iter):
4197         write_full(writer, self.tag + LEN0)
4198
4199     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4200         try:
4201             t, _, lv = tag_strip(tlv)
4202         except DecodeError as err:
4203             raise err.__class__(
4204                 msg=err.msg,
4205                 klass=self.__class__,
4206                 decode_path=decode_path,
4207                 offset=offset,
4208             )
4209         if t != self.tag:
4210             raise TagMismatch(
4211                 klass=self.__class__,
4212                 decode_path=decode_path,
4213                 offset=offset,
4214             )
4215         if tag_only:  # pragma: no cover
4216             yield None
4217             return
4218         try:
4219             l, _, v = len_decode(lv)
4220         except DecodeError as err:
4221             raise err.__class__(
4222                 msg=err.msg,
4223                 klass=self.__class__,
4224                 decode_path=decode_path,
4225                 offset=offset,
4226             )
4227         if l != 0:
4228             raise InvalidLength(
4229                 "Null must have zero length",
4230                 klass=self.__class__,
4231                 decode_path=decode_path,
4232                 offset=offset,
4233             )
4234         obj = self.__class__(
4235             impl=self.tag,
4236             expl=self._expl,
4237             optional=self.optional,
4238             _decoded=(offset, 1, 0),
4239         )
4240         yield decode_path, obj, v
4241
4242     def __repr__(self):
4243         return pp_console_row(next(self.pps()))
4244
4245     def pps(self, decode_path=()):
4246         yield _pp(
4247             obj=self,
4248             asn1_type_name=self.asn1_type_name,
4249             obj_name=self.__class__.__name__,
4250             decode_path=decode_path,
4251             optional=self.optional,
4252             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4253             expl=None if self._expl is None else tag_decode(self._expl),
4254             offset=self.offset,
4255             tlen=self.tlen,
4256             llen=self.llen,
4257             vlen=self.vlen,
4258             expl_offset=self.expl_offset if self.expled else None,
4259             expl_tlen=self.expl_tlen if self.expled else None,
4260             expl_llen=self.expl_llen if self.expled else None,
4261             expl_vlen=self.expl_vlen if self.expled else None,
4262             expl_lenindef=self.expl_lenindef,
4263             bered=self.bered,
4264         )
4265         for pp in self.pps_lenindef(decode_path):
4266             yield pp
4267
4268
4269 ObjectIdentifierState = namedtuple(
4270     "ObjectIdentifierState",
4271     BasicState._fields + ("value", "defines"),
4272     **NAMEDTUPLE_KWARGS
4273 )
4274
4275
4276 class ObjectIdentifier(Obj):
4277     """``OBJECT IDENTIFIER`` OID type
4278
4279     >>> oid = ObjectIdentifier((1, 2, 3))
4280     OBJECT IDENTIFIER 1.2.3
4281     >>> oid == ObjectIdentifier("1.2.3")
4282     True
4283     >>> tuple(oid)
4284     (1, 2, 3)
4285     >>> str(oid)
4286     '1.2.3'
4287     >>> oid + (4, 5) + ObjectIdentifier("1.7")
4288     OBJECT IDENTIFIER 1.2.3.4.5.1.7
4289
4290     >>> str(ObjectIdentifier((3, 1)))
4291     Traceback (most recent call last):
4292     pyderasn.InvalidOID: unacceptable first arc value
4293     """
4294     __slots__ = ("defines",)
4295     tag_default = tag_encode(6)
4296     asn1_type_name = "OBJECT IDENTIFIER"
4297
4298     def __init__(
4299             self,
4300             value=None,
4301             defines=(),
4302             impl=None,
4303             expl=None,
4304             default=None,
4305             optional=False,
4306             _decoded=(0, 0, 0),
4307     ):
4308         """
4309         :param value: set the value. Either tuples of integers,
4310                       string of "."-concatenated integers, or
4311                       :py:class:`pyderasn.ObjectIdentifier` object
4312         :param defines: sequence of tuples. Each tuple has two elements.
4313                         First one is relative to current one decode
4314                         path, aiming to the field defined by that OID.
4315                         Read about relative path in
4316                         :py:func:`pyderasn.abs_decode_path`. Second
4317                         tuple element is ``{OID: pyderasn.Obj()}``
4318                         dictionary, mapping between current OID value
4319                         and structure applied to defined field.
4320
4321                         .. seealso:: :ref:`definedby`
4322
4323         :param bytes impl: override default tag with ``IMPLICIT`` one
4324         :param bytes expl: override default tag with ``EXPLICIT`` one
4325         :param default: set default value. Type same as in ``value``
4326         :param bool optional: is object ``OPTIONAL`` in sequence
4327         """
4328         super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4329         self._value = value
4330         if value is not None:
4331             self._value = self._value_sanitize(value)
4332         if default is not None:
4333             default = self._value_sanitize(default)
4334             self.default = self.__class__(
4335                 value=default,
4336                 impl=self.tag,
4337                 expl=self._expl,
4338             )
4339             if self._value is None:
4340                 self._value = default
4341         self.defines = defines
4342
4343     def __add__(self, their):
4344         if their.__class__ == tuple:
4345             return self.__class__(self._value + array("L", their))
4346         if isinstance(their, self.__class__):
4347             return self.__class__(self._value + their._value)
4348         raise InvalidValueType((self.__class__, tuple))
4349
4350     def _value_sanitize(self, value):
4351         if issubclass(value.__class__, ObjectIdentifier):
4352             return value._value
4353         if isinstance(value, string_types):
4354             try:
4355                 value = array("L", (pureint(arc) for arc in value.split(".")))
4356             except ValueError:
4357                 raise InvalidOID("unacceptable arcs values")
4358         if value.__class__ == tuple:
4359             try:
4360                 value = array("L", value)
4361             except OverflowError as err:
4362                 raise InvalidOID(repr(err))
4363         if value.__class__ is array:
4364             if len(value) < 2:
4365                 raise InvalidOID("less than 2 arcs")
4366             first_arc = value[0]
4367             if first_arc in (0, 1):
4368                 if not (0 <= value[1] <= 39):
4369                     raise InvalidOID("second arc is too wide")
4370             elif first_arc == 2:
4371                 pass
4372             else:
4373                 raise InvalidOID("unacceptable first arc value")
4374             if not all(arc >= 0 for arc in value):
4375                 raise InvalidOID("negative arc value")
4376             return value
4377         raise InvalidValueType((self.__class__, str, tuple))
4378
4379     @property
4380     def ready(self):
4381         return self._value is not None
4382
4383     def __getstate__(self):
4384         return ObjectIdentifierState(
4385             __version__,
4386             self.tag,
4387             self._tag_order,
4388             self._expl,
4389             self.default,
4390             self.optional,
4391             self.offset,
4392             self.llen,
4393             self.vlen,
4394             self.expl_lenindef,
4395             self.lenindef,
4396             self.ber_encoded,
4397             self._value,
4398             self.defines,
4399         )
4400
4401     def __setstate__(self, state):
4402         super(ObjectIdentifier, self).__setstate__(state)
4403         self._value = state.value
4404         self.defines = state.defines
4405
4406     def __iter__(self):
4407         self._assert_ready()
4408         return iter(self._value)
4409
4410     def __str__(self):
4411         return ".".join(str(arc) for arc in self._value or ())
4412
4413     def __hash__(self):
4414         self._assert_ready()
4415         return hash(b"".join((
4416             self.tag,
4417             bytes(self._expl or b""),
4418             str(self._value).encode("ascii"),
4419         )))
4420
4421     def __eq__(self, their):
4422         if their.__class__ == tuple:
4423             return self._value == array("L", their)
4424         if not issubclass(their.__class__, ObjectIdentifier):
4425             return False
4426         return (
4427             self.tag == their.tag and
4428             self._expl == their._expl and
4429             self._value == their._value
4430         )
4431
4432     def __lt__(self, their):
4433         return self._value < their._value
4434
4435     def __call__(
4436             self,
4437             value=None,
4438             defines=None,
4439             impl=None,
4440             expl=None,
4441             default=None,
4442             optional=None,
4443     ):
4444         return self.__class__(
4445             value=value,
4446             defines=self.defines if defines is None else defines,
4447             impl=self.tag if impl is None else impl,
4448             expl=self._expl if expl is None else expl,
4449             default=self.default if default is None else default,
4450             optional=self.optional if optional is None else optional,
4451         )
4452
4453     def _encode_octets(self):
4454         self._assert_ready()
4455         value = self._value
4456         first_value = value[1]
4457         first_arc = value[0]
4458         if first_arc == 0:
4459             pass
4460         elif first_arc == 1:
4461             first_value += 40
4462         elif first_arc == 2:
4463             first_value += 80
4464         else:  # pragma: no cover
4465             raise RuntimeError("invalid arc is stored")
4466         octets = [zero_ended_encode(first_value)]
4467         for arc in value[2:]:
4468             octets.append(zero_ended_encode(arc))
4469         return b"".join(octets)
4470
4471     def _encode(self):
4472         v = self._encode_octets()
4473         return b"".join((self.tag, len_encode(len(v)), v))
4474
4475     def _encode1st(self, state):
4476         l = len(self._encode_octets())
4477         return len(self.tag) + len_size(l) + l, state
4478
4479     def _encode2nd(self, writer, state_iter):
4480         write_full(writer, self._encode())
4481
4482     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4483         try:
4484             t, _, lv = tag_strip(tlv)
4485         except DecodeError as err:
4486             raise err.__class__(
4487                 msg=err.msg,
4488                 klass=self.__class__,
4489                 decode_path=decode_path,
4490                 offset=offset,
4491             )
4492         if t != self.tag:
4493             raise TagMismatch(
4494                 klass=self.__class__,
4495                 decode_path=decode_path,
4496                 offset=offset,
4497             )
4498         if tag_only:  # pragma: no cover
4499             yield None
4500             return
4501         try:
4502             l, llen, v = len_decode(lv)
4503         except DecodeError as err:
4504             raise err.__class__(
4505                 msg=err.msg,
4506                 klass=self.__class__,
4507                 decode_path=decode_path,
4508                 offset=offset,
4509             )
4510         if l > len(v):
4511             raise NotEnoughData(
4512                 "encoded length is longer than data",
4513                 klass=self.__class__,
4514                 decode_path=decode_path,
4515                 offset=offset,
4516             )
4517         if l == 0:
4518             raise NotEnoughData(
4519                 "zero length",
4520                 klass=self.__class__,
4521                 decode_path=decode_path,
4522                 offset=offset,
4523             )
4524         v, tail = v[:l], v[l:]
4525         arcs = array("L")
4526         ber_encoded = False
4527         while len(v) > 0:
4528             i = 0
4529             arc = 0
4530             while True:
4531                 octet = indexbytes(v, i)
4532                 if i == 0 and octet == 0x80:
4533                     if ctx.get("bered", False):
4534                         ber_encoded = True
4535                     else:
4536                         raise DecodeError(
4537                             "non normalized arc encoding",
4538                             klass=self.__class__,
4539                             decode_path=decode_path,
4540                             offset=offset,
4541                         )
4542                 arc = (arc << 7) | (octet & 0x7F)
4543                 if octet & 0x80 == 0:
4544                     try:
4545                         arcs.append(arc)
4546                     except OverflowError:
4547                         raise DecodeError(
4548                             "too huge value for local unsigned long",
4549                             klass=self.__class__,
4550                             decode_path=decode_path,
4551                             offset=offset,
4552                         )
4553                     v = v[i + 1:]
4554                     break
4555                 i += 1
4556                 if i == len(v):
4557                     raise DecodeError(
4558                         "unfinished OID",
4559                         klass=self.__class__,
4560                         decode_path=decode_path,
4561                         offset=offset,
4562                     )
4563         first_arc = 0
4564         second_arc = arcs[0]
4565         if 0 <= second_arc <= 39:
4566             first_arc = 0
4567         elif 40 <= second_arc <= 79:
4568             first_arc = 1
4569             second_arc -= 40
4570         else:
4571             first_arc = 2
4572             second_arc -= 80
4573         obj = self.__class__(
4574             value=array("L", (first_arc, second_arc)) + arcs[1:],
4575             impl=self.tag,
4576             expl=self._expl,
4577             default=self.default,
4578             optional=self.optional,
4579             _decoded=(offset, llen, l),
4580         )
4581         if ber_encoded:
4582             obj.ber_encoded = True
4583         yield decode_path, obj, tail
4584
4585     def __repr__(self):
4586         return pp_console_row(next(self.pps()))
4587
4588     def pps(self, decode_path=()):
4589         yield _pp(
4590             obj=self,
4591             asn1_type_name=self.asn1_type_name,
4592             obj_name=self.__class__.__name__,
4593             decode_path=decode_path,
4594             value=str(self) if self.ready else None,
4595             optional=self.optional,
4596             default=self == self.default,
4597             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4598             expl=None if self._expl is None else tag_decode(self._expl),
4599             offset=self.offset,
4600             tlen=self.tlen,
4601             llen=self.llen,
4602             vlen=self.vlen,
4603             expl_offset=self.expl_offset if self.expled else None,
4604             expl_tlen=self.expl_tlen if self.expled else None,
4605             expl_llen=self.expl_llen if self.expled else None,
4606             expl_vlen=self.expl_vlen if self.expled else None,
4607             expl_lenindef=self.expl_lenindef,
4608             ber_encoded=self.ber_encoded,
4609             bered=self.bered,
4610         )
4611         for pp in self.pps_lenindef(decode_path):
4612             yield pp
4613
4614
4615 class Enumerated(Integer):
4616     """``ENUMERATED`` integer type
4617
4618     This type is identical to :py:class:`pyderasn.Integer`, but requires
4619     schema to be specified and does not accept values missing from it.
4620     """
4621     __slots__ = ()
4622     tag_default = tag_encode(10)
4623     asn1_type_name = "ENUMERATED"
4624
4625     def __init__(
4626             self,
4627             value=None,
4628             impl=None,
4629             expl=None,
4630             default=None,
4631             optional=False,
4632             _specs=None,
4633             _decoded=(0, 0, 0),
4634             bounds=None,  # dummy argument, workability for Integer.decode
4635     ):
4636         super(Enumerated, self).__init__(
4637             value, bounds, impl, expl, default, optional, _specs, _decoded,
4638         )
4639         if len(self.specs) == 0:
4640             raise ValueError("schema must be specified")
4641
4642     def _value_sanitize(self, value):
4643         if isinstance(value, self.__class__):
4644             value = value._value
4645         elif isinstance(value, integer_types):
4646             for _value in itervalues(self.specs):
4647                 if _value == value:
4648                     break
4649             else:
4650                 raise DecodeError(
4651                     "unknown integer value: %s" % value,
4652                     klass=self.__class__,
4653                 )
4654         elif isinstance(value, string_types):
4655             value = self.specs.get(value)
4656             if value is None:
4657                 raise ObjUnknown("integer value: %s" % value)
4658         else:
4659             raise InvalidValueType((self.__class__, int, str))
4660         return value
4661
4662     def __call__(
4663             self,
4664             value=None,
4665             impl=None,
4666             expl=None,
4667             default=None,
4668             optional=None,
4669             _specs=None,
4670     ):
4671         return self.__class__(
4672             value=value,
4673             impl=self.tag if impl is None else impl,
4674             expl=self._expl if expl is None else expl,
4675             default=self.default if default is None else default,
4676             optional=self.optional if optional is None else optional,
4677             _specs=self.specs,
4678         )
4679
4680
4681 def escape_control_unicode(c):
4682     if unicat(c)[0] == "C":
4683         c = repr(c).lstrip("u").strip("'")
4684     return c
4685
4686
4687 class CommonString(OctetString):
4688     """Common class for all strings
4689
4690     Everything resembles :py:class:`pyderasn.OctetString`, except
4691     ability to deal with unicode text strings.
4692
4693     >>> hexenc("привет Ð¼Ð¸Ñ€".encode("utf-8"))
4694     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4695     >>> UTF8String("привет Ð¼Ð¸Ñ€") == UTF8String(hexdec("d0...80"))
4696     True
4697     >>> s = UTF8String("привет Ð¼Ð¸Ñ€")
4698     UTF8String UTF8String Ð¿Ñ€Ð¸Ð²ÐµÑ‚ Ð¼Ð¸Ñ€
4699     >>> str(s)
4700     'привет Ð¼Ð¸Ñ€'
4701     >>> hexenc(bytes(s))
4702     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4703
4704     >>> PrintableString("привет Ð¼Ð¸Ñ€")
4705     Traceback (most recent call last):
4706     pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4707
4708     >>> BMPString("ада", bounds=(2, 2))
4709     Traceback (most recent call last):
4710     pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4711     >>> s = BMPString("ад", bounds=(2, 2))
4712     >>> s.encoding
4713     'utf-16-be'
4714     >>> hexenc(bytes(s))
4715     '04300434'
4716
4717     .. list-table::
4718        :header-rows: 1
4719
4720        * - Class
4721          - Text Encoding, validation
4722        * - :py:class:`pyderasn.UTF8String`
4723          - utf-8
4724        * - :py:class:`pyderasn.NumericString`
4725          - proper alphabet validation
4726        * - :py:class:`pyderasn.PrintableString`
4727          - proper alphabet validation
4728        * - :py:class:`pyderasn.TeletexString`
4729          - iso-8859-1
4730        * - :py:class:`pyderasn.T61String`
4731          - iso-8859-1
4732        * - :py:class:`pyderasn.VideotexString`
4733          - iso-8859-1
4734        * - :py:class:`pyderasn.IA5String`
4735          - proper alphabet validation
4736        * - :py:class:`pyderasn.GraphicString`
4737          - iso-8859-1
4738        * - :py:class:`pyderasn.VisibleString`, :py:class:`pyderasn.ISO646String`
4739          - proper alphabet validation
4740        * - :py:class:`pyderasn.GeneralString`
4741          - iso-8859-1
4742        * - :py:class:`pyderasn.UniversalString`
4743          - utf-32-be
4744        * - :py:class:`pyderasn.BMPString`
4745          - utf-16-be
4746     """
4747     __slots__ = ()
4748
4749     def _value_sanitize(self, value):
4750         value_raw = None
4751         value_decoded = None
4752         if isinstance(value, self.__class__):
4753             value_raw = value._value
4754         elif value.__class__ == text_type:
4755             value_decoded = value
4756         elif value.__class__ == binary_type:
4757             value_raw = value
4758         else:
4759             raise InvalidValueType((self.__class__, text_type, binary_type))
4760         try:
4761             value_raw = (
4762                 value_decoded.encode(self.encoding)
4763                 if value_raw is None else value_raw
4764             )
4765             value_decoded = (
4766                 value_raw.decode(self.encoding)
4767                 if value_decoded is None else value_decoded
4768             )
4769         except (UnicodeEncodeError, UnicodeDecodeError) as err:
4770             raise DecodeError(str(err))
4771         if not self._bound_min <= len(value_decoded) <= self._bound_max:
4772             raise BoundsError(
4773                 self._bound_min,
4774                 len(value_decoded),
4775                 self._bound_max,
4776             )
4777         return value_raw
4778
4779     def __eq__(self, their):
4780         if their.__class__ == binary_type:
4781             return self._value == their
4782         if their.__class__ == text_type:
4783             return self._value == their.encode(self.encoding)
4784         if not isinstance(their, self.__class__):
4785             return False
4786         return (
4787             self._value == their._value and
4788             self.tag == their.tag and
4789             self._expl == their._expl
4790         )
4791
4792     def __unicode__(self):
4793         if self.ready:
4794             return self._value.decode(self.encoding)
4795         return text_type(self._value)
4796
4797     def __repr__(self):
4798         return pp_console_row(next(self.pps(no_unicode=PY2)))
4799
4800     def pps(self, decode_path=(), no_unicode=False):
4801         value = None
4802         if self.ready:
4803             value = (
4804                 hexenc(bytes(self)) if no_unicode else
4805                 "".join(escape_control_unicode(c) for c in self.__unicode__())
4806             )
4807         yield _pp(
4808             obj=self,
4809             asn1_type_name=self.asn1_type_name,
4810             obj_name=self.__class__.__name__,
4811             decode_path=decode_path,
4812             value=value,
4813             optional=self.optional,
4814             default=self == self.default,
4815             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4816             expl=None if self._expl is None else tag_decode(self._expl),
4817             offset=self.offset,
4818             tlen=self.tlen,
4819             llen=self.llen,
4820             vlen=self.vlen,
4821             expl_offset=self.expl_offset if self.expled else None,
4822             expl_tlen=self.expl_tlen if self.expled else None,
4823             expl_llen=self.expl_llen if self.expled else None,
4824             expl_vlen=self.expl_vlen if self.expled else None,
4825             expl_lenindef=self.expl_lenindef,
4826             ber_encoded=self.ber_encoded,
4827             bered=self.bered,
4828         )
4829         for pp in self.pps_lenindef(decode_path):
4830             yield pp
4831
4832
4833 class UTF8String(CommonString):
4834     __slots__ = ()
4835     tag_default = tag_encode(12)
4836     encoding = "utf-8"
4837     asn1_type_name = "UTF8String"
4838
4839
4840 class AllowableCharsMixin(object):
4841     @property
4842     def allowable_chars(self):
4843         if PY2:
4844             return self._allowable_chars
4845         return frozenset(six_unichr(c) for c in self._allowable_chars)
4846
4847     def _value_sanitize(self, value):
4848         value = super(AllowableCharsMixin, self)._value_sanitize(value)
4849         if not frozenset(value) <= self._allowable_chars:
4850             raise DecodeError("non satisfying alphabet value")
4851         return value
4852
4853
4854 class NumericString(AllowableCharsMixin, CommonString):
4855     """Numeric string
4856
4857     Its value is properly sanitized: only ASCII digits with spaces can
4858     be stored.
4859
4860     >>> NumericString().allowable_chars
4861     frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4862     """
4863     __slots__ = ()
4864     tag_default = tag_encode(18)
4865     encoding = "ascii"
4866     asn1_type_name = "NumericString"
4867     _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4868
4869
4870 PrintableStringState = namedtuple(
4871     "PrintableStringState",
4872     OctetStringState._fields + ("allowable_chars",),
4873     **NAMEDTUPLE_KWARGS
4874 )
4875
4876
4877 class PrintableString(AllowableCharsMixin, CommonString):
4878     """Printable string
4879
4880     Its value is properly sanitized: see X.680 41.4 table 10.
4881
4882     >>> PrintableString().allowable_chars
4883     frozenset([' ', "'", ..., 'z'])
4884     >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4885     PrintableString PrintableString foo*bar
4886     >>> obj.allow_asterisk, obj.allow_ampersand
4887     (True, False)
4888     """
4889     __slots__ = ()
4890     tag_default = tag_encode(19)
4891     encoding = "ascii"
4892     asn1_type_name = "PrintableString"
4893     _allowable_chars = frozenset(
4894         (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4895     )
4896     _asterisk = frozenset("*".encode("ascii"))
4897     _ampersand = frozenset("&".encode("ascii"))
4898
4899     def __init__(
4900             self,
4901             value=None,
4902             bounds=None,
4903             impl=None,
4904             expl=None,
4905             default=None,
4906             optional=False,
4907             _decoded=(0, 0, 0),
4908             ctx=None,
4909             allow_asterisk=False,
4910             allow_ampersand=False,
4911     ):
4912         """
4913         :param allow_asterisk: allow asterisk character
4914         :param allow_ampersand: allow ampersand character
4915         """
4916         if allow_asterisk:
4917             self._allowable_chars |= self._asterisk
4918         if allow_ampersand:
4919             self._allowable_chars |= self._ampersand
4920         super(PrintableString, self).__init__(
4921             value, bounds, impl, expl, default, optional, _decoded, ctx,
4922         )
4923
4924     @property
4925     def allow_asterisk(self):
4926         """Is asterisk character allowed?
4927         """
4928         return self._asterisk <= self._allowable_chars
4929
4930     @property
4931     def allow_ampersand(self):
4932         """Is ampersand character allowed?
4933         """
4934         return self._ampersand <= self._allowable_chars
4935
4936     def __getstate__(self):
4937         return PrintableStringState(
4938             *super(PrintableString, self).__getstate__(),
4939             **{"allowable_chars": self._allowable_chars}
4940         )
4941
4942     def __setstate__(self, state):
4943         super(PrintableString, self).__setstate__(state)
4944         self._allowable_chars = state.allowable_chars
4945
4946     def __call__(
4947             self,
4948             value=None,
4949             bounds=None,
4950             impl=None,
4951             expl=None,
4952             default=None,
4953             optional=None,
4954     ):
4955         return self.__class__(
4956             value=value,
4957             bounds=(
4958                 (self._bound_min, self._bound_max)
4959                 if bounds is None else bounds
4960             ),
4961             impl=self.tag if impl is None else impl,
4962             expl=self._expl if expl is None else expl,
4963             default=self.default if default is None else default,
4964             optional=self.optional if optional is None else optional,
4965             allow_asterisk=self.allow_asterisk,
4966             allow_ampersand=self.allow_ampersand,
4967         )
4968
4969
4970 class TeletexString(CommonString):
4971     __slots__ = ()
4972     tag_default = tag_encode(20)
4973     encoding = "iso-8859-1"
4974     asn1_type_name = "TeletexString"
4975
4976
4977 class T61String(TeletexString):
4978     __slots__ = ()
4979     asn1_type_name = "T61String"
4980
4981
4982 class VideotexString(CommonString):
4983     __slots__ = ()
4984     tag_default = tag_encode(21)
4985     encoding = "iso-8859-1"
4986     asn1_type_name = "VideotexString"
4987
4988
4989 class IA5String(AllowableCharsMixin, CommonString):
4990     """IA5 string
4991
4992     Its value is properly sanitized: it is a mix of
4993
4994     * http://www.itscj.ipsj.or.jp/iso-ir/006.pdf (G)
4995     * http://www.itscj.ipsj.or.jp/iso-ir/001.pdf (C0)
4996     * DEL character (0x7F)
4997
4998     It is just 7-bit ASCII.
4999
5000     >>> IA5String().allowable_chars
5001     frozenset(["NUL", ... "DEL"])
5002     """
5003     __slots__ = ()
5004     tag_default = tag_encode(22)
5005     encoding = "ascii"
5006     asn1_type_name = "IA5"
5007     _allowable_chars = frozenset(b"".join(
5008         six_unichr(c).encode("ascii") for c in six_xrange(128)
5009     ))
5010
5011
5012 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
5013 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
5014 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
5015 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
5016 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
5017 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
5018
5019
5020 class VisibleString(AllowableCharsMixin, CommonString):
5021     """Visible string
5022
5023     Its value is properly sanitized. ASCII subset from space to tilde is
5024     allowed: http://www.itscj.ipsj.or.jp/iso-ir/006.pdf
5025
5026     >>> VisibleString().allowable_chars
5027     frozenset([" ", ... "~"])
5028     """
5029     __slots__ = ()
5030     tag_default = tag_encode(26)
5031     encoding = "ascii"
5032     asn1_type_name = "VisibleString"
5033     _allowable_chars = frozenset(b"".join(
5034         six_unichr(c).encode("ascii") for c in six_xrange(ord(" "), ord("~") + 1)
5035     ))
5036
5037
5038 class ISO646String(VisibleString):
5039     __slots__ = ()
5040     asn1_type_name = "ISO646String"
5041
5042
5043 UTCTimeState = namedtuple(
5044     "UTCTimeState",
5045     OctetStringState._fields + ("ber_raw",),
5046     **NAMEDTUPLE_KWARGS
5047 )
5048
5049
5050 def str_to_time_fractions(value):
5051     v = pureint(value)
5052     year, v = (v // 10**10), (v % 10**10)
5053     month, v = (v // 10**8), (v % 10**8)
5054     day, v = (v // 10**6), (v % 10**6)
5055     hour, v = (v // 10**4), (v % 10**4)
5056     minute, second = (v // 100), (v % 100)
5057     return year, month, day, hour, minute, second
5058
5059
5060 class UTCTime(VisibleString):
5061     """``UTCTime`` datetime type
5062
5063     >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5064     UTCTime UTCTime 2017-09-30T22:07:50
5065     >>> str(t)
5066     '170930220750Z'
5067     >>> bytes(t)
5068     b'170930220750Z'
5069     >>> t.todatetime()
5070     datetime.datetime(2017, 9, 30, 22, 7, 50)
5071     >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5072     datetime.datetime(1957, 9, 30, 22, 7, 50)
5073
5074     If BER encoded value was met, then ``ber_raw`` attribute will hold
5075     its raw representation.
5076
5077     .. warning::
5078
5079        Pay attention that UTCTime can not hold full year, so all years
5080        having < 50 years are treated as 20xx, 19xx otherwise, according
5081        to X.509 recommendation.
5082
5083     .. warning::
5084
5085        No strict validation of UTC offsets are made, but very crude:
5086
5087        * minutes are not exceeding 60
5088        * offset value is not exceeding 14 hours
5089     """
5090     __slots__ = ("ber_raw",)
5091     tag_default = tag_encode(23)
5092     encoding = "ascii"
5093     asn1_type_name = "UTCTime"
5094     evgen_mode_skip_value = False
5095
5096     def __init__(
5097             self,
5098             value=None,
5099             impl=None,
5100             expl=None,
5101             default=None,
5102             optional=False,
5103             _decoded=(0, 0, 0),
5104             bounds=None,  # dummy argument, workability for OctetString.decode
5105             ctx=None,
5106     ):
5107         """
5108         :param value: set the value. Either datetime type, or
5109                       :py:class:`pyderasn.UTCTime` object
5110         :param bytes impl: override default tag with ``IMPLICIT`` one
5111         :param bytes expl: override default tag with ``EXPLICIT`` one
5112         :param default: set default value. Type same as in ``value``
5113         :param bool optional: is object ``OPTIONAL`` in sequence
5114         """
5115         super(UTCTime, self).__init__(
5116             None, None, impl, expl, None, optional, _decoded, ctx,
5117         )
5118         self._value = value
5119         self.ber_raw = None
5120         if value is not None:
5121             self._value, self.ber_raw = self._value_sanitize(value, ctx)
5122             self.ber_encoded = self.ber_raw is not None
5123         if default is not None:
5124             default, _ = self._value_sanitize(default)
5125             self.default = self.__class__(
5126                 value=default,
5127                 impl=self.tag,
5128                 expl=self._expl,
5129             )
5130             if self._value is None:
5131                 self._value = default
5132             optional = True
5133         self.optional = optional
5134
5135     def _strptime_bered(self, value):
5136         year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5137         value = value[10:]
5138         if len(value) == 0:
5139             raise ValueError("no timezone")
5140         year += 2000 if year < 50 else 1900
5141         decoded = datetime(year, month, day, hour, minute)
5142         offset = 0
5143         if value[-1] == "Z":
5144             value = value[:-1]
5145         else:
5146             if len(value) < 5:
5147                 raise ValueError("invalid UTC offset")
5148             if value[-5] == "-":
5149                 sign = -1
5150             elif value[-5] == "+":
5151                 sign = 1
5152             else:
5153                 raise ValueError("invalid UTC offset")
5154             v = pureint(value[-4:])
5155             offset, v = (60 * (v % 100)), v // 100
5156             if offset >= 3600:
5157                 raise ValueError("invalid UTC offset minutes")
5158             offset += 3600 * v
5159             if offset > 14 * 3600:
5160                 raise ValueError("too big UTC offset")
5161             offset *= sign
5162             value = value[:-5]
5163         if len(value) == 0:
5164             return offset, decoded
5165         if len(value) != 2:
5166             raise ValueError("invalid UTC offset seconds")
5167         seconds = pureint(value)
5168         if seconds >= 60:
5169             raise ValueError("invalid seconds value")
5170         return offset, decoded + timedelta(seconds=seconds)
5171
5172     def _strptime(self, value):
5173         # datetime.strptime's format: %y%m%d%H%M%SZ
5174         if len(value) != LEN_YYMMDDHHMMSSZ:
5175             raise ValueError("invalid UTCTime length")
5176         if value[-1] != "Z":
5177             raise ValueError("non UTC timezone")
5178         year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
5179         year += 2000 if year < 50 else 1900
5180         return datetime(year, month, day, hour, minute, second)
5181
5182     def _dt_sanitize(self, value):
5183         if value.year < 1950 or value.year > 2049:
5184             raise ValueError("UTCTime can hold only 1950-2049 years")
5185         return value.replace(microsecond=0)
5186
5187     def _value_sanitize(self, value, ctx=None):
5188         if value.__class__ == binary_type:
5189             try:
5190                 value_decoded = value.decode("ascii")
5191             except (UnicodeEncodeError, UnicodeDecodeError) as err:
5192                 raise DecodeError("invalid UTCTime encoding: %r" % err)
5193             err = None
5194             try:
5195                 return self._strptime(value_decoded), None
5196             except (TypeError, ValueError) as _err:
5197                 err = _err
5198                 if (ctx is not None) and ctx.get("bered", False):
5199                     try:
5200                         offset, _value = self._strptime_bered(value_decoded)
5201                         _value = _value - timedelta(seconds=offset)
5202                         return self._dt_sanitize(_value), value
5203                     except (TypeError, ValueError, OverflowError) as _err:
5204                         err = _err
5205             raise DecodeError(
5206                 "invalid %s format: %r" % (self.asn1_type_name, err),
5207                 klass=self.__class__,
5208             )
5209         if isinstance(value, self.__class__):
5210             return value._value, None
5211         if value.__class__ == datetime:
5212             return self._dt_sanitize(value), None
5213         raise InvalidValueType((self.__class__, datetime))
5214
5215     def _pp_value(self):
5216         if self.ready:
5217             value = self._value.isoformat()
5218             if self.ber_encoded:
5219                 value += " (%s)" % self.ber_raw
5220             return value
5221         return None
5222
5223     def __unicode__(self):
5224         if self.ready:
5225             value = self._value.isoformat()
5226             if self.ber_encoded:
5227                 value += " (%s)" % self.ber_raw
5228             return value
5229         return text_type(self._pp_value())
5230
5231     def __getstate__(self):
5232         return UTCTimeState(
5233             *super(UTCTime, self).__getstate__(),
5234             **{"ber_raw": self.ber_raw}
5235         )
5236
5237     def __setstate__(self, state):
5238         super(UTCTime, self).__setstate__(state)
5239         self.ber_raw = state.ber_raw
5240
5241     def __bytes__(self):
5242         self._assert_ready()
5243         return self._encode_time()
5244
5245     def __eq__(self, their):
5246         if their.__class__ == binary_type:
5247             return self._encode_time() == their
5248         if their.__class__ == datetime:
5249             return self.todatetime() == their
5250         if not isinstance(their, self.__class__):
5251             return False
5252         return (
5253             self._value == their._value and
5254             self.tag == their.tag and
5255             self._expl == their._expl
5256         )
5257
5258     def _encode_time(self):
5259         return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5260
5261     def _encode(self):
5262         self._assert_ready()
5263         return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5264
5265     def _encode1st(self, state):
5266         return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
5267
5268     def _encode2nd(self, writer, state_iter):
5269         self._assert_ready()
5270         write_full(writer, self._encode())
5271
5272     def _encode_cer(self, writer):
5273         write_full(writer, self._encode())
5274
5275     def todatetime(self):
5276         return self._value
5277
5278     def __repr__(self):
5279         return pp_console_row(next(self.pps()))
5280
5281     def pps(self, decode_path=()):
5282         yield _pp(
5283             obj=self,
5284             asn1_type_name=self.asn1_type_name,
5285             obj_name=self.__class__.__name__,
5286             decode_path=decode_path,
5287             value=self._pp_value(),
5288             optional=self.optional,
5289             default=self == self.default,
5290             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5291             expl=None if self._expl is None else tag_decode(self._expl),
5292             offset=self.offset,
5293             tlen=self.tlen,
5294             llen=self.llen,
5295             vlen=self.vlen,
5296             expl_offset=self.expl_offset if self.expled else None,
5297             expl_tlen=self.expl_tlen if self.expled else None,
5298             expl_llen=self.expl_llen if self.expled else None,
5299             expl_vlen=self.expl_vlen if self.expled else None,
5300             expl_lenindef=self.expl_lenindef,
5301             ber_encoded=self.ber_encoded,
5302             bered=self.bered,
5303         )
5304         for pp in self.pps_lenindef(decode_path):
5305             yield pp
5306
5307
5308 class GeneralizedTime(UTCTime):
5309     """``GeneralizedTime`` datetime type
5310
5311     This type is similar to :py:class:`pyderasn.UTCTime`.
5312
5313     >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5314     GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5315     >>> str(t)
5316     '20170930220750.000123Z'
5317     >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5318     GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5319
5320     .. warning::
5321
5322        Only microsecond fractions are supported in DER encoding.
5323        :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5324        higher precision values.
5325
5326     .. warning::
5327
5328        BER encoded data can loss information (accuracy) during decoding
5329        because of float transformations.
5330
5331     .. warning::
5332
5333        Local times (without explicit timezone specification) are treated
5334        as UTC one, no transformations are made.
5335
5336     .. warning::
5337
5338        Zero year is unsupported.
5339     """
5340     __slots__ = ()
5341     tag_default = tag_encode(24)
5342     asn1_type_name = "GeneralizedTime"
5343
5344     def _dt_sanitize(self, value):
5345         return value
5346
5347     def _strptime_bered(self, value):
5348         if len(value) < 4 + 3 * 2:
5349             raise ValueError("invalid GeneralizedTime")
5350         year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5351         decoded = datetime(year, month, day, hour)
5352         offset, value = 0, value[10:]
5353         if len(value) == 0:
5354             return offset, decoded
5355         if value[-1] == "Z":
5356             value = value[:-1]
5357         else:
5358             for char, sign in (("-", -1), ("+", 1)):
5359                 idx = value.rfind(char)
5360                 if idx == -1:
5361                     continue
5362                 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5363                 v = pureint(offset_raw)
5364                 if len(offset_raw) == 4:
5365                     offset, v = (60 * (v % 100)), v // 100
5366                     if offset >= 3600:
5367                         raise ValueError("invalid UTC offset minutes")
5368                 elif len(offset_raw) == 2:
5369                     pass
5370                 else:
5371                     raise ValueError("invalid UTC offset")
5372                 offset += 3600 * v
5373                 if offset > 14 * 3600:
5374                     raise ValueError("too big UTC offset")
5375                 offset *= sign
5376                 break
5377         if len(value) == 0:
5378             return offset, decoded
5379         if value[0] in DECIMAL_SIGNS:
5380             return offset, (
5381                 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5382             )
5383         if len(value) < 2:
5384             raise ValueError("stripped minutes")
5385         decoded += timedelta(seconds=60 * pureint(value[:2]))
5386         value = value[2:]
5387         if len(value) == 0:
5388             return offset, decoded
5389         if value[0] in DECIMAL_SIGNS:
5390             return offset, (
5391                 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5392             )
5393         if len(value) < 2:
5394             raise ValueError("stripped seconds")
5395         decoded += timedelta(seconds=pureint(value[:2]))
5396         value = value[2:]
5397         if len(value) == 0:
5398             return offset, decoded
5399         if value[0] not in DECIMAL_SIGNS:
5400             raise ValueError("invalid format after seconds")
5401         return offset, (
5402             decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5403         )
5404
5405     def _strptime(self, value):
5406         l = len(value)
5407         if l == LEN_YYYYMMDDHHMMSSZ:
5408             # datetime.strptime's format: %Y%m%d%H%M%SZ
5409             if value[-1] != "Z":
5410                 raise ValueError("non UTC timezone")
5411             return datetime(*str_to_time_fractions(value[:-1]))
5412         if l >= LEN_YYYYMMDDHHMMSSDMZ:
5413             # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5414             if value[-1] != "Z":
5415                 raise ValueError("non UTC timezone")
5416             if value[14] != ".":
5417                 raise ValueError("no fractions separator")
5418             us = value[15:-1]
5419             if us[-1] == "0":
5420                 raise ValueError("trailing zero")
5421             us_len = len(us)
5422             if us_len > 6:
5423                 raise ValueError("only microsecond fractions are supported")
5424             us = pureint(us + ("0" * (6 - us_len)))
5425             year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5426             return datetime(year, month, day, hour, minute, second, us)
5427         raise ValueError("invalid GeneralizedTime length")
5428
5429     def _encode_time(self):
5430         value = self._value
5431         encoded = value.strftime("%Y%m%d%H%M%S")
5432         if value.microsecond > 0:
5433             encoded += (".%06d" % value.microsecond).rstrip("0")
5434         return (encoded + "Z").encode("ascii")
5435
5436     def _encode(self):
5437         self._assert_ready()
5438         value = self._value
5439         if value.microsecond > 0:
5440             encoded = self._encode_time()
5441             return b"".join((self.tag, len_encode(len(encoded)), encoded))
5442         return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5443
5444     def _encode1st(self, state):
5445         self._assert_ready()
5446         vlen = len(self._encode_time())
5447         return len(self.tag) + len_size(vlen) + vlen, state
5448
5449     def _encode2nd(self, writer, state_iter):
5450         write_full(writer, self._encode())
5451
5452
5453 class GraphicString(CommonString):
5454     __slots__ = ()
5455     tag_default = tag_encode(25)
5456     encoding = "iso-8859-1"
5457     asn1_type_name = "GraphicString"
5458
5459
5460 class GeneralString(CommonString):
5461     __slots__ = ()
5462     tag_default = tag_encode(27)
5463     encoding = "iso-8859-1"
5464     asn1_type_name = "GeneralString"
5465
5466
5467 class UniversalString(CommonString):
5468     __slots__ = ()
5469     tag_default = tag_encode(28)
5470     encoding = "utf-32-be"
5471     asn1_type_name = "UniversalString"
5472
5473
5474 class BMPString(CommonString):
5475     __slots__ = ()
5476     tag_default = tag_encode(30)
5477     encoding = "utf-16-be"
5478     asn1_type_name = "BMPString"
5479
5480
5481 ChoiceState = namedtuple(
5482     "ChoiceState",
5483     BasicState._fields + ("specs", "value",),
5484     **NAMEDTUPLE_KWARGS
5485 )
5486
5487
5488 class Choice(Obj):
5489     """``CHOICE`` special type
5490
5491     ::
5492
5493         class GeneralName(Choice):
5494             schema = (
5495                 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5496                 ("dNSName", IA5String(impl=tag_ctxp(2))),
5497             )
5498
5499     >>> gn = GeneralName()
5500     GeneralName CHOICE
5501     >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5502     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5503     >>> gn["dNSName"] = IA5String("bar.baz")
5504     GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5505     >>> gn["rfc822Name"]
5506     None
5507     >>> gn["dNSName"]
5508     [2] IA5String IA5 bar.baz
5509     >>> gn.choice
5510     'dNSName'
5511     >>> gn.value == gn["dNSName"]
5512     True
5513     >>> gn.specs
5514     OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5515
5516     >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5517     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5518     """
5519     __slots__ = ("specs",)
5520     tag_default = None
5521     asn1_type_name = "CHOICE"
5522
5523     def __init__(
5524             self,
5525             value=None,
5526             schema=None,
5527             impl=None,
5528             expl=None,
5529             default=None,
5530             optional=False,
5531             _decoded=(0, 0, 0),
5532     ):
5533         """
5534         :param value: set the value. Either ``(choice, value)`` tuple, or
5535                       :py:class:`pyderasn.Choice` object
5536         :param bytes impl: can not be set, do **not** use it
5537         :param bytes expl: override default tag with ``EXPLICIT`` one
5538         :param default: set default value. Type same as in ``value``
5539         :param bool optional: is object ``OPTIONAL`` in sequence
5540         """
5541         if impl is not None:
5542             raise ValueError("no implicit tag allowed for CHOICE")
5543         super(Choice, self).__init__(None, expl, default, optional, _decoded)
5544         if schema is None:
5545             schema = getattr(self, "schema", ())
5546         if len(schema) == 0:
5547             raise ValueError("schema must be specified")
5548         self.specs = (
5549             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5550         )
5551         self._value = None
5552         if value is not None:
5553             self._value = self._value_sanitize(value)
5554         if default is not None:
5555             default_value = self._value_sanitize(default)
5556             default_obj = self.__class__(impl=self.tag, expl=self._expl)
5557             default_obj.specs = self.specs
5558             default_obj._value = default_value
5559             self.default = default_obj
5560             if value is None:
5561                 self._value = copy(default_obj._value)
5562         if self._expl is not None:
5563             tag_class, _, tag_num = tag_decode(self._expl)
5564             self._tag_order = (tag_class, tag_num)
5565
5566     def _value_sanitize(self, value):
5567         if (value.__class__ == tuple) and len(value) == 2:
5568             choice, obj = value
5569             spec = self.specs.get(choice)
5570             if spec is None:
5571                 raise ObjUnknown(choice)
5572             if not isinstance(obj, spec.__class__):
5573                 raise InvalidValueType((spec,))
5574             return (choice, spec(obj))
5575         if isinstance(value, self.__class__):
5576             return value._value
5577         raise InvalidValueType((self.__class__, tuple))
5578
5579     @property
5580     def ready(self):
5581         return self._value is not None and self._value[1].ready
5582
5583     @property
5584     def bered(self):
5585         return self.expl_lenindef or (
5586             (self._value is not None) and
5587             self._value[1].bered
5588         )
5589
5590     def __getstate__(self):
5591         return ChoiceState(
5592             __version__,
5593             self.tag,
5594             self._tag_order,
5595             self._expl,
5596             self.default,
5597             self.optional,
5598             self.offset,
5599             self.llen,
5600             self.vlen,
5601             self.expl_lenindef,
5602             self.lenindef,
5603             self.ber_encoded,
5604             self.specs,
5605             copy(self._value),
5606         )
5607
5608     def __setstate__(self, state):
5609         super(Choice, self).__setstate__(state)
5610         self.specs = state.specs
5611         self._value = state.value
5612
5613     def __eq__(self, their):
5614         if (their.__class__ == tuple) and len(their) == 2:
5615             return self._value == their
5616         if not isinstance(their, self.__class__):
5617             return False
5618         return (
5619             self.specs == their.specs and
5620             self._value == their._value
5621         )
5622
5623     def __call__(
5624             self,
5625             value=None,
5626             expl=None,
5627             default=None,
5628             optional=None,
5629     ):
5630         return self.__class__(
5631             value=value,
5632             schema=self.specs,
5633             expl=self._expl if expl is None else expl,
5634             default=self.default if default is None else default,
5635             optional=self.optional if optional is None else optional,
5636         )
5637
5638     @property
5639     def choice(self):
5640         """Name of the choice
5641         """
5642         self._assert_ready()
5643         return self._value[0]
5644
5645     @property
5646     def value(self):
5647         """Value of underlying choice
5648         """
5649         self._assert_ready()
5650         return self._value[1]
5651
5652     @property
5653     def tag_order(self):
5654         self._assert_ready()
5655         return self._value[1].tag_order if self._tag_order is None else self._tag_order
5656
5657     @property
5658     def tag_order_cer(self):
5659         return min(v.tag_order_cer for v in itervalues(self.specs))
5660
5661     def __getitem__(self, key):
5662         if key not in self.specs:
5663             raise ObjUnknown(key)
5664         if self._value is None:
5665             return None
5666         choice, value = self._value
5667         if choice != key:
5668             return None
5669         return value
5670
5671     def __setitem__(self, key, value):
5672         spec = self.specs.get(key)
5673         if spec is None:
5674             raise ObjUnknown(key)
5675         if not isinstance(value, spec.__class__):
5676             raise InvalidValueType((spec.__class__,))
5677         self._value = (key, spec(value))
5678
5679     @property
5680     def tlen(self):
5681         return 0
5682
5683     @property
5684     def decoded(self):
5685         return self._value[1].decoded if self.ready else False
5686
5687     def _encode(self):
5688         self._assert_ready()
5689         return self._value[1].encode()
5690
5691     def _encode1st(self, state):
5692         self._assert_ready()
5693         return self._value[1].encode1st(state)
5694
5695     def _encode2nd(self, writer, state_iter):
5696         self._value[1].encode2nd(writer, state_iter)
5697
5698     def _encode_cer(self, writer):
5699         self._assert_ready()
5700         self._value[1].encode_cer(writer)
5701
5702     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5703         for choice, spec in iteritems(self.specs):
5704             sub_decode_path = decode_path + (choice,)
5705             try:
5706                 spec.decode(
5707                     tlv,
5708                     offset=offset,
5709                     leavemm=True,
5710                     decode_path=sub_decode_path,
5711                     ctx=ctx,
5712                     tag_only=True,
5713                     _ctx_immutable=False,
5714                 )
5715             except TagMismatch:
5716                 continue
5717             break
5718         else:
5719             raise TagMismatch(
5720                 klass=self.__class__,
5721                 decode_path=decode_path,
5722                 offset=offset,
5723             )
5724         if tag_only:  # pragma: no cover
5725             yield None
5726             return
5727         if evgen_mode:
5728             for _decode_path, value, tail in spec.decode_evgen(
5729                     tlv,
5730                     offset=offset,
5731                     leavemm=True,
5732                     decode_path=sub_decode_path,
5733                     ctx=ctx,
5734                     _ctx_immutable=False,
5735             ):
5736                 yield _decode_path, value, tail
5737         else:
5738             _, value, tail = next(spec.decode_evgen(
5739                 tlv,
5740                 offset=offset,
5741                 leavemm=True,
5742                 decode_path=sub_decode_path,
5743                 ctx=ctx,
5744                 _ctx_immutable=False,
5745                 _evgen_mode=False,
5746             ))
5747         obj = self.__class__(
5748             schema=self.specs,
5749             expl=self._expl,
5750             default=self.default,
5751             optional=self.optional,
5752             _decoded=(offset, 0, value.fulllen),
5753         )
5754         obj._value = (choice, value)
5755         yield decode_path, obj, tail
5756
5757     def __repr__(self):
5758         value = pp_console_row(next(self.pps()))
5759         if self.ready:
5760             value = "%s[%r]" % (value, self.value)
5761         return value
5762
5763     def pps(self, decode_path=()):
5764         yield _pp(
5765             obj=self,
5766             asn1_type_name=self.asn1_type_name,
5767             obj_name=self.__class__.__name__,
5768             decode_path=decode_path,
5769             value=self.choice if self.ready else None,
5770             optional=self.optional,
5771             default=self == self.default,
5772             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5773             expl=None if self._expl is None else tag_decode(self._expl),
5774             offset=self.offset,
5775             tlen=self.tlen,
5776             llen=self.llen,
5777             vlen=self.vlen,
5778             expl_lenindef=self.expl_lenindef,
5779             bered=self.bered,
5780         )
5781         if self.ready:
5782             yield self.value.pps(decode_path=decode_path + (self.choice,))
5783         for pp in self.pps_lenindef(decode_path):
5784             yield pp
5785
5786
5787 class PrimitiveTypes(Choice):
5788     """Predefined ``CHOICE`` for all generic primitive types
5789
5790     It could be useful for general decoding of some unspecified values:
5791
5792     >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5793     OCTET STRING 3 bytes 666f6f
5794     >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5795     INTEGER 1193046
5796     """
5797     __slots__ = ()
5798     schema = tuple((klass.__name__, klass()) for klass in (
5799         Boolean,
5800         Integer,
5801         BitString,
5802         OctetString,
5803         Null,
5804         ObjectIdentifier,
5805         UTF8String,
5806         NumericString,
5807         PrintableString,
5808         TeletexString,
5809         VideotexString,
5810         IA5String,
5811         UTCTime,
5812         GeneralizedTime,
5813         GraphicString,
5814         VisibleString,
5815         ISO646String,
5816         GeneralString,
5817         UniversalString,
5818         BMPString,
5819     ))
5820
5821
5822 AnyState = namedtuple(
5823     "AnyState",
5824     BasicState._fields + ("value", "defined"),
5825     **NAMEDTUPLE_KWARGS
5826 )
5827
5828
5829 class Any(Obj):
5830     """``ANY`` special type
5831
5832     >>> Any(Integer(-123))
5833     ANY INTEGER -123 (0X:7B)
5834     >>> a = Any(OctetString(b"hello world").encode())
5835     ANY 040b68656c6c6f20776f726c64
5836     >>> hexenc(bytes(a))
5837     b'0x040x0bhello world'
5838     """
5839     __slots__ = ("defined",)
5840     tag_default = tag_encode(0)
5841     asn1_type_name = "ANY"
5842
5843     def __init__(
5844             self,
5845             value=None,
5846             expl=None,
5847             optional=False,
5848             _decoded=(0, 0, 0),
5849     ):
5850         """
5851         :param value: set the value. Either any kind of pyderasn's
5852                       **ready** object, or bytes. Pay attention that
5853                       **no** validation is performed if raw binary value
5854                       is valid TLV, except just tag decoding
5855         :param bytes expl: override default tag with ``EXPLICIT`` one
5856         :param bool optional: is object ``OPTIONAL`` in sequence
5857         """
5858         super(Any, self).__init__(None, expl, None, optional, _decoded)
5859         if value is None:
5860             self._value = None
5861         else:
5862             value = self._value_sanitize(value)
5863             self._value = value
5864             if self._expl is None:
5865                 if value.__class__ == binary_type:
5866                     tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5867                 else:
5868                     tag_class, tag_num = value.tag_order
5869             else:
5870                 tag_class, _, tag_num = tag_decode(self._expl)
5871             self._tag_order = (tag_class, tag_num)
5872         self.defined = None
5873
5874     def _value_sanitize(self, value):
5875         if value.__class__ == binary_type:
5876             if len(value) == 0:
5877                 raise ValueError("%s value can not be empty" % self.__class__.__name__)
5878             return value
5879         if isinstance(value, self.__class__):
5880             return value._value
5881         if not isinstance(value, Obj):
5882             raise InvalidValueType((self.__class__, Obj, binary_type))
5883         return value
5884
5885     @property
5886     def ready(self):
5887         return self._value is not None
5888
5889     @property
5890     def tag_order(self):
5891         self._assert_ready()
5892         return self._tag_order
5893
5894     @property
5895     def bered(self):
5896         if self.expl_lenindef or self.lenindef:
5897             return True
5898         if self.defined is None:
5899             return False
5900         return self.defined[1].bered
5901
5902     def __getstate__(self):
5903         return AnyState(
5904             __version__,
5905             self.tag,
5906             self._tag_order,
5907             self._expl,
5908             None,
5909             self.optional,
5910             self.offset,
5911             self.llen,
5912             self.vlen,
5913             self.expl_lenindef,
5914             self.lenindef,
5915             self.ber_encoded,
5916             self._value,
5917             self.defined,
5918         )
5919
5920     def __setstate__(self, state):
5921         super(Any, self).__setstate__(state)
5922         self._value = state.value
5923         self.defined = state.defined
5924
5925     def __eq__(self, their):
5926         if their.__class__ == binary_type:
5927             if self._value.__class__ == binary_type:
5928                 return self._value == their
5929             return self._value.encode() == their
5930         if issubclass(their.__class__, Any):
5931             if self.ready and their.ready:
5932                 return bytes(self) == bytes(their)
5933             return self.ready == their.ready
5934         return False
5935
5936     def __call__(
5937             self,
5938             value=None,
5939             expl=None,
5940             optional=None,
5941     ):
5942         return self.__class__(
5943             value=value,
5944             expl=self._expl if expl is None else expl,
5945             optional=self.optional if optional is None else optional,
5946         )
5947
5948     def __bytes__(self):
5949         self._assert_ready()
5950         value = self._value
5951         if value.__class__ == binary_type:
5952             return value
5953         return self._value.encode()
5954
5955     @property
5956     def tlen(self):
5957         return 0
5958
5959     def _encode(self):
5960         self._assert_ready()
5961         value = self._value
5962         if value.__class__ == binary_type:
5963             return value
5964         return value.encode()
5965
5966     def _encode1st(self, state):
5967         self._assert_ready()
5968         value = self._value
5969         if value.__class__ == binary_type:
5970             return len(value), state
5971         return value.encode1st(state)
5972
5973     def _encode2nd(self, writer, state_iter):
5974         value = self._value
5975         if value.__class__ == binary_type:
5976             write_full(writer, value)
5977         else:
5978             value.encode2nd(writer, state_iter)
5979
5980     def _encode_cer(self, writer):
5981         self._assert_ready()
5982         value = self._value
5983         if value.__class__ == binary_type:
5984             write_full(writer, value)
5985         else:
5986             value.encode_cer(writer)
5987
5988     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5989         try:
5990             t, tlen, lv = tag_strip(tlv)
5991         except DecodeError as err:
5992             raise err.__class__(
5993                 msg=err.msg,
5994                 klass=self.__class__,
5995                 decode_path=decode_path,
5996                 offset=offset,
5997             )
5998         try:
5999             l, llen, v = len_decode(lv)
6000         except LenIndefForm as err:
6001             if not ctx.get("bered", False):
6002                 raise err.__class__(
6003                     msg=err.msg,
6004                     klass=self.__class__,
6005                     decode_path=decode_path,
6006                     offset=offset,
6007                 )
6008             llen, vlen, v = 1, 0, lv[1:]
6009             sub_offset = offset + tlen + llen
6010             chunk_i = 0
6011             while v[:EOC_LEN].tobytes() != EOC:
6012                 chunk, v = Any().decode(
6013                     v,
6014                     offset=sub_offset,
6015                     decode_path=decode_path + (str(chunk_i),),
6016                     leavemm=True,
6017                     ctx=ctx,
6018                     _ctx_immutable=False,
6019                 )
6020                 vlen += chunk.tlvlen
6021                 sub_offset += chunk.tlvlen
6022                 chunk_i += 1
6023             tlvlen = tlen + llen + vlen + EOC_LEN
6024             obj = self.__class__(
6025                 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
6026                 expl=self._expl,
6027                 optional=self.optional,
6028                 _decoded=(offset, 0, tlvlen),
6029             )
6030             obj.lenindef = True
6031             obj.tag = t.tobytes()
6032             yield decode_path, obj, v[EOC_LEN:]
6033             return
6034         except DecodeError as err:
6035             raise err.__class__(
6036                 msg=err.msg,
6037                 klass=self.__class__,
6038                 decode_path=decode_path,
6039                 offset=offset,
6040             )
6041         if l > len(v):
6042             raise NotEnoughData(
6043                 "encoded length is longer than data",
6044                 klass=self.__class__,
6045                 decode_path=decode_path,
6046                 offset=offset,
6047             )
6048         tlvlen = tlen + llen + l
6049         v, tail = tlv[:tlvlen], v[l:]
6050         obj = self.__class__(
6051             value=None if evgen_mode else v.tobytes(),
6052             expl=self._expl,
6053             optional=self.optional,
6054             _decoded=(offset, 0, tlvlen),
6055         )
6056         obj.tag = t.tobytes()
6057         yield decode_path, obj, tail
6058
6059     def __repr__(self):
6060         return pp_console_row(next(self.pps()))
6061
6062     def pps(self, decode_path=()):
6063         value = self._value
6064         if value is None:
6065             pass
6066         elif value.__class__ == binary_type:
6067             value = None
6068         else:
6069             value = repr(value)
6070         yield _pp(
6071             obj=self,
6072             asn1_type_name=self.asn1_type_name,
6073             obj_name=self.__class__.__name__,
6074             decode_path=decode_path,
6075             value=value,
6076             blob=self._value if self._value.__class__ == binary_type else None,
6077             optional=self.optional,
6078             default=self == self.default,
6079             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6080             expl=None if self._expl is None else tag_decode(self._expl),
6081             offset=self.offset,
6082             tlen=self.tlen,
6083             llen=self.llen,
6084             vlen=self.vlen,
6085             expl_offset=self.expl_offset if self.expled else None,
6086             expl_tlen=self.expl_tlen if self.expled else None,
6087             expl_llen=self.expl_llen if self.expled else None,
6088             expl_vlen=self.expl_vlen if self.expled else None,
6089             expl_lenindef=self.expl_lenindef,
6090             lenindef=self.lenindef,
6091             bered=self.bered,
6092         )
6093         defined_by, defined = self.defined or (None, None)
6094         if defined_by is not None:
6095             yield defined.pps(
6096                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6097             )
6098         for pp in self.pps_lenindef(decode_path):
6099             yield pp
6100
6101
6102 ########################################################################
6103 # ASN.1 constructed types
6104 ########################################################################
6105
6106 def abs_decode_path(decode_path, rel_path):
6107     """Create an absolute decode path from current and relative ones
6108
6109     :param decode_path: current decode path, starting point. Tuple of strings
6110     :param rel_path: relative path to ``decode_path``. Tuple of strings.
6111                      If first tuple's element is "/", then treat it as
6112                      an absolute path, ignoring ``decode_path`` as
6113                      starting point. Also this tuple can contain ".."
6114                      elements, stripping the leading element from
6115                      ``decode_path``
6116
6117     >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
6118     ("foo", "bar", "baz", "whatever")
6119     >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
6120     ("foo", "whatever")
6121     >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
6122     ("baz", "whatever")
6123     """
6124     if rel_path[0] == "/":
6125         return rel_path[1:]
6126     if rel_path[0] == "..":
6127         return abs_decode_path(decode_path[:-1], rel_path[1:])
6128     return decode_path + rel_path
6129
6130
6131 SequenceState = namedtuple(
6132     "SequenceState",
6133     BasicState._fields + ("specs", "value",),
6134     **NAMEDTUPLE_KWARGS
6135 )
6136
6137
6138 class SequenceEncode1stMixing(object):
6139     def _encode1st(self, state):
6140         state.append(0)
6141         idx = len(state) - 1
6142         vlen = 0
6143         for v in self._values_for_encoding():
6144             l, _ = v.encode1st(state)
6145             vlen += l
6146         state[idx] = vlen
6147         return len(self.tag) + len_size(vlen) + vlen, state
6148
6149
6150 class Sequence(SequenceEncode1stMixing, Obj):
6151     """``SEQUENCE`` structure type
6152
6153     You have to make specification of sequence::
6154
6155         class Extension(Sequence):
6156             schema = (
6157                 ("extnID", ObjectIdentifier()),
6158                 ("critical", Boolean(default=False)),
6159                 ("extnValue", OctetString()),
6160             )
6161
6162     Then, you can work with it as with dictionary.
6163
6164     >>> ext = Extension()
6165     >>> Extension().specs
6166     OrderedDict([
6167         ('extnID', OBJECT IDENTIFIER),
6168         ('critical', BOOLEAN False OPTIONAL DEFAULT),
6169         ('extnValue', OCTET STRING),
6170     ])
6171     >>> ext["extnID"] = "1.2.3"
6172     Traceback (most recent call last):
6173     pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6174     >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6175
6176     You can determine if sequence is ready to be encoded:
6177
6178     >>> ext.ready
6179     False
6180     >>> ext.encode()
6181     Traceback (most recent call last):
6182     pyderasn.ObjNotReady: object is not ready: extnValue
6183     >>> ext["extnValue"] = OctetString(b"foobar")
6184     >>> ext.ready
6185     True
6186
6187     Value you want to assign, must have the same **type** as in
6188     corresponding specification, but it can have different tags,
6189     optional/default attributes -- they will be taken from specification
6190     automatically::
6191
6192         class TBSCertificate(Sequence):
6193             schema = (
6194                 ("version", Version(expl=tag_ctxc(0), default="v1")),
6195             [...]
6196
6197     >>> tbs = TBSCertificate()
6198     >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6199
6200     Assign ``None`` to remove value from sequence.
6201
6202     You can set values in Sequence during its initialization:
6203
6204     >>> AlgorithmIdentifier((
6205         ("algorithm", ObjectIdentifier("1.2.3")),
6206         ("parameters", Any(Null()))
6207     ))
6208     AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6209
6210     You can determine if value exists/set in the sequence and take its value:
6211
6212     >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6213     (True, True, False)
6214     >>> ext["extnID"]
6215     OBJECT IDENTIFIER 1.2.3
6216
6217     But pay attention that if value has default, then it won't be (not
6218     in) in the sequence (because ``DEFAULT`` must not be encoded in
6219     DER), but you can read its value:
6220
6221     >>> "critical" in ext, ext["critical"]
6222     (False, BOOLEAN False)
6223     >>> ext["critical"] = Boolean(True)
6224     >>> "critical" in ext, ext["critical"]
6225     (True, BOOLEAN True)
6226
6227     All defaulted values are always optional.
6228
6229     .. _allow_default_values_ctx:
6230
6231     DER prohibits default value encoding and will raise an error if
6232     default value is unexpectedly met during decode.
6233     If :ref:`bered <bered_ctx>` context option is set, then no error
6234     will be raised, but ``bered`` attribute set. You can disable strict
6235     defaulted values existence validation by setting
6236     ``"allow_default_values": True`` :ref:`context <ctx>` option.
6237
6238     All values with DEFAULT specified are decoded atomically in
6239     :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6240     SEQUENCE, then it will be yielded as a single element, not
6241     disassembled. That is required for DEFAULT existence check.
6242
6243     Two sequences are equal if they have equal specification (schema),
6244     implicit/explicit tagging and the same values.
6245     """
6246     __slots__ = ("specs",)
6247     tag_default = tag_encode(form=TagFormConstructed, num=16)
6248     asn1_type_name = "SEQUENCE"
6249
6250     def __init__(
6251             self,
6252             value=None,
6253             schema=None,
6254             impl=None,
6255             expl=None,
6256             default=None,
6257             optional=False,
6258             _decoded=(0, 0, 0),
6259     ):
6260         super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6261         if schema is None:
6262             schema = getattr(self, "schema", ())
6263         self.specs = (
6264             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6265         )
6266         self._value = {}
6267         if value is not None:
6268             if issubclass(value.__class__, Sequence):
6269                 self._value = value._value
6270             elif hasattr(value, "__iter__"):
6271                 for seq_key, seq_value in value:
6272                     self[seq_key] = seq_value
6273             else:
6274                 raise InvalidValueType((Sequence,))
6275         if default is not None:
6276             if not issubclass(default.__class__, Sequence):
6277                 raise InvalidValueType((Sequence,))
6278             default_value = default._value
6279             default_obj = self.__class__(impl=self.tag, expl=self._expl)
6280             default_obj.specs = self.specs
6281             default_obj._value = default_value
6282             self.default = default_obj
6283             if value is None:
6284                 self._value = copy(default_obj._value)
6285
6286     @property
6287     def ready(self):
6288         for name, spec in iteritems(self.specs):
6289             value = self._value.get(name)
6290             if value is None:
6291                 if spec.optional:
6292                     continue
6293                 return False
6294             if not value.ready:
6295                 return False
6296         return True
6297
6298     @property
6299     def bered(self):
6300         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6301             return True
6302         return any(value.bered for value in itervalues(self._value))
6303
6304     def __getstate__(self):
6305         return SequenceState(
6306             __version__,
6307             self.tag,
6308             self._tag_order,
6309             self._expl,
6310             self.default,
6311             self.optional,
6312             self.offset,
6313             self.llen,
6314             self.vlen,
6315             self.expl_lenindef,
6316             self.lenindef,
6317             self.ber_encoded,
6318             self.specs,
6319             {k: copy(v) for k, v in iteritems(self._value)},
6320         )
6321
6322     def __setstate__(self, state):
6323         super(Sequence, self).__setstate__(state)
6324         self.specs = state.specs
6325         self._value = state.value
6326
6327     def __eq__(self, their):
6328         if not isinstance(their, self.__class__):
6329             return False
6330         return (
6331             self.specs == their.specs and
6332             self.tag == their.tag and
6333             self._expl == their._expl and
6334             self._value == their._value
6335         )
6336
6337     def __call__(
6338             self,
6339             value=None,
6340             impl=None,
6341             expl=None,
6342             default=None,
6343             optional=None,
6344     ):
6345         return self.__class__(
6346             value=value,
6347             schema=self.specs,
6348             impl=self.tag if impl is None else impl,
6349             expl=self._expl if expl is None else expl,
6350             default=self.default if default is None else default,
6351             optional=self.optional if optional is None else optional,
6352         )
6353
6354     def __contains__(self, key):
6355         return key in self._value
6356
6357     def __setitem__(self, key, value):
6358         spec = self.specs.get(key)
6359         if spec is None:
6360             raise ObjUnknown(key)
6361         if value is None:
6362             self._value.pop(key, None)
6363             return
6364         if not isinstance(value, spec.__class__):
6365             raise InvalidValueType((spec.__class__,))
6366         value = spec(value=value)
6367         if spec.default is not None and value == spec.default:
6368             self._value.pop(key, None)
6369             return
6370         self._value[key] = value
6371
6372     def __getitem__(self, key):
6373         value = self._value.get(key)
6374         if value is not None:
6375             return value
6376         spec = self.specs.get(key)
6377         if spec is None:
6378             raise ObjUnknown(key)
6379         if spec.default is not None:
6380             return spec.default
6381         return None
6382
6383     def _values_for_encoding(self):
6384         for name, spec in iteritems(self.specs):
6385             value = self._value.get(name)
6386             if value is None:
6387                 if spec.optional:
6388                     continue
6389                 raise ObjNotReady(name)
6390             yield value
6391
6392     def _encode(self):
6393         v = b"".join(v.encode() for v in self._values_for_encoding())
6394         return b"".join((self.tag, len_encode(len(v)), v))
6395
6396     def _encode2nd(self, writer, state_iter):
6397         write_full(writer, self.tag + len_encode(next(state_iter)))
6398         for v in self._values_for_encoding():
6399             v.encode2nd(writer, state_iter)
6400
6401     def _encode_cer(self, writer):
6402         write_full(writer, self.tag + LENINDEF)
6403         for v in self._values_for_encoding():
6404             v.encode_cer(writer)
6405         write_full(writer, EOC)
6406
6407     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6408         try:
6409             t, tlen, lv = tag_strip(tlv)
6410         except DecodeError as err:
6411             raise err.__class__(
6412                 msg=err.msg,
6413                 klass=self.__class__,
6414                 decode_path=decode_path,
6415                 offset=offset,
6416             )
6417         if t != self.tag:
6418             raise TagMismatch(
6419                 klass=self.__class__,
6420                 decode_path=decode_path,
6421                 offset=offset,
6422             )
6423         if tag_only:  # pragma: no cover
6424             yield None
6425             return
6426         lenindef = False
6427         ctx_bered = ctx.get("bered", False)
6428         try:
6429             l, llen, v = len_decode(lv)
6430         except LenIndefForm as err:
6431             if not ctx_bered:
6432                 raise err.__class__(
6433                     msg=err.msg,
6434                     klass=self.__class__,
6435                     decode_path=decode_path,
6436                     offset=offset,
6437                 )
6438             l, llen, v = 0, 1, lv[1:]
6439             lenindef = True
6440         except DecodeError as err:
6441             raise err.__class__(
6442                 msg=err.msg,
6443                 klass=self.__class__,
6444                 decode_path=decode_path,
6445                 offset=offset,
6446             )
6447         if l > len(v):
6448             raise NotEnoughData(
6449                 "encoded length is longer than data",
6450                 klass=self.__class__,
6451                 decode_path=decode_path,
6452                 offset=offset,
6453             )
6454         if not lenindef:
6455             v, tail = v[:l], v[l:]
6456         vlen = 0
6457         sub_offset = offset + tlen + llen
6458         values = {}
6459         ber_encoded = False
6460         ctx_allow_default_values = ctx.get("allow_default_values", False)
6461         for name, spec in iteritems(self.specs):
6462             if spec.optional and (
6463                     (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6464                     len(v) == 0
6465             ):
6466                 continue
6467             spec_defaulted = spec.default is not None
6468             sub_decode_path = decode_path + (name,)
6469             try:
6470                 if evgen_mode and not spec_defaulted:
6471                     for _decode_path, value, v_tail in spec.decode_evgen(
6472                             v,
6473                             sub_offset,
6474                             leavemm=True,
6475                             decode_path=sub_decode_path,
6476                             ctx=ctx,
6477                             _ctx_immutable=False,
6478                     ):
6479                         yield _decode_path, value, v_tail
6480                 else:
6481                     _, value, v_tail = next(spec.decode_evgen(
6482                         v,
6483                         sub_offset,
6484                         leavemm=True,
6485                         decode_path=sub_decode_path,
6486                         ctx=ctx,
6487                         _ctx_immutable=False,
6488                         _evgen_mode=False,
6489                     ))
6490             except TagMismatch as err:
6491                 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6492                     continue
6493                 raise
6494
6495             defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6496             if not evgen_mode and defined is not None:
6497                 defined_by, defined_spec = defined
6498                 if issubclass(value.__class__, SequenceOf):
6499                     for i, _value in enumerate(value):
6500                         sub_sub_decode_path = sub_decode_path + (
6501                             str(i),
6502                             DecodePathDefBy(defined_by),
6503                         )
6504                         defined_value, defined_tail = defined_spec.decode(
6505                             memoryview(bytes(_value)),
6506                             sub_offset + (
6507                                 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6508                                 if value.expled else (value.tlen + value.llen)
6509                             ),
6510                             leavemm=True,
6511                             decode_path=sub_sub_decode_path,
6512                             ctx=ctx,
6513                             _ctx_immutable=False,
6514                         )
6515                         if len(defined_tail) > 0:
6516                             raise DecodeError(
6517                                 "remaining data",
6518                                 klass=self.__class__,
6519                                 decode_path=sub_sub_decode_path,
6520                                 offset=offset,
6521                             )
6522                         _value.defined = (defined_by, defined_value)
6523                 else:
6524                     defined_value, defined_tail = defined_spec.decode(
6525                         memoryview(bytes(value)),
6526                         sub_offset + (
6527                             (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6528                             if value.expled else (value.tlen + value.llen)
6529                         ),
6530                         leavemm=True,
6531                         decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6532                         ctx=ctx,
6533                         _ctx_immutable=False,
6534                     )
6535                     if len(defined_tail) > 0:
6536                         raise DecodeError(
6537                             "remaining data",
6538                             klass=self.__class__,
6539                             decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6540                             offset=offset,
6541                         )
6542                     value.defined = (defined_by, defined_value)
6543
6544             value_len = value.fulllen
6545             vlen += value_len
6546             sub_offset += value_len
6547             v = v_tail
6548             if spec_defaulted:
6549                 if evgen_mode:
6550                     yield sub_decode_path, value, v_tail
6551                 if value == spec.default:
6552                     if ctx_bered or ctx_allow_default_values:
6553                         ber_encoded = True
6554                     else:
6555                         raise DecodeError(
6556                             "DEFAULT value met",
6557                             klass=self.__class__,
6558                             decode_path=sub_decode_path,
6559                             offset=sub_offset,
6560                         )
6561             if not evgen_mode:
6562                 values[name] = value
6563                 spec_defines = getattr(spec, "defines", ())
6564                 if len(spec_defines) == 0:
6565                     defines_by_path = ctx.get("defines_by_path", ())
6566                     if len(defines_by_path) > 0:
6567                         spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6568                 if spec_defines is not None and len(spec_defines) > 0:
6569                     for rel_path, schema in spec_defines:
6570                         defined = schema.get(value, None)
6571                         if defined is not None:
6572                             ctx.setdefault("_defines", []).append((
6573                                 abs_decode_path(sub_decode_path[:-1], rel_path),
6574                                 (value, defined),
6575                             ))
6576         if lenindef:
6577             if v[:EOC_LEN].tobytes() != EOC:
6578                 raise DecodeError(
6579                     "no EOC",
6580                     klass=self.__class__,
6581                     decode_path=decode_path,
6582                     offset=offset,
6583                 )
6584             tail = v[EOC_LEN:]
6585             vlen += EOC_LEN
6586         elif len(v) > 0:
6587             raise DecodeError(
6588                 "remaining data",
6589                 klass=self.__class__,
6590                 decode_path=decode_path,
6591                 offset=offset,
6592             )
6593         obj = self.__class__(
6594             schema=self.specs,
6595             impl=self.tag,
6596             expl=self._expl,
6597             default=self.default,
6598             optional=self.optional,
6599             _decoded=(offset, llen, vlen),
6600         )
6601         obj._value = values
6602         obj.lenindef = lenindef
6603         obj.ber_encoded = ber_encoded
6604         yield decode_path, obj, tail
6605
6606     def __repr__(self):
6607         value = pp_console_row(next(self.pps()))
6608         cols = []
6609         for name in self.specs:
6610             _value = self._value.get(name)
6611             if _value is None:
6612                 continue
6613             cols.append("%s: %s" % (name, repr(_value)))
6614         return "%s[%s]" % (value, "; ".join(cols))
6615
6616     def pps(self, decode_path=()):
6617         yield _pp(
6618             obj=self,
6619             asn1_type_name=self.asn1_type_name,
6620             obj_name=self.__class__.__name__,
6621             decode_path=decode_path,
6622             optional=self.optional,
6623             default=self == self.default,
6624             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6625             expl=None if self._expl is None else tag_decode(self._expl),
6626             offset=self.offset,
6627             tlen=self.tlen,
6628             llen=self.llen,
6629             vlen=self.vlen,
6630             expl_offset=self.expl_offset if self.expled else None,
6631             expl_tlen=self.expl_tlen if self.expled else None,
6632             expl_llen=self.expl_llen if self.expled else None,
6633             expl_vlen=self.expl_vlen if self.expled else None,
6634             expl_lenindef=self.expl_lenindef,
6635             lenindef=self.lenindef,
6636             ber_encoded=self.ber_encoded,
6637             bered=self.bered,
6638         )
6639         for name in self.specs:
6640             value = self._value.get(name)
6641             if value is None:
6642                 continue
6643             yield value.pps(decode_path=decode_path + (name,))
6644         for pp in self.pps_lenindef(decode_path):
6645             yield pp
6646
6647
6648 class Set(Sequence, SequenceEncode1stMixing):
6649     """``SET`` structure type
6650
6651     Its usage is identical to :py:class:`pyderasn.Sequence`.
6652
6653     .. _allow_unordered_set_ctx:
6654
6655     DER prohibits unordered values encoding and will raise an error
6656     during decode. If :ref:`bered <bered_ctx>` context option is set,
6657     then no error will occur. Also you can disable strict values
6658     ordering check by setting ``"allow_unordered_set": True``
6659     :ref:`context <ctx>` option.
6660     """
6661     __slots__ = ()
6662     tag_default = tag_encode(form=TagFormConstructed, num=17)
6663     asn1_type_name = "SET"
6664
6665     def _values_for_encoding(self):
6666         return sorted(
6667             super(Set, self)._values_for_encoding(),
6668             key=attrgetter("tag_order"),
6669         )
6670
6671     def _encode_cer(self, writer):
6672         write_full(writer, self.tag + LENINDEF)
6673         for v in sorted(
6674                 super(Set, self)._values_for_encoding(),
6675                 key=attrgetter("tag_order_cer"),
6676         ):
6677             v.encode_cer(writer)
6678         write_full(writer, EOC)
6679
6680     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6681         try:
6682             t, tlen, lv = tag_strip(tlv)
6683         except DecodeError as err:
6684             raise err.__class__(
6685                 msg=err.msg,
6686                 klass=self.__class__,
6687                 decode_path=decode_path,
6688                 offset=offset,
6689             )
6690         if t != self.tag:
6691             raise TagMismatch(
6692                 klass=self.__class__,
6693                 decode_path=decode_path,
6694                 offset=offset,
6695             )
6696         if tag_only:
6697             yield None
6698             return
6699         lenindef = False
6700         ctx_bered = ctx.get("bered", False)
6701         try:
6702             l, llen, v = len_decode(lv)
6703         except LenIndefForm as err:
6704             if not ctx_bered:
6705                 raise err.__class__(
6706                     msg=err.msg,
6707                     klass=self.__class__,
6708                     decode_path=decode_path,
6709                     offset=offset,
6710                 )
6711             l, llen, v = 0, 1, lv[1:]
6712             lenindef = True
6713         except DecodeError as err:
6714             raise err.__class__(
6715                 msg=err.msg,
6716                 klass=self.__class__,
6717                 decode_path=decode_path,
6718                 offset=offset,
6719             )
6720         if l > len(v):
6721             raise NotEnoughData(
6722                 "encoded length is longer than data",
6723                 klass=self.__class__,
6724                 offset=offset,
6725             )
6726         if not lenindef:
6727             v, tail = v[:l], v[l:]
6728         vlen = 0
6729         sub_offset = offset + tlen + llen
6730         values = {}
6731         ber_encoded = False
6732         ctx_allow_default_values = ctx.get("allow_default_values", False)
6733         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6734         tag_order_prev = (0, 0)
6735         _specs_items = copy(self.specs)
6736
6737         while len(v) > 0:
6738             if lenindef and v[:EOC_LEN].tobytes() == EOC:
6739                 break
6740             for name, spec in iteritems(_specs_items):
6741                 sub_decode_path = decode_path + (name,)
6742                 try:
6743                     spec.decode(
6744                         v,
6745                         sub_offset,
6746                         leavemm=True,
6747                         decode_path=sub_decode_path,
6748                         ctx=ctx,
6749                         tag_only=True,
6750                         _ctx_immutable=False,
6751                     )
6752                 except TagMismatch:
6753                     continue
6754                 break
6755             else:
6756                 raise TagMismatch(
6757                     klass=self.__class__,
6758                     decode_path=decode_path,
6759                     offset=offset,
6760                 )
6761             spec_defaulted = spec.default is not None
6762             if evgen_mode and not spec_defaulted:
6763                 for _decode_path, value, v_tail in spec.decode_evgen(
6764                         v,
6765                         sub_offset,
6766                         leavemm=True,
6767                         decode_path=sub_decode_path,
6768                         ctx=ctx,
6769                         _ctx_immutable=False,
6770                 ):
6771                     yield _decode_path, value, v_tail
6772             else:
6773                 _, value, v_tail = next(spec.decode_evgen(
6774                     v,
6775                     sub_offset,
6776                     leavemm=True,
6777                     decode_path=sub_decode_path,
6778                     ctx=ctx,
6779                     _ctx_immutable=False,
6780                     _evgen_mode=False,
6781                 ))
6782             value_tag_order = value.tag_order
6783             value_len = value.fulllen
6784             if tag_order_prev >= value_tag_order:
6785                 if ctx_bered or ctx_allow_unordered_set:
6786                     ber_encoded = True
6787                 else:
6788                     raise DecodeError(
6789                         "unordered " + self.asn1_type_name,
6790                         klass=self.__class__,
6791                         decode_path=sub_decode_path,
6792                         offset=sub_offset,
6793                     )
6794             if spec_defaulted:
6795                 if evgen_mode:
6796                     yield sub_decode_path, value, v_tail
6797                 if value != spec.default:
6798                     pass
6799                 elif ctx_bered or ctx_allow_default_values:
6800                     ber_encoded = True
6801                 else:
6802                     raise DecodeError(
6803                         "DEFAULT value met",
6804                         klass=self.__class__,
6805                         decode_path=sub_decode_path,
6806                         offset=sub_offset,
6807                     )
6808             values[name] = value
6809             del _specs_items[name]
6810             tag_order_prev = value_tag_order
6811             sub_offset += value_len
6812             vlen += value_len
6813             v = v_tail
6814
6815         obj = self.__class__(
6816             schema=self.specs,
6817             impl=self.tag,
6818             expl=self._expl,
6819             default=self.default,
6820             optional=self.optional,
6821             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6822         )
6823         if lenindef:
6824             if v[:EOC_LEN].tobytes() != EOC:
6825                 raise DecodeError(
6826                     "no EOC",
6827                     klass=self.__class__,
6828                     decode_path=decode_path,
6829                     offset=offset,
6830                 )
6831             tail = v[EOC_LEN:]
6832             obj.lenindef = True
6833         for name, spec in iteritems(self.specs):
6834             if name not in values and not spec.optional:
6835                 raise DecodeError(
6836                     "%s value is not ready" % name,
6837                     klass=self.__class__,
6838                     decode_path=decode_path,
6839                     offset=offset,
6840                 )
6841         if not evgen_mode:
6842             obj._value = values
6843         obj.ber_encoded = ber_encoded
6844         yield decode_path, obj, tail
6845
6846
6847 SequenceOfState = namedtuple(
6848     "SequenceOfState",
6849     BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6850     **NAMEDTUPLE_KWARGS
6851 )
6852
6853
6854 class SequenceOf(SequenceEncode1stMixing, Obj):
6855     """``SEQUENCE OF`` sequence type
6856
6857     For that kind of type you must specify the object it will carry on
6858     (bounds are for example here, not required)::
6859
6860         class Ints(SequenceOf):
6861             schema = Integer()
6862             bounds = (0, 2)
6863
6864     >>> ints = Ints()
6865     >>> ints.append(Integer(123))
6866     >>> ints.append(Integer(234))
6867     >>> ints
6868     Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6869     >>> [int(i) for i in ints]
6870     [123, 234]
6871     >>> ints.append(Integer(345))
6872     Traceback (most recent call last):
6873     pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6874     >>> ints[1]
6875     INTEGER 234
6876     >>> ints[1] = Integer(345)
6877     >>> ints
6878     Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6879
6880     You can initialize sequence with preinitialized values:
6881
6882     >>> ints = Ints([Integer(123), Integer(234)])
6883
6884     Also you can use iterator as a value:
6885
6886     >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6887
6888     And it won't be iterated until encoding process. Pay attention that
6889     bounds and required schema checks are done only during the encoding
6890     process in that case! After encode was called, then value is zeroed
6891     back to empty list and you have to set it again. That mode is useful
6892     mainly with CER encoding mode, where all objects from the iterable
6893     will be streamed to the buffer, without copying all of them to
6894     memory first.
6895     """
6896     __slots__ = ("spec", "_bound_min", "_bound_max")
6897     tag_default = tag_encode(form=TagFormConstructed, num=16)
6898     asn1_type_name = "SEQUENCE OF"
6899
6900     def __init__(
6901             self,
6902             value=None,
6903             schema=None,
6904             bounds=None,
6905             impl=None,
6906             expl=None,
6907             default=None,
6908             optional=False,
6909             _decoded=(0, 0, 0),
6910     ):
6911         super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6912         if schema is None:
6913             schema = getattr(self, "schema", None)
6914         if schema is None:
6915             raise ValueError("schema must be specified")
6916         self.spec = schema
6917         self._bound_min, self._bound_max = getattr(
6918             self,
6919             "bounds",
6920             (0, float("+inf")),
6921         ) if bounds is None else bounds
6922         self._value = []
6923         if value is not None:
6924             self._value = self._value_sanitize(value)
6925         if default is not None:
6926             default_value = self._value_sanitize(default)
6927             default_obj = self.__class__(
6928                 schema=schema,
6929                 impl=self.tag,
6930                 expl=self._expl,
6931             )
6932             default_obj._value = default_value
6933             self.default = default_obj
6934             if value is None:
6935                 self._value = copy(default_obj._value)
6936
6937     def _value_sanitize(self, value):
6938         iterator = False
6939         if issubclass(value.__class__, SequenceOf):
6940             value = value._value
6941         elif hasattr(value, NEXT_ATTR_NAME):
6942             iterator = True
6943         elif hasattr(value, "__iter__"):
6944             value = list(value)
6945         else:
6946             raise InvalidValueType((self.__class__, iter, "iterator"))
6947         if not iterator:
6948             if not self._bound_min <= len(value) <= self._bound_max:
6949                 raise BoundsError(self._bound_min, len(value), self._bound_max)
6950             class_expected = self.spec.__class__
6951             for v in value:
6952                 if not isinstance(v, class_expected):
6953                     raise InvalidValueType((class_expected,))
6954         return value
6955
6956     @property
6957     def ready(self):
6958         if hasattr(self._value, NEXT_ATTR_NAME):
6959             return True
6960         if self._bound_min > 0 and len(self._value) == 0:
6961             return False
6962         return all(v.ready for v in self._value)
6963
6964     @property
6965     def bered(self):
6966         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6967             return True
6968         return any(v.bered for v in self._value)
6969
6970     def __getstate__(self):
6971         if hasattr(self._value, NEXT_ATTR_NAME):
6972             raise ValueError("can not pickle SequenceOf with iterator")
6973         return SequenceOfState(
6974             __version__,
6975             self.tag,
6976             self._tag_order,
6977             self._expl,
6978             self.default,
6979             self.optional,
6980             self.offset,
6981             self.llen,
6982             self.vlen,
6983             self.expl_lenindef,
6984             self.lenindef,
6985             self.ber_encoded,
6986             self.spec,
6987             [copy(v) for v in self._value],
6988             self._bound_min,
6989             self._bound_max,
6990         )
6991
6992     def __setstate__(self, state):
6993         super(SequenceOf, self).__setstate__(state)
6994         self.spec = state.spec
6995         self._value = state.value
6996         self._bound_min = state.bound_min
6997         self._bound_max = state.bound_max
6998
6999     def __eq__(self, their):
7000         if isinstance(their, self.__class__):
7001             return (
7002                 self.spec == their.spec and
7003                 self.tag == their.tag and
7004                 self._expl == their._expl and
7005                 self._value == their._value
7006             )
7007         if hasattr(their, "__iter__"):
7008             return self._value == list(their)
7009         return False
7010
7011     def __call__(
7012             self,
7013             value=None,
7014             bounds=None,
7015             impl=None,
7016             expl=None,
7017             default=None,
7018             optional=None,
7019     ):
7020         return self.__class__(
7021             value=value,
7022             schema=self.spec,
7023             bounds=(
7024                 (self._bound_min, self._bound_max)
7025                 if bounds is None else bounds
7026             ),
7027             impl=self.tag if impl is None else impl,
7028             expl=self._expl if expl is None else expl,
7029             default=self.default if default is None else default,
7030             optional=self.optional if optional is None else optional,
7031         )
7032
7033     def __contains__(self, key):
7034         return key in self._value
7035
7036     def append(self, value):
7037         if not isinstance(value, self.spec.__class__):
7038             raise InvalidValueType((self.spec.__class__,))
7039         if len(self._value) + 1 > self._bound_max:
7040             raise BoundsError(
7041                 self._bound_min,
7042                 len(self._value) + 1,
7043                 self._bound_max,
7044             )
7045         self._value.append(value)
7046
7047     def __iter__(self):
7048         return iter(self._value)
7049
7050     def __len__(self):
7051         return len(self._value)
7052
7053     def __setitem__(self, key, value):
7054         if not isinstance(value, self.spec.__class__):
7055             raise InvalidValueType((self.spec.__class__,))
7056         self._value[key] = self.spec(value=value)
7057
7058     def __getitem__(self, key):
7059         return self._value[key]
7060
7061     def _values_for_encoding(self):
7062         return iter(self._value)
7063
7064     def _encode(self):
7065         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7066         if iterator:
7067             values = []
7068             values_append = values.append
7069             class_expected = self.spec.__class__
7070             values_for_encoding = self._values_for_encoding()
7071             self._value = []
7072             for v in values_for_encoding:
7073                 if not isinstance(v, class_expected):
7074                     raise InvalidValueType((class_expected,))
7075                 values_append(v.encode())
7076             if not self._bound_min <= len(values) <= self._bound_max:
7077                 raise BoundsError(self._bound_min, len(values), self._bound_max)
7078             value = b"".join(values)
7079         else:
7080             value = b"".join(v.encode() for v in self._values_for_encoding())
7081         return b"".join((self.tag, len_encode(len(value)), value))
7082
7083     def _encode1st(self, state):
7084         state = super(SequenceOf, self)._encode1st(state)
7085         if hasattr(self._value, NEXT_ATTR_NAME):
7086             self._value = []
7087         return state
7088
7089     def _encode2nd(self, writer, state_iter):
7090         write_full(writer, self.tag + len_encode(next(state_iter)))
7091         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7092         if iterator:
7093             values_count = 0
7094             class_expected = self.spec.__class__
7095             values_for_encoding = self._values_for_encoding()
7096             self._value = []
7097             for v in values_for_encoding:
7098                 if not isinstance(v, class_expected):
7099                     raise InvalidValueType((class_expected,))
7100                 v.encode2nd(writer, state_iter)
7101                 values_count += 1
7102             if not self._bound_min <= values_count <= self._bound_max:
7103                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7104         else:
7105             for v in self._values_for_encoding():
7106                 v.encode2nd(writer, state_iter)
7107
7108     def _encode_cer(self, writer):
7109         write_full(writer, self.tag + LENINDEF)
7110         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7111         if iterator:
7112             class_expected = self.spec.__class__
7113             values_count = 0
7114             values_for_encoding = self._values_for_encoding()
7115             self._value = []
7116             for v in values_for_encoding:
7117                 if not isinstance(v, class_expected):
7118                     raise InvalidValueType((class_expected,))
7119                 v.encode_cer(writer)
7120                 values_count += 1
7121             if not self._bound_min <= values_count <= self._bound_max:
7122                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7123         else:
7124             for v in self._values_for_encoding():
7125                 v.encode_cer(writer)
7126         write_full(writer, EOC)
7127
7128     def _decode(
7129             self,
7130             tlv,
7131             offset,
7132             decode_path,
7133             ctx,
7134             tag_only,
7135             evgen_mode,
7136             ordering_check=False,
7137     ):
7138         try:
7139             t, tlen, lv = tag_strip(tlv)
7140         except DecodeError as err:
7141             raise err.__class__(
7142                 msg=err.msg,
7143                 klass=self.__class__,
7144                 decode_path=decode_path,
7145                 offset=offset,
7146             )
7147         if t != self.tag:
7148             raise TagMismatch(
7149                 klass=self.__class__,
7150                 decode_path=decode_path,
7151                 offset=offset,
7152             )
7153         if tag_only:
7154             yield None
7155             return
7156         lenindef = False
7157         ctx_bered = ctx.get("bered", False)
7158         try:
7159             l, llen, v = len_decode(lv)
7160         except LenIndefForm as err:
7161             if not ctx_bered:
7162                 raise err.__class__(
7163                     msg=err.msg,
7164                     klass=self.__class__,
7165                     decode_path=decode_path,
7166                     offset=offset,
7167                 )
7168             l, llen, v = 0, 1, lv[1:]
7169             lenindef = True
7170         except DecodeError as err:
7171             raise err.__class__(
7172                 msg=err.msg,
7173                 klass=self.__class__,
7174                 decode_path=decode_path,
7175                 offset=offset,
7176             )
7177         if l > len(v):
7178             raise NotEnoughData(
7179                 "encoded length is longer than data",
7180                 klass=self.__class__,
7181                 decode_path=decode_path,
7182                 offset=offset,
7183             )
7184         if not lenindef:
7185             v, tail = v[:l], v[l:]
7186         vlen = 0
7187         sub_offset = offset + tlen + llen
7188         _value = []
7189         _value_count = 0
7190         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7191         value_prev = memoryview(v[:0])
7192         ber_encoded = False
7193         spec = self.spec
7194         while len(v) > 0:
7195             if lenindef and v[:EOC_LEN].tobytes() == EOC:
7196                 break
7197             sub_decode_path = decode_path + (str(_value_count),)
7198             if evgen_mode:
7199                 for _decode_path, value, v_tail in spec.decode_evgen(
7200                         v,
7201                         sub_offset,
7202                         leavemm=True,
7203                         decode_path=sub_decode_path,
7204                         ctx=ctx,
7205                         _ctx_immutable=False,
7206                 ):
7207                     yield _decode_path, value, v_tail
7208             else:
7209                 _, value, v_tail = next(spec.decode_evgen(
7210                     v,
7211                     sub_offset,
7212                     leavemm=True,
7213                     decode_path=sub_decode_path,
7214                     ctx=ctx,
7215                     _ctx_immutable=False,
7216                     _evgen_mode=False,
7217                 ))
7218             value_len = value.fulllen
7219             if ordering_check:
7220                 if value_prev.tobytes() > v[:value_len].tobytes():
7221                     if ctx_bered or ctx_allow_unordered_set:
7222                         ber_encoded = True
7223                     else:
7224                         raise DecodeError(
7225                             "unordered " + self.asn1_type_name,
7226                             klass=self.__class__,
7227                             decode_path=sub_decode_path,
7228                             offset=sub_offset,
7229                         )
7230                 value_prev = v[:value_len]
7231             _value_count += 1
7232             if not evgen_mode:
7233                 _value.append(value)
7234             sub_offset += value_len
7235             vlen += value_len
7236             v = v_tail
7237         if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7238             raise DecodeError(
7239                 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7240                 klass=self.__class__,
7241                 decode_path=decode_path,
7242                 offset=offset,
7243             )
7244         try:
7245             obj = self.__class__(
7246                 value=None if evgen_mode else _value,
7247                 schema=spec,
7248                 bounds=(self._bound_min, self._bound_max),
7249                 impl=self.tag,
7250                 expl=self._expl,
7251                 default=self.default,
7252                 optional=self.optional,
7253                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7254             )
7255         except BoundsError as err:
7256             raise DecodeError(
7257                 msg=str(err),
7258                 klass=self.__class__,
7259                 decode_path=decode_path,
7260                 offset=offset,
7261             )
7262         if lenindef:
7263             if v[:EOC_LEN].tobytes() != EOC:
7264                 raise DecodeError(
7265                     "no EOC",
7266                     klass=self.__class__,
7267                     decode_path=decode_path,
7268                     offset=offset,
7269                 )
7270             obj.lenindef = True
7271             tail = v[EOC_LEN:]
7272         obj.ber_encoded = ber_encoded
7273         yield decode_path, obj, tail
7274
7275     def __repr__(self):
7276         return "%s[%s]" % (
7277             pp_console_row(next(self.pps())),
7278             ", ".join(repr(v) for v in self._value),
7279         )
7280
7281     def pps(self, decode_path=()):
7282         yield _pp(
7283             obj=self,
7284             asn1_type_name=self.asn1_type_name,
7285             obj_name=self.__class__.__name__,
7286             decode_path=decode_path,
7287             optional=self.optional,
7288             default=self == self.default,
7289             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7290             expl=None if self._expl is None else tag_decode(self._expl),
7291             offset=self.offset,
7292             tlen=self.tlen,
7293             llen=self.llen,
7294             vlen=self.vlen,
7295             expl_offset=self.expl_offset if self.expled else None,
7296             expl_tlen=self.expl_tlen if self.expled else None,
7297             expl_llen=self.expl_llen if self.expled else None,
7298             expl_vlen=self.expl_vlen if self.expled else None,
7299             expl_lenindef=self.expl_lenindef,
7300             lenindef=self.lenindef,
7301             ber_encoded=self.ber_encoded,
7302             bered=self.bered,
7303         )
7304         for i, value in enumerate(self._value):
7305             yield value.pps(decode_path=decode_path + (str(i),))
7306         for pp in self.pps_lenindef(decode_path):
7307             yield pp
7308
7309
7310 class SetOf(SequenceOf):
7311     """``SET OF`` sequence type
7312
7313     Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7314     """
7315     __slots__ = ()
7316     tag_default = tag_encode(form=TagFormConstructed, num=17)
7317     asn1_type_name = "SET OF"
7318
7319     def _value_sanitize(self, value):
7320         value = super(SetOf, self)._value_sanitize(value)
7321         if hasattr(value, NEXT_ATTR_NAME):
7322             raise ValueError(
7323                 "SetOf does not support iterator values, as no sense in them"
7324             )
7325         return value
7326
7327     def _encode(self):
7328         v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7329         return b"".join((self.tag, len_encode(len(v)), v))
7330
7331     def _encode2nd(self, writer, state_iter):
7332         write_full(writer, self.tag + len_encode(next(state_iter)))
7333         values = []
7334         for v in self._values_for_encoding():
7335             buf = BytesIO()
7336             v.encode2nd(buf.write, state_iter)
7337             values.append(buf.getvalue())
7338         values.sort()
7339         for v in values:
7340             write_full(writer, v)
7341
7342     def _encode_cer(self, writer):
7343         write_full(writer, self.tag + LENINDEF)
7344         for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7345             write_full(writer, v)
7346         write_full(writer, EOC)
7347
7348     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7349         return super(SetOf, self)._decode(
7350             tlv,
7351             offset,
7352             decode_path,
7353             ctx,
7354             tag_only,
7355             evgen_mode,
7356             ordering_check=True,
7357         )
7358
7359
7360 def obj_by_path(pypath):  # pragma: no cover
7361     """Import object specified as string Python path
7362
7363     Modules must be separated from classes/functions with ``:``.
7364
7365     >>> obj_by_path("foo.bar:Baz")
7366     <class 'foo.bar.Baz'>
7367     >>> obj_by_path("foo.bar:Baz.boo")
7368     <classmethod 'foo.bar.Baz.boo'>
7369     """
7370     mod, objs = pypath.rsplit(":", 1)
7371     from importlib import import_module
7372     obj = import_module(mod)
7373     for obj_name in objs.split("."):
7374         obj = getattr(obj, obj_name)
7375     return obj
7376
7377
7378 def generic_decoder():  # pragma: no cover
7379     # All of this below is a big hack with self references
7380     choice = PrimitiveTypes()
7381     choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7382     choice.specs["SetOf"] = SetOf(schema=choice)
7383     for i in six_xrange(31):
7384         choice.specs["SequenceOf%d" % i] = SequenceOf(
7385             schema=choice,
7386             expl=tag_ctxc(i),
7387         )
7388     choice.specs["Any"] = Any()
7389
7390     # Class name equals to type name, to omit it from output
7391     class SEQUENCEOF(SequenceOf):
7392         __slots__ = ()
7393         schema = choice
7394
7395     def pprint_any(
7396             obj,
7397             oid_maps=(),
7398             with_colours=False,
7399             with_decode_path=False,
7400             decode_path_only=(),
7401             decode_path=(),
7402     ):
7403         def _pprint_pps(pps):
7404             for pp in pps:
7405                 if hasattr(pp, "_fields"):
7406                     if (
7407                             decode_path_only != () and
7408                             pp.decode_path[:len(decode_path_only)] != decode_path_only
7409                     ):
7410                         continue
7411                     if pp.asn1_type_name == Choice.asn1_type_name:
7412                         continue
7413                     pp_kwargs = pp._asdict()
7414                     pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7415                     pp = _pp(**pp_kwargs)
7416                     yield pp_console_row(
7417                         pp,
7418                         oid_maps=oid_maps,
7419                         with_offsets=True,
7420                         with_blob=False,
7421                         with_colours=with_colours,
7422                         with_decode_path=with_decode_path,
7423                         decode_path_len_decrease=len(decode_path_only),
7424                     )
7425                     for row in pp_console_blob(
7426                             pp,
7427                             decode_path_len_decrease=len(decode_path_only),
7428                     ):
7429                         yield row
7430                 else:
7431                     for row in _pprint_pps(pp):
7432                         yield row
7433         return "\n".join(_pprint_pps(obj.pps(decode_path)))
7434     return SEQUENCEOF(), pprint_any
7435
7436
7437 def ascii_visualize(ba):
7438     """Output only ASCII printable characters, like in hexdump -C
7439
7440     Example output for given binary string (right part)::
7441
7442         92 2b 39 20 65 91 e6 8e  95 93 1a 58 df 02 78 ea  |.+9 e......X..x.|
7443                                                            ^^^^^^^^^^^^^^^^
7444     """
7445     return "".join((six_unichr(b) if 0x20 <= b <= 0x7E else ".") for b in ba)
7446
7447
7448 def hexdump(raw):
7449     """Generate ``hexdump -C`` like output
7450
7451     Rendered example::
7452
7453         00000000  30 80 30 80 a0 80 02 01  02 00 00 02 14 54 a5 18  |0.0..........T..|
7454         00000010  69 ef 8b 3f 15 fd ea ad  bd 47 e0 94 81 6b 06 6a  |i..?.....G...k.j|
7455
7456     Result of that function is a generator of lines, where each line is
7457     a list of columns::
7458
7459         [
7460             [...],
7461             ["00000010 ", " 69", " ef", " 8b", " 3f", " 15", " fd", " ea", " ad ",
7462                           " bd", " 47", " e0", " 94", " 81", " 6b", " 06", " 6a ",
7463                           " |i..?.....G...k.j|"]
7464             [...],
7465         ]
7466     """
7467     hexed = hexenc(raw).upper()
7468     addr, cols = 0, ["%08x " % 0]
7469     for i in six_xrange(0, len(hexed), 2):
7470         if i != 0 and i // 2 % 8 == 0:
7471             cols[-1] += " "
7472         if i != 0 and i // 2 % 16 == 0:
7473             cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:addr + 16])))
7474             yield cols
7475             addr += 16
7476             cols = ["%08x " % addr]
7477         cols.append(" " + hexed[i:i + 2])
7478     if len(cols) > 0:
7479         cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:])))
7480         yield cols
7481
7482
7483 def browse(raw, obj, oid_maps=()):
7484     """Interactive browser
7485
7486     :param bytes raw: binary data you decoded
7487     :param obj: decoded :py:class:`pyderasn.Obj`
7488     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
7489                      Its human readable form is printed when OID is met
7490
7491     .. note:: `urwid <http://urwid.org/>`__ dependency required
7492
7493     This browser is an interactive terminal application for browsing
7494     structures of your decoded ASN.1 objects. You can quit it with **q**
7495     key. It consists of three windows:
7496
7497     :tree:
7498      View of ASN.1 elements hierarchy. You can navigate it using **Up**,
7499      **Down**, **PageUp**, **PageDown**, **Home**, **End** keys.
7500      **Left** key goes to constructed element above. **Plus**/**Minus**
7501      keys collapse/uncollapse constructed elements. **Space** toggles it
7502     :info:
7503      window with various information about element. You can scroll it
7504      with **h**/**l** (down, up) (**H**/**L** for triple speed) keys
7505     :hexdump:
7506      window with raw data hexdump and highlighted current element's
7507      contents. It automatically focuses on element's data. You can
7508      scroll it with **j**/**k** (down, up) (**J**/**K** for triple
7509      speed) keys. If element has explicit tag, then it also will be
7510      highlighted with different colour
7511
7512     Window's header contains current decode path and progress bars with
7513     position in *info* and *hexdump* windows.
7514
7515     If you press **d**, then current element will be saved in the
7516     current directory under its decode path name (adding ".0", ".1", etc
7517     suffix if such file already exists). **D** will save it with explicit tag.
7518
7519     You can also invoke it with ``--browse`` command line argument.
7520     """
7521     from copy import deepcopy
7522     from os.path import exists as path_exists
7523     import urwid
7524
7525     class TW(urwid.TreeWidget):
7526         def __init__(self, state, *args, **kwargs):
7527             self.state = state
7528             self.scrolled = {"info": False, "hexdump": False}
7529             super(TW, self).__init__(*args, **kwargs)
7530
7531         def _get_pp(self):
7532             pp = self.get_node().get_value()
7533             constructed = len(pp) > 1
7534             return (pp if hasattr(pp, "_fields") else pp[0]), constructed
7535
7536         def _state_update(self):
7537             pp, _ = self._get_pp()
7538             self.state["decode_path"].set_text(
7539                 ":".join(str(p) for p in pp.decode_path)
7540             )
7541             lines = deepcopy(self.state["hexed"])
7542
7543             def attr_set(i, attr):
7544                 line = lines[i // 16]
7545                 idx = 1 + (i - 16 * (i // 16))
7546                 line[idx] = (attr, line[idx])
7547
7548             if pp.expl_offset is not None:
7549                 for i in six_xrange(
7550                         pp.expl_offset,
7551                         pp.expl_offset + pp.expl_tlen + pp.expl_llen,
7552                 ):
7553                     attr_set(i, "select-expl")
7554             for i in six_xrange(pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen):
7555                 attr_set(i, "select-value")
7556             self.state["hexdump"]._set_body([urwid.Text(line) for line in lines])
7557             self.state["hexdump"].set_focus(pp.offset // 16)
7558             self.state["hexdump"].set_focus_valign("middle")
7559             self.state["hexdump_bar"].set_completion(
7560                 (100 * pp.offset // 16) //
7561                 len(self.state["hexdump"]._body.positions())
7562             )
7563
7564             lines = [
7565                 [("header", "Name: "), pp.obj_name],
7566                 [("header", "Type: "), pp.asn1_type_name],
7567                 [("header", "Offset: "), "%d (0x%x)" % (pp.offset, pp.offset)],
7568                 [("header", "[TLV]len: "), "%d/%d/%d" % (
7569                     pp.tlen, pp.llen, pp.vlen,
7570                 )],
7571                 [("header", "TLVlen: "), "%d" % sum((
7572                     pp.tlen, pp.llen, pp.vlen,
7573                 ))],
7574                 [("header", "Slice: "), "[%d:%d]" % (
7575                     pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen,
7576                 )],
7577             ]
7578             if pp.lenindef:
7579                 lines.append([("warning", "LENINDEF")])
7580             if pp.ber_encoded:
7581                 lines.append([("warning", "BER encoded")])
7582             if pp.bered:
7583                 lines.append([("warning", "BERed")])
7584             if pp.expl is not None:
7585                 lines.append([("header", "EXPLICIT")])
7586                 klass, _, num = pp.expl
7587                 lines.append(["  Tag: %s%d" % (TagClassReprs[klass], num)])
7588                 if pp.expl_offset is not None:
7589                     lines.append(["  Offset: %d" % pp.expl_offset])
7590                     lines.append(["  [TLV]len: %d/%d/%d" % (
7591                         pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7592                     )])
7593                     lines.append(["  TLVlen: %d" % sum((
7594                         pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7595                     ))])
7596                     lines.append(["  Slice: [%d:%d]" % (
7597                         pp.expl_offset,
7598                         pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen,
7599                     )])
7600             if pp.impl is not None:
7601                 klass, _, num = pp.impl
7602                 lines.append([
7603                     ("header", "IMPLICIT: "), "%s%d" % (TagClassReprs[klass], num),
7604                 ])
7605             if pp.optional:
7606                 lines.append(["OPTIONAL"])
7607             if pp.default:
7608                 lines.append(["DEFAULT"])
7609             if len(pp.decode_path) > 0:
7610                 ent = pp.decode_path[-1]
7611                 if isinstance(ent, DecodePathDefBy):
7612                     lines.append([""])
7613                     value = str(ent.defined_by)
7614                     oid_name = find_oid_name(
7615                         ent.defined_by.asn1_type_name, oid_maps, value,
7616                     )
7617                     lines.append([("header", "DEFINED BY: "), "%s" % (
7618                         value if oid_name is None
7619                         else "%s (%s)" % (oid_name, value)
7620                     )])
7621             lines.append([""])
7622             if pp.value is not None:
7623                 lines.append([("header", "Value: "), pp.value])
7624                 if (
7625                         len(oid_maps) > 0 and
7626                         pp.asn1_type_name == ObjectIdentifier.asn1_type_name
7627                 ):
7628                     for oid_map in oid_maps:
7629                         oid_name = oid_map.get(pp.value)
7630                         if oid_name is not None:
7631                             lines.append([("header", "Human: "), oid_name])
7632                             break
7633                 if pp.asn1_type_name == Integer.asn1_type_name:
7634                     lines.append([
7635                         ("header", "Decimal: "), "%d" % int(pp.obj),
7636                     ])
7637                     lines.append([
7638                         ("header", "Hexadecimal: "), colonize_hex(pp.obj.tohex()),
7639                     ])
7640             if pp.blob.__class__ == binary_type:
7641                 blob = hexenc(pp.blob).upper()
7642                 for i in six_xrange(0, len(blob), 32):
7643                     lines.append([colonize_hex(blob[i:i + 32])])
7644             elif pp.blob.__class__ == tuple:
7645                 lines.append([", ".join(pp.blob)])
7646             self.state["info"]._set_body([urwid.Text(line) for line in lines])
7647             self.state["info_bar"].set_completion(0)
7648
7649         def selectable(self):
7650             if self.state["widget_current"] != self:
7651                 self.state["widget_current"] = self
7652                 self.scrolled["info"] = False
7653                 self.scrolled["hexdump"] = False
7654                 self._state_update()
7655             return super(TW, self).selectable()
7656
7657         def get_display_text(self):
7658             pp, constructed = self._get_pp()
7659             style = "constructed" if constructed else ""
7660             if len(pp.decode_path) == 0:
7661                 return (style, pp.obj_name)
7662             if pp.asn1_type_name == "EOC":
7663                 return ("eoc", "EOC")
7664             ent = pp.decode_path[-1]
7665             if isinstance(ent, DecodePathDefBy):
7666                 value = str(ent.defined_by)
7667                 oid_name = find_oid_name(
7668                     ent.defined_by.asn1_type_name, oid_maps, value,
7669                 )
7670                 return ("defby", "DEFBY:" + (
7671                     value if oid_name is None else oid_name
7672                 ))
7673             return (style, ent)
7674
7675         def _scroll(self, what, step):
7676             self.state[what]._invalidate()
7677             pos = self.state[what].focus_position
7678             if not self.scrolled[what]:
7679                 self.scrolled[what] = True
7680                 pos -= 2
7681             pos = max(0, pos + step)
7682             pos = min(pos, len(self.state[what]._body.positions()) - 1)
7683             self.state[what].set_focus(pos)
7684             self.state[what].set_focus_valign("top")
7685             self.state[what + "_bar"].set_completion(
7686                 (100 * pos) // len(self.state[what]._body.positions())
7687             )
7688
7689         def keypress(self, size, key):
7690             if key == "q":
7691                 raise urwid.ExitMainLoop()
7692
7693             if key == " ":
7694                 self.expanded = not self.expanded
7695                 self.update_expanded_icon()
7696                 return None
7697
7698             hexdump_steps = {"j": 1, "k": -1, "J": 5, "K": -5}
7699             if key in hexdump_steps:
7700                 self._scroll("hexdump", hexdump_steps[key])
7701                 return None
7702
7703             info_steps = {"h": 1, "l": -1, "H": 5, "L": -5}
7704             if key in info_steps:
7705                 self._scroll("info", info_steps[key])
7706                 return None
7707
7708             if key in ("d", "D"):
7709                 pp, _ = self._get_pp()
7710                 dp = ":".join(str(p) for p in pp.decode_path)
7711                 dp = dp.replace(" ", "_")
7712                 if dp == "":
7713                     dp = "root"
7714                 if key == "d" or pp.expl_offset is None:
7715                     data = self.state["raw"][pp.offset:(
7716                         pp.offset + pp.tlen + pp.llen + pp.vlen
7717                     )]
7718                 else:
7719                     data = self.state["raw"][pp.expl_offset:(
7720                         pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen
7721                     )]
7722                 ctr = 0
7723
7724                 def duplicate_path(dp, ctr):
7725                     if ctr == 0:
7726                         return dp
7727                     return "%s.%d" % (dp, ctr)
7728
7729                 while True:
7730                     if not path_exists(duplicate_path(dp, ctr)):
7731                         break
7732                     ctr += 1
7733                 dp = duplicate_path(dp, ctr)
7734                 with open(dp, "wb") as fd:
7735                     fd.write(data)
7736                 self.state["decode_path"].set_text(
7737                     ("warning", "Saved to: " + dp)
7738                 )
7739                 return None
7740             return super(TW, self).keypress(size, key)
7741
7742     class PN(urwid.ParentNode):
7743         def __init__(self, state, value, *args, **kwargs):
7744             self.state = state
7745             if not hasattr(value, "_fields"):
7746                 value = list(value)
7747             super(PN, self).__init__(value, *args, **kwargs)
7748
7749         def load_widget(self):
7750             return TW(self.state, self)
7751
7752         def load_child_keys(self):
7753             value = self.get_value()
7754             if hasattr(value, "_fields"):
7755                 return []
7756             return range(len(value[1:]))
7757
7758         def load_child_node(self, key):
7759             return PN(
7760                 self.state,
7761                 self.get_value()[key + 1],
7762                 parent=self,
7763                 key=key,
7764                 depth=self.get_depth() + 1,
7765             )
7766
7767     class LabeledPG(urwid.ProgressBar):
7768         def __init__(self, label, *args, **kwargs):
7769             self.label = label
7770             super(LabeledPG, self).__init__(*args, **kwargs)
7771
7772         def get_text(self):
7773             return "%s: %s" % (self.label, super(LabeledPG, self).get_text())
7774
7775     WinHexdump = urwid.ListBox([urwid.Text("")])
7776     WinInfo = urwid.ListBox([urwid.Text("")])
7777     WinDecodePath = urwid.Text("", "center")
7778     WinInfoBar = LabeledPG("info", "pg-normal", "pg-complete")
7779     WinHexdumpBar = LabeledPG("hexdump", "pg-normal", "pg-complete")
7780     WinTree = urwid.TreeListBox(urwid.TreeWalker(PN(
7781         {
7782             "raw": raw,
7783             "hexed": list(hexdump(raw)),
7784             "widget_current": None,
7785             "info": WinInfo,
7786             "info_bar": WinInfoBar,
7787             "hexdump": WinHexdump,
7788             "hexdump_bar": WinHexdumpBar,
7789             "decode_path": WinDecodePath,
7790         },
7791         list(obj.pps()),
7792     )))
7793     help_text = " ".join((
7794         "q:quit",
7795         "space:(un)collapse",
7796         "(pg)up/down/home/end:nav",
7797         "jkJK:hexdump hlHL:info",
7798         "dD:dump",
7799     ))
7800     urwid.MainLoop(
7801         urwid.Frame(
7802             urwid.Columns([
7803                 ("weight", 1, WinTree),
7804                 ("weight", 2, urwid.Pile([
7805                     urwid.LineBox(WinInfo),
7806                     urwid.LineBox(WinHexdump),
7807                 ])),
7808             ]),
7809             header=urwid.Columns([
7810                 ("weight", 2, urwid.AttrWrap(WinDecodePath, "header")),
7811                 ("weight", 1, WinInfoBar),
7812                 ("weight", 1, WinHexdumpBar),
7813             ]),
7814             footer=urwid.AttrWrap(urwid.Text(help_text), "help")
7815         ),
7816         [
7817             ("header", "bold", ""),
7818             ("constructed", "bold", ""),
7819             ("help", "light magenta", ""),
7820             ("warning", "light red", ""),
7821             ("defby", "light red", ""),
7822             ("eoc", "dark red", ""),
7823             ("select-value", "light green", ""),
7824             ("select-expl", "light red", ""),
7825             ("pg-normal", "", "light blue"),
7826             ("pg-complete", "black", "yellow"),
7827         ],
7828     ).run()
7829
7830
7831 def main():  # pragma: no cover
7832     import argparse
7833     parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7834     parser.add_argument(
7835         "--skip",
7836         type=int,
7837         default=0,
7838         help="Skip that number of bytes from the beginning",
7839     )
7840     parser.add_argument(
7841         "--oids",
7842         help="Python paths to dictionary with OIDs, comma separated",
7843     )
7844     parser.add_argument(
7845         "--schema",
7846         help="Python path to schema definition to use",
7847     )
7848     parser.add_argument(
7849         "--defines-by-path",
7850         help="Python path to decoder's defines_by_path",
7851     )
7852     parser.add_argument(
7853         "--nobered",
7854         action="store_true",
7855         help="Disallow BER encoding",
7856     )
7857     parser.add_argument(
7858         "--print-decode-path",
7859         action="store_true",
7860         help="Print decode paths",
7861     )
7862     parser.add_argument(
7863         "--decode-path-only",
7864         help="Print only specified decode path",
7865     )
7866     parser.add_argument(
7867         "--allow-expl-oob",
7868         action="store_true",
7869         help="Allow explicit tag out-of-bound",
7870     )
7871     parser.add_argument(
7872         "--evgen",
7873         action="store_true",
7874         help="Turn on event generation mode",
7875     )
7876     parser.add_argument(
7877         "--browse",
7878         action="store_true",
7879         help="Start ASN.1 browser",
7880     )
7881     parser.add_argument(
7882         "RAWFile",
7883         type=argparse.FileType("rb"),
7884         help="Path to BER/CER/DER file you want to decode",
7885     )
7886     args = parser.parse_args()
7887     if PY2:
7888         args.RAWFile.seek(args.skip)
7889         raw = memoryview(args.RAWFile.read())
7890         args.RAWFile.close()
7891     else:
7892         raw = file_mmaped(args.RAWFile)[args.skip:]
7893     oid_maps = (
7894         [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7895         if args.oids else ()
7896     )
7897     from functools import partial
7898     if args.schema:
7899         schema = obj_by_path(args.schema)
7900         pprinter = partial(pprint, big_blobs=True)
7901     else:
7902         schema, pprinter = generic_decoder()
7903     ctx = {
7904         "bered": not args.nobered,
7905         "allow_expl_oob": args.allow_expl_oob,
7906     }
7907     if args.defines_by_path is not None:
7908         ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7909     if args.browse:
7910         obj, _ = schema().decode(raw, ctx=ctx)
7911         browse(raw, obj, oid_maps)
7912         from sys import exit as sys_exit
7913         sys_exit(0)
7914     from os import environ
7915     pprinter = partial(
7916         pprinter,
7917         oid_maps=oid_maps,
7918         with_colours=environ.get("NO_COLOR") is None,
7919         with_decode_path=args.print_decode_path,
7920         decode_path_only=(
7921             () if args.decode_path_only is None else
7922             tuple(args.decode_path_only.split(":"))
7923         ),
7924     )
7925     if args.evgen:
7926         for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7927             print(pprinter(obj, decode_path=decode_path))
7928     else:
7929         obj, tail = schema().decode(raw, ctx=ctx)
7930         print(pprinter(obj))
7931     if tail != b"":
7932         print("\nTrailing data: %s" % hexenc(tail))
7933
7934
7935 if __name__ == "__main__":
7936     from pyderasn import *
7937     main()