]> Cypherpunks.ru repositories - pyderasn.git/blob - pyderasn.py
Remove forgotten unreachable code
[pyderasn.git] / pyderasn.py
1 #!/usr/bin/env python
2 # coding: utf-8
3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU Lesser General Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
21
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal them in BER/CER/DER ones.
24
25     >>> i = Integer(123)
26     >>> raw = i.encode()
27     >>> Integer().decod(raw) == i
28     True
29
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
62
63 Common for most types
64 ---------------------
65
66 Tags
67 ____
68
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
75 tags simultaneously.
76
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
80 number.
81
82 .. note::
83
84    EXPLICIT tags always have **constructed** tag. PyDERASN does not
85    explicitly check correctness of schema input here.
86
87 .. note::
88
89    Implicit tags have **primitive** (``tag_ctxp``) encoding for
90    primitive values.
91
92 ::
93
94     >>> Integer(impl=tag_ctxp(1))
95     [1] INTEGER
96     >>> Integer(expl=tag_ctxc(2))
97     [2] EXPLICIT INTEGER
98
99 Implicit tag is not explicitly shown.
100
101 Two objects of the same type, but with different implicit/explicit tags
102 are **not** equal.
103
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
106 function::
107
108     >>> tag_decode(tag_ctxc(123))
109     (128, 32, 123)
110     >>> klass, form, num = tag_decode(tag_ctxc(123))
111     >>> klass == TagClassContext
112     True
113     >>> form == TagFormConstructed
114     True
115
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
118
119 Default/optional
120 ________________
121
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
125
126     >>> Integer(optional=True, default=123)
127     INTEGER 123 OPTIONAL DEFAULT
128
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
132 ``version`` field::
133
134     class Version(Integer):
135         schema = (
136             ("v1", 0),
137             ("v2", 1),
138             ("v3", 2),
139         )
140     class TBSCertificate(Sequence):
141         schema = (
142             ("version", Version(expl=tag_ctxc(0), default="v1")),
143         [...]
144
145 When default argument is used and value is not specified, then it equals
146 to default one.
147
148 .. _bounds:
149
150 Size constraints
151 ________________
152
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
156
157     class X(...):
158         bounds = (MIN, MAX)
159
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
161
162 For simplicity you can also set bounds the following way::
163
164     bounded_x = X(bounds=(MIN, MAX))
165
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
167 raised.
168
169 Common methods
170 ______________
171
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
175
176 All objects are friendly to ``copy.copy()`` and copied objects can be
177 safely mutated.
178
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
181
182 .. _decoding:
183
184 Decoding
185 --------
186
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
193
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
196
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
199
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
205
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
208
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
211 * ``expl_tlen``,
212 * ``expl_llen``
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
215   ``offset`` otherwise
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
217   ``tlvlen`` otherwise
218
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
220
221 .. _ctx:
222
223 Context
224 _______
225
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
229
230 Currently available context options:
231
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
238
239 .. _pprinting:
240
241 Pretty printing
242 ---------------
243
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
248
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
252
253     >>> from pyderasn import pprint
254     >>> encoded = Integer(-12345).encode()
255     >>> obj, tail = Integer().decode(encoded)
256     >>> print(pprint(obj))
257         0   [1,1,   2] INTEGER -12345
258
259 .. _pprint_example:
260
261 Example certificate::
262
263     >>> print(pprint(crt))
264         0   [1,3,1604] Certificate SEQUENCE
265         4   [1,3,1453]  . tbsCertificate: TBSCertificate SEQUENCE
266        10-2 [1,1,   1]  . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267        13   [1,1,   3]  . . serialNumber: CertificateSerialNumber INTEGER 61595
268        18   [1,1,  13]  . . signature: AlgorithmIdentifier SEQUENCE
269        20   [1,1,   9]  . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270        31   [0,0,   2]  . . . parameters: [UNIV 5] ANY OPTIONAL
271                         . . . . 05:00
272        33   [0,0, 278]  . . issuer: Name CHOICE rdnSequence
273        33   [1,3, 274]  . . . rdnSequence: RDNSequence SEQUENCE OF
274        37   [1,1,  11]  . . . . 0: RelativeDistinguishedName SET OF
275        39   [1,1,   9]  . . . . . 0: AttributeTypeAndValue SEQUENCE
276        41   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277        46   [0,0,   4]  . . . . . . value: [UNIV 19] AttributeValue ANY
278                         . . . . . . . 13:02:45:53
279     [...]
280      1461   [1,1,  13]  . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281      1463   [1,1,   9]  . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282      1474   [0,0,   2]  . . parameters: [UNIV 5] ANY OPTIONAL
283                         . . . 05:00
284      1476   [1,2, 129]  . signatureValue: BIT STRING 1024 bits
285                         . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286                         . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
287      [...]
288
289     Trailing data: 0a
290
291 Let's parse that output, human::
292
293        10-2 [1,1,   1]    . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294        ^  ^  ^ ^    ^     ^   ^        ^            ^       ^       ^  ^
295        0  1  2 3    4     5   6        7            8       9       10 11
296
297 ::
298
299        20   [1,1,   9]    . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
300        ^     ^ ^    ^     ^     ^          ^                 ^
301        0     2 3    4     5     6          9                 10
302
303 ::
304
305        33   [0,0, 278]    . . issuer: Name CHOICE rdnSequence
306        ^     ^ ^    ^     ^   ^       ^    ^      ^
307        0     2 3    4     5   6       8    9      10
308
309 ::
310
311        52-2∞ B [1,1,1054]∞  . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
312              ^           ^                                 ^   ^            ^
313             12          13                                14   9            10
314
315 :0:
316  Offset of the object, where its DER/BER encoding begins.
317  Pay attention that it does **not** include explicit tag.
318 :1:
319  If explicit tag exists, then this is its length (tag + encoded length).
320 :2:
321  Length of object's tag. For example CHOICE does not have its own tag,
322  so it is zero.
323 :3:
324  Length of encoded length.
325 :4:
326  Length of encoded value.
327 :5:
328  Visual indentation to show the depth of object in the hierarchy.
329 :6:
330  Object's name inside SEQUENCE/CHOICE.
331 :7:
332  If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333  here. "IMPLICIT" is omitted.
334 :8:
335  Object's class name, if set. Omitted if it is just an ordinary simple
336  value (like with ``algorithm`` in example above).
337 :9:
338  Object's ASN.1 type.
339 :10:
340  Object's value, if set. Can consist of multiple words (like OCTET/BIT
341  STRINGs above). We see ``v3`` value in Version, because it is named.
342  ``rdnSequence`` is the choice of CHOICE type.
343 :11:
344  Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345  default one, specified in the schema.
346 :12:
347  Shows does object contains any kind of BER encoded data (possibly
348  Sequence holding BER-encoded underlying value).
349 :13:
350  Only applicable to BER encoded data. Indefinite length encoding mark.
351 :14:
352  Only applicable to BER encoded data. If object has BER-specific
353  encoding, then ``BER`` will be shown. It does not depend on indefinite
354  length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355  (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
356  could be BERed.
357
358 .. _definedby:
359
360 DEFINED BY
361 ----------
362
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
367
368 .. _defines:
369
370 defines kwarg
371 _____________
372
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
376 container::
377
378     class ContentInfo(Sequence):
379         schema = (
380             ("contentType", ContentType(defines=((("content",), {
381                 id_digestedData: DigestedData(),
382                 id_signedData: SignedData(),
383             }),))),
384             ("content", Any(expl=tag_ctxc(0))),
385         )
386
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
391 done.
392
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
398
399         (
400             (("parameters",), {
401                 id_ecPublicKey: ECParameters(),
402                 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
403             }),
404             (("..", "subjectPublicKey"), {
405                 id_rsaEncryption: RSAPublicKey(),
406                 id_GostR3410_2001: OctetString(),
407             }),
408         ),
409
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
412 itself.
413
414 Following types can be automatically decoded (DEFINED BY):
415
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420   ``Any``/``BitString``/``OctetString``-s
421
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
426
427 .. _defines_by_path_ctx:
428
429 defines_by_path context option
430 ______________________________
431
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
435
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
438
439     (decode_path, defines)
440
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
445
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
449 of ``PKIResponse``::
450
451     content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
452         (
453             ("contentType",),
454             ((("content",), {id_signedData: SignedData()}),),
455         ),
456         (
457             (
458                 "content",
459                 DecodePathDefBy(id_signedData),
460                 "encapContentInfo",
461                 "eContentType",
462             ),
463             ((("eContent",), {
464                 id_cct_PKIData: PKIData(),
465                 id_cct_PKIResponse: PKIResponse(),
466             })),
467         ),
468         (
469             (
470                 "content",
471                 DecodePathDefBy(id_signedData),
472                 "encapContentInfo",
473                 "eContent",
474                 DecodePathDefBy(id_cct_PKIResponse),
475                 "controlSequence",
476                 any,
477                 "attrType",
478             ),
479             ((("attrValues",), {
480                 id_cmc_recipientNonce: RecipientNonce(),
481                 id_cmc_senderNonce: SenderNonce(),
482                 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483                 id_cmc_transactionId: TransactionId(),
484             })),
485         ),
486     )})
487
488 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 First function is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meet --
491 useful for SEQUENCE/SET OF-s.
492
493 .. _bered_ctx:
494
495 BER encoding
496 ------------
497
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
502
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504   attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505   STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506   ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508   attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509   ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
510   contain it.
511 * If object has an indefinite length encoded explicit tag, then
512   ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514   indefinite length, object's indefinite length, BER-encoding) or any
515   underlying component has that kind of encoding, then ``bered``
516   attribute is set to True. For example SignedData CMS can have
517   ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518   ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
519
520 EOC (end-of-contents) token's length is taken in advance in object's
521 value length.
522
523 .. _allow_expl_oob_ctx:
524
525 Allow explicit tag out-of-bound
526 -------------------------------
527
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
533
534 .. warning::
535
536    This option should be used only for skipping some decode errors, just
537    to see the decoded structure somehow.
538
539 .. _streaming:
540
541 Streaming and dealing with huge structures
542 ------------------------------------------
543
544 .. _evgen_mode:
545
546 evgen mode
547 __________
548
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
553 structure.
554
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
558
559     class TBSCertListFast(Sequence):
560         schema = (
561             [...]
562             ("revokedCertificates", OctetString(
563                 impl=SequenceOf.tag_default,
564                 optional=True,
565             )),
566             [...]
567         )
568
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
571
572 But how can you get all certificate's serial number from it, after you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
576
577     $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578          10     [1,1,   1]   . . version: Version INTEGER v2 (01) OPTIONAL
579          15     [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580          26     [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL
581          13     [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE
582          34     [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583          39     [0,0,   9]   . . . . . . value: [UNIV 19] AttributeValue ANY
584          32     [1,1,  14]   . . . . . 0: AttributeTypeAndValue SEQUENCE
585          30     [1,1,  16]   . . . . 0: RelativeDistinguishedName SET OF
586     [...]
587         188     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589         191     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
590         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591         186     [1,1,  18]   . . . 0: RevokedCertificate SEQUENCE
592         208     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594         211     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
595         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596         206     [1,1,  18]   . . . 1: RevokedCertificate SEQUENCE
597     [...]
598     9144992     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
599     9144992     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600     9144985     [1,1,  20]   . . . 415755: RevokedCertificate SEQUENCE
601       181     [1,4,9144821]   . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602         5     [1,4,9144997]   . tbsCertList: TBSCertList SEQUENCE
603     9145009     [1,1,   9]   . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604     9145020     [0,0,   2]   . . parameters: [UNIV 5] ANY OPTIONAL
605     9145007     [1,1,  13]   . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606     9145022     [1,3, 513]   . signatureValue: BIT STRING 4096 bits
607         0     [1,4,9145534]  CertificateList SEQUENCE
608
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as a successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
618 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
619
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
626
627     for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
628         if (
629             len(decode_path) == 4 and
630             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
631             decode_path[3] == "userCertificate"
632         ):
633             print("serial number:", int(obj))
634
635 Virtually it does not take any memory except at least needed for single
636 object storage. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
639 ``.tlvlen``.
640
641 .. _evgen_mode_upto_ctx:
642
643 evgen_mode_upto
644 _______________
645
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 do not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
655
656     for decode_path, obj, _ in CertificateList().decode_evgen(
657         crl_raw,
658         ctx={"evgen_mode_upto": (
659             (("tbsCertList", "revokedCertificates", any), True),
660         )},
661     ):
662         if (
663             len(decode_path) == 3 and
664             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
665         ):
666             print("serial number:", int(obj["userCertificate"]))
667
668 .. note::
669
670    SEQUENCE/SET values with DEFAULT specified are automatically decoded
671    without evgen mode.
672
673 .. _mmap:
674
675 mmap-ed file
676 ____________
677
678 POSIX compliant systems have ``mmap`` syscall, giving ability to work
679 the memory mapped file. You can deal with the file like it was an
680 ordinary binary string, allowing you not to load it to the memory first.
681 Also you can use them as an input for OCTET STRING, taking no Python
682 memory for their storage.
683
684 There is convenient :py:func:`pyderasn.file_mmaped` function that
685 creates read-only memoryview on the file contents::
686
687     with open("huge", "rb") as fd:
688         raw = file_mmaped(fd)
689         obj = Something.decode(raw)
690
691 .. warning::
692
693    mmap-ed files in Python2.7 does not implement buffer protocol, so
694    memoryview won't work on them.
695
696 .. warning::
697
698    mmap maps the **whole** file. So it plays no role if you seek-ed it
699    before. Take the slice of the resulting memoryview with required
700    offset instead.
701
702 .. note::
703
704    If you use ZFS as underlying storage, then pay attention that
705    currently most platforms does not deal good with ZFS ARC and ordinary
706    page cache used for mmaps. It can take twice the necessary size in
707    the memory: both in page cache and ZFS ARC.
708
709 CER encoding
710 ____________
711
712 We can parse any kind of data now, but how can we produce files
713 streamingly, without storing their encoded representation in memory?
714 SEQUENCE by default encodes in memory all its values, joins them in huge
715 binary string, just to know the exact size of SEQUENCE's value for
716 encoding it in TLV. DER requires you to know all exact sizes of the
717 objects.
718
719 You can use CER encoding mode, that slightly differs from the DER, but
720 does not require exact sizes knowledge, allowing streaming encoding
721 directly to some writer/buffer. Just use
722 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
723 encoded data will flow::
724
725     opener = io.open if PY2 else open
726     with opener("result", "wb") as fd:
727         obj.encode_cer(fd.write)
728
729 ::
730
731     buf = io.BytesIO()
732     obj.encode_cer(buf.write)
733
734 If you do not want to create in-memory buffer every time, then you can
735 use :py:func:`pyderasn.encode_cer` function::
736
737     data = encode_cer(obj)
738
739 Remember that CER is **not valid** DER in most cases, so you **have to**
740 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
741 decoding. Also currently there is **no** validation that provided CER is
742 valid one -- you are sure that it has only valid BER encoding.
743
744 .. warning::
745
746    SET OF values can not be streamingly encoded, because they are
747    required to be sorted byte-by-byte. Big SET OF values still will take
748    much memory. Use neither SET nor SET OF values, as modern ASN.1
749    also recommends too.
750
751 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
752 OCTET STRINGs! They will be streamingly copied from underlying file to
753 the buffer using 1 KB chunks.
754
755 Some structures require that some of the elements have to be forcefully
756 DER encoded. For example ``SignedData`` CMS requires you to encode
757 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
758 encode everything else in BER. You can tell any of the structures to be
759 forcefully encoded in DER during CER encoding, by specifying
760 ``der_forced=True`` attribute::
761
762     class Certificate(Sequence):
763         schema = (...)
764         der_forced = True
765
766     class SignedAttributes(SetOf):
767         schema = Attribute()
768         bounds = (1, 32)
769         der_forced = True
770
771 .. _agg_octet_string:
772
773 agg_octet_string
774 ________________
775
776 In most cases, huge quantity of binary data is stored as OCTET STRING.
777 CER encoding splits it on 1 KB chunks. BER allows splitting on various
778 levels of chunks inclusion::
779
780     SOME STRING[CONSTRUCTED]
781         OCTET STRING[CONSTRUCTED]
782             OCTET STRING[PRIMITIVE]
783                 DATA CHUNK
784             OCTET STRING[PRIMITIVE]
785                 DATA CHUNK
786             OCTET STRING[PRIMITIVE]
787                 DATA CHUNK
788         OCTET STRING[PRIMITIVE]
789             DATA CHUNK
790         OCTET STRING[CONSTRUCTED]
791             OCTET STRING[PRIMITIVE]
792                 DATA CHUNK
793             OCTET STRING[PRIMITIVE]
794                 DATA CHUNK
795         OCTET STRING[CONSTRUCTED]
796             OCTET STRING[CONSTRUCTED]
797                 OCTET STRING[PRIMITIVE]
798                     DATA CHUNK
799
800 You can not just take the offset and some ``.vlen`` of the STRING and
801 treat it as the payload. If you decode it without
802 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
803 and ``bytes()`` will give the whole payload contents.
804
805 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
806 small memory footprint. There is convenient
807 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
808 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
809 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
810 SHA512 digest of its ``eContent``::
811
812     fd = open("data.p7m", "rb")
813     raw = file_mmaped(fd)
814     ctx = {"bered": True}
815     for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
816         if decode_path == ("content",):
817             content = obj
818             break
819     else:
820         raise ValueError("no content found")
821     hasher_state = sha512()
822     def hasher(data):
823         hasher_state.update(data)
824         return len(data)
825     evgens = SignedData().decode_evgen(
826         raw[content.offset:],
827         offset=content.offset,
828         ctx=ctx,
829     )
830     agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
831     fd.close()
832     digest = hasher_state.digest()
833
834 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
835 copy the payload (without BER/CER encoding interleaved overhead) in it.
836 Virtually it won't take memory more than for keeping small structures
837 and 1 KB binary chunks.
838
839 .. _seqof-iterators:
840
841 SEQUENCE OF iterators
842 _____________________
843
844 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
845 classes. The only difference with providing the full list of objects, is
846 that type and bounds checking is done during encoding process. Also
847 sequence's value will be emptied after encoding, forcing you to set its
848 value again.
849
850 This is very useful when you have to create some huge objects, like
851 CRLs, with thousands and millions of entities inside. You can write the
852 generator taking necessary data from the database and giving the
853 ``RevokedCertificate`` objects. Only binary representation of that
854 objects will take memory during DER encoding.
855
856 2-pass DER encoding
857 -------------------
858
859 There is ability to do 2-pass encoding to DER, writing results directly
860 to specified writer (buffer, file, whatever). It could be 1.5+ times
861 slower than ordinary encoding, but it takes little memory for 1st pass
862 state storing. For example, 1st pass state for CACert.org's CRL with
863 ~416K of certificate entries takes nearly 3.5 MB of memory.
864 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
865 nearly 0.5 KB of memory.
866
867 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
868 iterators <seqof-iterators>` and write directly to opened file, then
869 there is very small memory footprint.
870
871 1st pass traverses through all the objects of the structure and returns
872 the size of DER encoded structure, together with 1st pass state object.
873 That state contains precalculated lengths for various objects inside the
874 structure.
875
876 ::
877
878     fulllen, state = obj.encode1st()
879
880 2nd pass takes the writer and 1st pass state. It traverses through all
881 the objects again, but writes their encoded representation to the writer.
882
883 ::
884
885     opener = io.open if PY2 else open
886     with opener("result", "wb") as fd:
887         obj.encode2nd(fd.write, iter(state))
888
889 .. warning::
890
891    You **MUST NOT** use 1st pass state if anything is changed in the
892    objects. It is intended to be used immediately after 1st pass is
893    done!
894
895 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
896 have to reinitialize the values after the 1st pass. And you **have to**
897 be sure that the iterator gives exactly the same values as previously.
898 Yes, you have to run your iterator twice -- because this is two pass
899 encoding mode.
900
901 If you want to encode to the memory, then you can use convenient
902 :py:func:`pyderasn.encode2pass` helper.
903
904 Base Obj
905 --------
906 .. autoclass:: pyderasn.Obj
907    :members:
908
909 Primitive types
910 ---------------
911
912 Boolean
913 _______
914 .. autoclass:: pyderasn.Boolean
915    :members: __init__
916
917 Integer
918 _______
919 .. autoclass:: pyderasn.Integer
920    :members: __init__, named
921
922 BitString
923 _________
924 .. autoclass:: pyderasn.BitString
925    :members: __init__, bit_len, named
926
927 OctetString
928 ___________
929 .. autoclass:: pyderasn.OctetString
930    :members: __init__
931
932 Null
933 ____
934 .. autoclass:: pyderasn.Null
935    :members: __init__
936
937 ObjectIdentifier
938 ________________
939 .. autoclass:: pyderasn.ObjectIdentifier
940    :members: __init__
941
942 Enumerated
943 __________
944 .. autoclass:: pyderasn.Enumerated
945
946 CommonString
947 ____________
948 .. autoclass:: pyderasn.CommonString
949
950 NumericString
951 _____________
952 .. autoclass:: pyderasn.NumericString
953
954 PrintableString
955 _______________
956 .. autoclass:: pyderasn.PrintableString
957    :members: __init__, allow_asterisk, allow_ampersand
958
959 UTCTime
960 _______
961 .. autoclass:: pyderasn.UTCTime
962    :members: __init__, todatetime
963
964 GeneralizedTime
965 _______________
966 .. autoclass:: pyderasn.GeneralizedTime
967    :members: __init__, todatetime
968
969 Special types
970 -------------
971
972 Choice
973 ______
974 .. autoclass:: pyderasn.Choice
975    :members: __init__, choice, value
976
977 PrimitiveTypes
978 ______________
979 .. autoclass:: PrimitiveTypes
980
981 Any
982 ___
983 .. autoclass:: pyderasn.Any
984    :members: __init__
985
986 Constructed types
987 -----------------
988
989 Sequence
990 ________
991 .. autoclass:: pyderasn.Sequence
992    :members: __init__
993
994 Set
995 ___
996 .. autoclass:: pyderasn.Set
997    :members: __init__
998
999 SequenceOf
1000 __________
1001 .. autoclass:: pyderasn.SequenceOf
1002    :members: __init__
1003
1004 SetOf
1005 _____
1006 .. autoclass:: pyderasn.SetOf
1007    :members: __init__
1008
1009 Various
1010 -------
1011
1012 .. autofunction:: pyderasn.abs_decode_path
1013 .. autofunction:: pyderasn.agg_octet_string
1014 .. autofunction:: pyderasn.colonize_hex
1015 .. autofunction:: pyderasn.encode2pass
1016 .. autofunction:: pyderasn.encode_cer
1017 .. autofunction:: pyderasn.file_mmaped
1018 .. autofunction:: pyderasn.hexenc
1019 .. autofunction:: pyderasn.hexdec
1020 .. autofunction:: pyderasn.tag_encode
1021 .. autofunction:: pyderasn.tag_decode
1022 .. autofunction:: pyderasn.tag_ctxp
1023 .. autofunction:: pyderasn.tag_ctxc
1024 .. autoclass:: pyderasn.DecodeError
1025    :members: __init__
1026 .. autoclass:: pyderasn.NotEnoughData
1027 .. autoclass:: pyderasn.ExceedingData
1028 .. autoclass:: pyderasn.LenIndefForm
1029 .. autoclass:: pyderasn.TagMismatch
1030 .. autoclass:: pyderasn.InvalidLength
1031 .. autoclass:: pyderasn.InvalidOID
1032 .. autoclass:: pyderasn.ObjUnknown
1033 .. autoclass:: pyderasn.ObjNotReady
1034 .. autoclass:: pyderasn.InvalidValueType
1035 .. autoclass:: pyderasn.BoundsError
1036
1037 .. _cmdline:
1038
1039 Command-line usage
1040 ------------------
1041
1042 You can decode DER/BER files using command line abilities::
1043
1044     $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1045
1046 If there is no schema for your file, then you can try parsing it without,
1047 but of course IMPLICIT tags will often make it impossible. But result is
1048 good enough for the certificate above::
1049
1050     $ python -m pyderasn path/to/file
1051         0   [1,3,1604]  . >: SEQUENCE OF
1052         4   [1,3,1453]  . . >: SEQUENCE OF
1053         8   [0,0,   5]  . . . . >: [0] ANY
1054                         . . . . . A0:03:02:01:02
1055        13   [1,1,   3]  . . . . >: INTEGER 61595
1056        18   [1,1,  13]  . . . . >: SEQUENCE OF
1057        20   [1,1,   9]  . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1058        31   [1,1,   0]  . . . . . . >: NULL
1059        33   [1,3, 274]  . . . . >: SEQUENCE OF
1060        37   [1,1,  11]  . . . . . . >: SET OF
1061        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1062        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1063        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1064     [...]
1065      1409   [1,1,  50]  . . . . . . >: SEQUENCE OF
1066      1411   [1,1,   8]  . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1067      1421   [1,1,  38]  . . . . . . . . >: OCTET STRING 38 bytes
1068                         . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1069                         . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1070                         . . . . . . . . . 61:2E:63:6F:6D:2F
1071      1461   [1,1,  13]  . . >: SEQUENCE OF
1072      1463   [1,1,   9]  . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1073      1474   [1,1,   0]  . . . . >: NULL
1074      1476   [1,2, 129]  . . >: BIT STRING 1024 bits
1075                         . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1076                         . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1077     [...]
1078
1079 Human readable OIDs
1080 ___________________
1081
1082 If you have got dictionaries with ObjectIdentifiers, like example one
1083 from ``tests/test_crts.py``::
1084
1085     stroid2name = {
1086         "1.2.840.113549.1.1.1": "id-rsaEncryption",
1087         "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1088         [...]
1089         "2.5.4.10": "id-at-organizationName",
1090         "2.5.4.11": "id-at-organizationalUnitName",
1091     }
1092
1093 then you can pass it to pretty printer to see human readable OIDs::
1094
1095     $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1096     [...]
1097        37   [1,1,  11]  . . . . . . >: SET OF
1098        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1099        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1100        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1101        50   [1,1,  18]  . . . . . . >: SET OF
1102        52   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1103        54   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1104        59   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1105        70   [1,1,  18]  . . . . . . >: SET OF
1106        72   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1107        74   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1108        79   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1109     [...]
1110
1111 Decode paths
1112 ____________
1113
1114 Each decoded element has so-called decode path: sequence of structure
1115 names it is passing during the decode process. Each element has its own
1116 unique path inside the whole ASN.1 tree. You can print it out with
1117 ``--print-decode-path`` option::
1118
1119     $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1120        0    [1,3,1604]  Certificate SEQUENCE []
1121        4    [1,3,1453]   . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1122       10-2  [1,1,   1]   . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1123       13    [1,1,   3]   . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1124       18    [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1125       20    [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1126       31    [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1127                          . . . . 05:00
1128       33    [0,0, 278]   . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1129       33    [1,3, 274]   . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1130       37    [1,1,  11]   . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1131       39    [1,1,   9]   . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1132       41    [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1133       46    [0,0,   4]   . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1134                          . . . . . . . 13:02:45:53
1135       46    [1,1,   2]   . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1136     [...]
1137
1138 Now you can print only the specified tree, for example signature algorithm::
1139
1140     $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1141       18    [1,1,  13]  AlgorithmIdentifier SEQUENCE
1142       20    [1,1,   9]   . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1143       31    [0,0,   2]   . parameters: [UNIV 5] ANY OPTIONAL
1144                          . . 05:00
1145 """
1146
1147 from array import array
1148 from codecs import getdecoder
1149 from codecs import getencoder
1150 from collections import namedtuple
1151 from collections import OrderedDict
1152 from copy import copy
1153 from datetime import datetime
1154 from datetime import timedelta
1155 from io import BytesIO
1156 from math import ceil
1157 from mmap import mmap
1158 from mmap import PROT_READ
1159 from operator import attrgetter
1160 from string import ascii_letters
1161 from string import digits
1162 from sys import maxsize as sys_maxsize
1163 from sys import version_info
1164 from unicodedata import category as unicat
1165
1166 from six import add_metaclass
1167 from six import binary_type
1168 from six import byte2int
1169 from six import indexbytes
1170 from six import int2byte
1171 from six import integer_types
1172 from six import iterbytes
1173 from six import iteritems
1174 from six import itervalues
1175 from six import PY2
1176 from six import string_types
1177 from six import text_type
1178 from six import unichr as six_unichr
1179 from six.moves import xrange as six_xrange
1180
1181
1182 try:
1183     from termcolor import colored
1184 except ImportError:  # pragma: no cover
1185     def colored(what, *args, **kwargs):
1186         return what
1187
1188 __version__ = "7.4"
1189
1190 __all__ = (
1191     "agg_octet_string",
1192     "Any",
1193     "BitString",
1194     "BMPString",
1195     "Boolean",
1196     "BoundsError",
1197     "Choice",
1198     "colonize_hex",
1199     "DecodeError",
1200     "DecodePathDefBy",
1201     "encode2pass",
1202     "encode_cer",
1203     "Enumerated",
1204     "ExceedingData",
1205     "file_mmaped",
1206     "GeneralizedTime",
1207     "GeneralString",
1208     "GraphicString",
1209     "hexdec",
1210     "hexenc",
1211     "IA5String",
1212     "Integer",
1213     "InvalidLength",
1214     "InvalidOID",
1215     "InvalidValueType",
1216     "ISO646String",
1217     "LenIndefForm",
1218     "NotEnoughData",
1219     "Null",
1220     "NumericString",
1221     "obj_by_path",
1222     "ObjectIdentifier",
1223     "ObjNotReady",
1224     "ObjUnknown",
1225     "OctetString",
1226     "PrimitiveTypes",
1227     "PrintableString",
1228     "Sequence",
1229     "SequenceOf",
1230     "Set",
1231     "SetOf",
1232     "T61String",
1233     "tag_ctxc",
1234     "tag_ctxp",
1235     "tag_decode",
1236     "TagClassApplication",
1237     "TagClassContext",
1238     "TagClassPrivate",
1239     "TagClassUniversal",
1240     "TagFormConstructed",
1241     "TagFormPrimitive",
1242     "TagMismatch",
1243     "TeletexString",
1244     "UniversalString",
1245     "UTCTime",
1246     "UTF8String",
1247     "VideotexString",
1248     "VisibleString",
1249 )
1250
1251 TagClassUniversal = 0
1252 TagClassApplication = 1 << 6
1253 TagClassContext = 1 << 7
1254 TagClassPrivate = 1 << 6 | 1 << 7
1255 TagFormPrimitive = 0
1256 TagFormConstructed = 1 << 5
1257 TagClassReprs = {
1258     TagClassContext: "",
1259     TagClassApplication: "APPLICATION ",
1260     TagClassPrivate: "PRIVATE ",
1261     TagClassUniversal: "UNIV ",
1262 }
1263 EOC = b"\x00\x00"
1264 EOC_LEN = len(EOC)
1265 LENINDEF = b"\x80"  # length indefinite mark
1266 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1267 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1268 SET01 = frozenset("01")
1269 DECIMALS = frozenset(digits)
1270 DECIMAL_SIGNS = ".,"
1271 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1272
1273
1274 def file_mmaped(fd):
1275     """Make mmap-ed memoryview for reading from file
1276
1277     :param fd: file object
1278     :returns: memoryview over read-only mmap-ing of the whole file
1279     """
1280     return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
1281
1282 def pureint(value):
1283     if not set(value) <= DECIMALS:
1284         raise ValueError("non-pure integer")
1285     return int(value)
1286
1287 def fractions2float(fractions_raw):
1288     pureint(fractions_raw)
1289     return float("0." + fractions_raw)
1290
1291
1292 def get_def_by_path(defines_by_path, sub_decode_path):
1293     """Get define by decode path
1294     """
1295     for path, define in defines_by_path:
1296         if len(path) != len(sub_decode_path):
1297             continue
1298         for p1, p2 in zip(path, sub_decode_path):
1299             if (not p1 is any) and (p1 != p2):
1300                 break
1301         else:
1302             return define
1303
1304
1305 ########################################################################
1306 # Errors
1307 ########################################################################
1308
1309 class ASN1Error(ValueError):
1310     pass
1311
1312
1313 class DecodeError(ASN1Error):
1314     def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1315         """
1316         :param str msg: reason of decode failing
1317         :param klass: optional exact DecodeError inherited class (like
1318                       :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1319                       :py:exc:`InvalidLength`)
1320         :param decode_path: tuple of strings. It contains human
1321                             readable names of the fields through which
1322                             decoding process has passed
1323         :param int offset: binary offset where failure happened
1324         """
1325         super(DecodeError, self).__init__()
1326         self.msg = msg
1327         self.klass = klass
1328         self.decode_path = decode_path
1329         self.offset = offset
1330
1331     def __str__(self):
1332         return " ".join(
1333             c for c in (
1334                 "" if self.klass is None else self.klass.__name__,
1335                 (
1336                     ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1337                     if len(self.decode_path) > 0 else ""
1338                 ),
1339                 ("(at %d)" % self.offset) if self.offset > 0 else "",
1340                 self.msg,
1341             ) if c != ""
1342         )
1343
1344     def __repr__(self):
1345         return "%s(%s)" % (self.__class__.__name__, self)
1346
1347
1348 class NotEnoughData(DecodeError):
1349     pass
1350
1351
1352 class ExceedingData(ASN1Error):
1353     def __init__(self, nbytes):
1354         super(ExceedingData, self).__init__()
1355         self.nbytes = nbytes
1356
1357     def __str__(self):
1358         return "%d trailing bytes" % self.nbytes
1359
1360     def __repr__(self):
1361         return "%s(%s)" % (self.__class__.__name__, self)
1362
1363
1364 class LenIndefForm(DecodeError):
1365     pass
1366
1367
1368 class TagMismatch(DecodeError):
1369     pass
1370
1371
1372 class InvalidLength(DecodeError):
1373     pass
1374
1375
1376 class InvalidOID(DecodeError):
1377     pass
1378
1379
1380 class ObjUnknown(ASN1Error):
1381     def __init__(self, name):
1382         super(ObjUnknown, self).__init__()
1383         self.name = name
1384
1385     def __str__(self):
1386         return "object is unknown: %s" % self.name
1387
1388     def __repr__(self):
1389         return "%s(%s)" % (self.__class__.__name__, self)
1390
1391
1392 class ObjNotReady(ASN1Error):
1393     def __init__(self, name):
1394         super(ObjNotReady, self).__init__()
1395         self.name = name
1396
1397     def __str__(self):
1398         return "object is not ready: %s" % self.name
1399
1400     def __repr__(self):
1401         return "%s(%s)" % (self.__class__.__name__, self)
1402
1403
1404 class InvalidValueType(ASN1Error):
1405     def __init__(self, expected_types):
1406         super(InvalidValueType, self).__init__()
1407         self.expected_types = expected_types
1408
1409     def __str__(self):
1410         return "invalid value type, expected: %s" % ", ".join(
1411             [repr(t) for t in self.expected_types]
1412         )
1413
1414     def __repr__(self):
1415         return "%s(%s)" % (self.__class__.__name__, self)
1416
1417
1418 class BoundsError(ASN1Error):
1419     def __init__(self, bound_min, value, bound_max):
1420         super(BoundsError, self).__init__()
1421         self.bound_min = bound_min
1422         self.value = value
1423         self.bound_max = bound_max
1424
1425     def __str__(self):
1426         return "unsatisfied bounds: %s <= %s <= %s" % (
1427             self.bound_min,
1428             self.value,
1429             self.bound_max,
1430         )
1431
1432     def __repr__(self):
1433         return "%s(%s)" % (self.__class__.__name__, self)
1434
1435
1436 ########################################################################
1437 # Basic coders
1438 ########################################################################
1439
1440 _hexdecoder = getdecoder("hex")
1441 _hexencoder = getencoder("hex")
1442
1443
1444 def hexdec(data):
1445     """Binary data to hexadecimal string convert
1446     """
1447     return _hexdecoder(data)[0]
1448
1449
1450 def hexenc(data):
1451     """Hexadecimal string to binary data convert
1452     """
1453     return _hexencoder(data)[0].decode("ascii")
1454
1455
1456 def int_bytes_len(num, byte_len=8):
1457     if num == 0:
1458         return 1
1459     return int(ceil(float(num.bit_length()) / byte_len))
1460
1461
1462 def zero_ended_encode(num):
1463     octets = bytearray(int_bytes_len(num, 7))
1464     i = len(octets) - 1
1465     octets[i] = num & 0x7F
1466     num >>= 7
1467     i -= 1
1468     while num > 0:
1469         octets[i] = 0x80 | (num & 0x7F)
1470         num >>= 7
1471         i -= 1
1472     return bytes(octets)
1473
1474
1475 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1476     """Encode tag to binary form
1477
1478     :param int num: tag's number
1479     :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1480                       :py:data:`pyderasn.TagClassContext`,
1481                       :py:data:`pyderasn.TagClassApplication`,
1482                       :py:data:`pyderasn.TagClassPrivate`)
1483     :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1484                      :py:data:`pyderasn.TagFormConstructed`)
1485     """
1486     if num < 31:
1487         # [XX|X|.....]
1488         return int2byte(klass | form | num)
1489     # [XX|X|11111][1.......][1.......] ... [0.......]
1490     return int2byte(klass | form | 31) + zero_ended_encode(num)
1491
1492
1493 def tag_decode(tag):
1494     """Decode tag from binary form
1495
1496     .. warning::
1497
1498        No validation is performed, assuming that it has already passed.
1499
1500     It returns tuple with three integers, as
1501     :py:func:`pyderasn.tag_encode` accepts.
1502     """
1503     first_octet = byte2int(tag)
1504     klass = first_octet & 0xC0
1505     form = first_octet & 0x20
1506     if first_octet & 0x1F < 0x1F:
1507         return (klass, form, first_octet & 0x1F)
1508     num = 0
1509     for octet in iterbytes(tag[1:]):
1510         num <<= 7
1511         num |= octet & 0x7F
1512     return (klass, form, num)
1513
1514
1515 def tag_ctxp(num):
1516     """Create CONTEXT PRIMITIVE tag
1517     """
1518     return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1519
1520
1521 def tag_ctxc(num):
1522     """Create CONTEXT CONSTRUCTED tag
1523     """
1524     return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1525
1526
1527 def tag_strip(data):
1528     """Take off tag from the data
1529
1530     :returns: (encoded tag, tag length, remaining data)
1531     """
1532     if len(data) == 0:
1533         raise NotEnoughData("no data at all")
1534     if byte2int(data) & 0x1F < 31:
1535         return data[:1], 1, data[1:]
1536     i = 0
1537     while True:
1538         i += 1
1539         if i == len(data):
1540             raise DecodeError("unfinished tag")
1541         if indexbytes(data, i) & 0x80 == 0:
1542             break
1543     i += 1
1544     return data[:i], i, data[i:]
1545
1546
1547 def len_encode(l):
1548     if l < 0x80:
1549         return int2byte(l)
1550     octets = bytearray(int_bytes_len(l) + 1)
1551     octets[0] = 0x80 | (len(octets) - 1)
1552     for i in six_xrange(len(octets) - 1, 0, -1):
1553         octets[i] = l & 0xFF
1554         l >>= 8
1555     return bytes(octets)
1556
1557
1558 def len_decode(data):
1559     """Decode length
1560
1561     :returns: (decoded length, length's length, remaining data)
1562     :raises LenIndefForm: if indefinite form encoding is met
1563     """
1564     if len(data) == 0:
1565         raise NotEnoughData("no data at all")
1566     first_octet = byte2int(data)
1567     if first_octet & 0x80 == 0:
1568         return first_octet, 1, data[1:]
1569     octets_num = first_octet & 0x7F
1570     if octets_num + 1 > len(data):
1571         raise NotEnoughData("encoded length is longer than data")
1572     if octets_num == 0:
1573         raise LenIndefForm()
1574     if byte2int(data[1:]) == 0:
1575         raise DecodeError("leading zeros")
1576     l = 0
1577     for v in iterbytes(data[1:1 + octets_num]):
1578         l = (l << 8) | v
1579     if l <= 127:
1580         raise DecodeError("long form instead of short one")
1581     return l, 1 + octets_num, data[1 + octets_num:]
1582
1583
1584 LEN0 = len_encode(0)
1585 LEN1 = len_encode(1)
1586 LEN1K = len_encode(1000)
1587
1588
1589 def len_size(l):
1590     """How many bytes length field will take
1591     """
1592     if l < 128:
1593         return 1
1594     if l < 256:  # 1 << 8
1595         return 2
1596     if l < 65536:  # 1 << 16
1597         return 3
1598     if l < 16777216:  # 1 << 24
1599         return 4
1600     if l < 4294967296:  # 1 << 32
1601         return 5
1602     if l < 1099511627776:  # 1 << 40
1603         return 6
1604     if l < 281474976710656:  # 1 << 48
1605         return 7
1606     if l < 72057594037927936:  # 1 << 56
1607         return 8
1608     raise OverflowError("too big length")
1609
1610
1611 def write_full(writer, data):
1612     """Fully write provided data
1613
1614     :param writer: must comply with ``io.RawIOBase.write`` behaviour
1615
1616     BytesIO does not guarantee that the whole data will be written at
1617     once. That function write everything provided, raising an error if
1618     ``writer`` returns None.
1619     """
1620     data = memoryview(data)
1621     written = 0
1622     while written != len(data):
1623         n = writer(data[written:])
1624         if n is None:
1625             raise ValueError("can not write to buf")
1626         written += n
1627
1628
1629 # If it is 64-bit system, then use compact 64-bit array of unsigned
1630 # longs. Use an ordinary list with universal integers otherwise, that
1631 # is slower.
1632 if sys_maxsize > 2 ** 32:
1633     def state_2pass_new():
1634         return array("L")
1635 else:
1636     def state_2pass_new():
1637         return []
1638
1639
1640 ########################################################################
1641 # Base class
1642 ########################################################################
1643
1644 class AutoAddSlots(type):
1645     def __new__(cls, name, bases, _dict):
1646         _dict["__slots__"] = _dict.get("__slots__", ())
1647         return type.__new__(cls, name, bases, _dict)
1648
1649
1650 BasicState = namedtuple("BasicState", (
1651     "version",
1652     "tag",
1653     "tag_order",
1654     "expl",
1655     "default",
1656     "optional",
1657     "offset",
1658     "llen",
1659     "vlen",
1660     "expl_lenindef",
1661     "lenindef",
1662     "ber_encoded",
1663 ), **NAMEDTUPLE_KWARGS)
1664
1665
1666 @add_metaclass(AutoAddSlots)
1667 class Obj(object):
1668     """Common ASN.1 object class
1669
1670     All ASN.1 types are inherited from it. It has metaclass that
1671     automatically adds ``__slots__`` to all inherited classes.
1672     """
1673     __slots__ = (
1674         "tag",
1675         "_tag_order",
1676         "_value",
1677         "_expl",
1678         "default",
1679         "optional",
1680         "offset",
1681         "llen",
1682         "vlen",
1683         "expl_lenindef",
1684         "lenindef",
1685         "ber_encoded",
1686     )
1687
1688     def __init__(
1689             self,
1690             impl=None,
1691             expl=None,
1692             default=None,
1693             optional=False,
1694             _decoded=(0, 0, 0),
1695     ):
1696         self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1697         self._expl = getattr(self, "expl", None) if expl is None else expl
1698         if self.tag != self.tag_default and self._expl is not None:
1699             raise ValueError("implicit and explicit tags can not be set simultaneously")
1700         if self.tag is None:
1701             self._tag_order = None
1702         else:
1703             tag_class, _, tag_num = tag_decode(
1704                 self.tag if self._expl is None else self._expl
1705             )
1706             self._tag_order = (tag_class, tag_num)
1707         if default is not None:
1708             optional = True
1709         self.optional = optional
1710         self.offset, self.llen, self.vlen = _decoded
1711         self.default = None
1712         self.expl_lenindef = False
1713         self.lenindef = False
1714         self.ber_encoded = False
1715
1716     @property
1717     def ready(self):  # pragma: no cover
1718         """Is object ready to be encoded?
1719         """
1720         raise NotImplementedError()
1721
1722     def _assert_ready(self):
1723         if not self.ready:
1724             raise ObjNotReady(self.__class__.__name__)
1725
1726     @property
1727     def bered(self):
1728         """Is either object or any elements inside is BER encoded?
1729         """
1730         return self.expl_lenindef or self.lenindef or self.ber_encoded
1731
1732     @property
1733     def decoded(self):
1734         """Is object decoded?
1735         """
1736         return (self.llen + self.vlen) > 0
1737
1738     def __getstate__(self):  # pragma: no cover
1739         """Used for making safe to be mutable pickleable copies
1740         """
1741         raise NotImplementedError()
1742
1743     def __setstate__(self, state):
1744         if state.version != __version__:
1745             raise ValueError("data is pickled by different PyDERASN version")
1746         self.tag = state.tag
1747         self._tag_order = state.tag_order
1748         self._expl = state.expl
1749         self.default = state.default
1750         self.optional = state.optional
1751         self.offset = state.offset
1752         self.llen = state.llen
1753         self.vlen = state.vlen
1754         self.expl_lenindef = state.expl_lenindef
1755         self.lenindef = state.lenindef
1756         self.ber_encoded = state.ber_encoded
1757
1758     @property
1759     def tag_order(self):
1760         """Tag's (class, number) used for DER/CER sorting
1761         """
1762         return self._tag_order
1763
1764     @property
1765     def tag_order_cer(self):
1766         return self.tag_order
1767
1768     @property
1769     def tlen(self):
1770         """.. seealso:: :ref:`decoding`
1771         """
1772         return len(self.tag)
1773
1774     @property
1775     def tlvlen(self):
1776         """.. seealso:: :ref:`decoding`
1777         """
1778         return self.tlen + self.llen + self.vlen
1779
1780     def __str__(self):  # pragma: no cover
1781         return self.__bytes__() if PY2 else self.__unicode__()
1782
1783     def __ne__(self, their):
1784         return not(self == their)
1785
1786     def __gt__(self, their):  # pragma: no cover
1787         return not(self < their)
1788
1789     def __le__(self, their):  # pragma: no cover
1790         return (self == their) or (self < their)
1791
1792     def __ge__(self, their):  # pragma: no cover
1793         return (self == their) or (self > their)
1794
1795     def _encode(self):  # pragma: no cover
1796         raise NotImplementedError()
1797
1798     def _encode_cer(self, writer):
1799         write_full(writer, self._encode())
1800
1801     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):  # pragma: no cover
1802         yield NotImplemented
1803
1804     def _encode1st(self, state):
1805         raise NotImplementedError()
1806
1807     def _encode2nd(self, writer, state_iter):
1808         raise NotImplementedError()
1809
1810     def encode(self):
1811         """DER encode the structure
1812
1813         :returns: DER representation
1814         """
1815         raw = self._encode()
1816         if self._expl is None:
1817             return raw
1818         return b"".join((self._expl, len_encode(len(raw)), raw))
1819
1820     def encode1st(self, state=None):
1821         """Do the 1st pass of 2-pass encoding
1822
1823         :rtype: (int, array("L"))
1824         :returns: full length of encoded data and precalculated various
1825                   objects lengths
1826         """
1827         if state is None:
1828             state = state_2pass_new()
1829         if self._expl is None:
1830             return self._encode1st(state)
1831         state.append(0)
1832         idx = len(state) - 1
1833         vlen, _ = self._encode1st(state)
1834         state[idx] = vlen
1835         fulllen = len(self._expl) + len_size(vlen) + vlen
1836         return fulllen, state
1837
1838     def encode2nd(self, writer, state_iter):
1839         """Do the 2nd pass of 2-pass encoding
1840
1841         :param writer: must comply with ``io.RawIOBase.write`` behaviour
1842         :param state_iter: iterator over the 1st pass state (``iter(state)``)
1843         """
1844         if self._expl is None:
1845             self._encode2nd(writer, state_iter)
1846         else:
1847             write_full(writer, self._expl + len_encode(next(state_iter)))
1848             self._encode2nd(writer, state_iter)
1849
1850     def encode_cer(self, writer):
1851         """CER encode the structure to specified writer
1852
1853         :param writer: must comply with ``io.RawIOBase.write``
1854                        behaviour. It takes slice to be written and
1855                        returns number of bytes processed. If it returns
1856                        None, then exception will be raised
1857         """
1858         if self._expl is not None:
1859             write_full(writer, self._expl + LENINDEF)
1860         if getattr(self, "der_forced", False):
1861             write_full(writer, self._encode())
1862         else:
1863             self._encode_cer(writer)
1864         if self._expl is not None:
1865             write_full(writer, EOC)
1866
1867     def hexencode(self):
1868         """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1869         """
1870         return hexenc(self.encode())
1871
1872     def decode(
1873             self,
1874             data,
1875             offset=0,
1876             leavemm=False,
1877             decode_path=(),
1878             ctx=None,
1879             tag_only=False,
1880             _ctx_immutable=True,
1881     ):
1882         """Decode the data
1883
1884         :param data: either binary or memoryview
1885         :param int offset: initial data's offset
1886         :param bool leavemm: do we need to leave memoryview of remaining
1887                     data as is, or convert it to bytes otherwise
1888         :param decode_path: current decode path (tuples of strings,
1889                             possibly with DecodePathDefBy) with will be
1890                             the root for all underlying objects
1891         :param ctx: optional :ref:`context <ctx>` governing decoding process
1892         :param bool tag_only: decode only the tag, without length and
1893                               contents (used only in Choice and Set
1894                               structures, trying to determine if tag satisfies
1895                               the schema)
1896         :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1897                                     before using it?
1898         :returns: (Obj, remaining data)
1899
1900         .. seealso:: :ref:`decoding`
1901         """
1902         result = next(self.decode_evgen(
1903             data,
1904             offset,
1905             leavemm,
1906             decode_path,
1907             ctx,
1908             tag_only,
1909             _ctx_immutable,
1910             _evgen_mode=False,
1911         ))
1912         if result is None:
1913             return None
1914         _, obj, tail = result
1915         return obj, tail
1916
1917     def decode_evgen(
1918             self,
1919             data,
1920             offset=0,
1921             leavemm=False,
1922             decode_path=(),
1923             ctx=None,
1924             tag_only=False,
1925             _ctx_immutable=True,
1926             _evgen_mode=True,
1927     ):
1928         """Decode with evgen mode on
1929
1930         That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1931         it returns the generator producing ``(decode_path, obj, tail)``
1932         values.
1933         .. seealso:: :ref:`evgen mode <evgen_mode>`.
1934         """
1935         if ctx is None:
1936             ctx = {}
1937         elif _ctx_immutable:
1938             ctx = copy(ctx)
1939         tlv = memoryview(data)
1940         if (
1941                 _evgen_mode and
1942                 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1943         ):
1944             _evgen_mode = False
1945         if self._expl is None:
1946             for result in self._decode(
1947                     tlv,
1948                     offset=offset,
1949                     decode_path=decode_path,
1950                     ctx=ctx,
1951                     tag_only=tag_only,
1952                     evgen_mode=_evgen_mode,
1953             ):
1954                 if tag_only:
1955                     yield None
1956                     return
1957                 _decode_path, obj, tail = result
1958                 if not _decode_path is decode_path:
1959                     yield result
1960         else:
1961             try:
1962                 t, tlen, lv = tag_strip(tlv)
1963             except DecodeError as err:
1964                 raise err.__class__(
1965                     msg=err.msg,
1966                     klass=self.__class__,
1967                     decode_path=decode_path,
1968                     offset=offset,
1969                 )
1970             if t != self._expl:
1971                 raise TagMismatch(
1972                     klass=self.__class__,
1973                     decode_path=decode_path,
1974                     offset=offset,
1975                 )
1976             try:
1977                 l, llen, v = len_decode(lv)
1978             except LenIndefForm as err:
1979                 if not ctx.get("bered", False):
1980                     raise err.__class__(
1981                         msg=err.msg,
1982                         klass=self.__class__,
1983                         decode_path=decode_path,
1984                         offset=offset,
1985                     )
1986                 llen, v = 1, lv[1:]
1987                 offset += tlen + llen
1988                 for result in self._decode(
1989                         v,
1990                         offset=offset,
1991                         decode_path=decode_path,
1992                         ctx=ctx,
1993                         tag_only=tag_only,
1994                         evgen_mode=_evgen_mode,
1995                 ):
1996                     if tag_only:  # pragma: no cover
1997                         yield None
1998                         return
1999                     _decode_path, obj, tail = result
2000                     if not _decode_path is decode_path:
2001                         yield result
2002                 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2003                 if eoc_expected.tobytes() != EOC:
2004                     raise DecodeError(
2005                         "no EOC",
2006                         klass=self.__class__,
2007                         decode_path=decode_path,
2008                         offset=offset,
2009                     )
2010                 obj.vlen += EOC_LEN
2011                 obj.expl_lenindef = True
2012             except DecodeError as err:
2013                 raise err.__class__(
2014                     msg=err.msg,
2015                     klass=self.__class__,
2016                     decode_path=decode_path,
2017                     offset=offset,
2018                 )
2019             else:
2020                 if l > len(v):
2021                     raise NotEnoughData(
2022                         "encoded length is longer than data",
2023                         klass=self.__class__,
2024                         decode_path=decode_path,
2025                         offset=offset,
2026                     )
2027                 for result in self._decode(
2028                         v,
2029                         offset=offset + tlen + llen,
2030                         decode_path=decode_path,
2031                         ctx=ctx,
2032                         tag_only=tag_only,
2033                         evgen_mode=_evgen_mode,
2034                 ):
2035                     if tag_only:  # pragma: no cover
2036                         yield None
2037                         return
2038                     _decode_path, obj, tail = result
2039                     if not _decode_path is decode_path:
2040                         yield result
2041                 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2042                     raise DecodeError(
2043                         "explicit tag out-of-bound, longer than data",
2044                         klass=self.__class__,
2045                         decode_path=decode_path,
2046                         offset=offset,
2047                     )
2048         yield decode_path, obj, (tail if leavemm else tail.tobytes())
2049
2050     def decod(self, data, offset=0, decode_path=(), ctx=None):
2051         """Decode the data, check that tail is empty
2052
2053         :raises ExceedingData: if tail is not empty
2054
2055         This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2056         (decode without tail) that also checks that there is no
2057         trailing data left.
2058         """
2059         obj, tail = self.decode(
2060             data,
2061             offset=offset,
2062             decode_path=decode_path,
2063             ctx=ctx,
2064             leavemm=True,
2065         )
2066         if len(tail) > 0:
2067             raise ExceedingData(len(tail))
2068         return obj
2069
2070     def hexdecode(self, data, *args, **kwargs):
2071         """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2072         """
2073         return self.decode(hexdec(data), *args, **kwargs)
2074
2075     def hexdecod(self, data, *args, **kwargs):
2076         """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2077         """
2078         return self.decod(hexdec(data), *args, **kwargs)
2079
2080     @property
2081     def expled(self):
2082         """.. seealso:: :ref:`decoding`
2083         """
2084         return self._expl is not None
2085
2086     @property
2087     def expl_tag(self):
2088         """.. seealso:: :ref:`decoding`
2089         """
2090         return self._expl
2091
2092     @property
2093     def expl_tlen(self):
2094         """.. seealso:: :ref:`decoding`
2095         """
2096         return len(self._expl)
2097
2098     @property
2099     def expl_llen(self):
2100         """.. seealso:: :ref:`decoding`
2101         """
2102         if self.expl_lenindef:
2103             return 1
2104         return len(len_encode(self.tlvlen))
2105
2106     @property
2107     def expl_offset(self):
2108         """.. seealso:: :ref:`decoding`
2109         """
2110         return self.offset - self.expl_tlen - self.expl_llen
2111
2112     @property
2113     def expl_vlen(self):
2114         """.. seealso:: :ref:`decoding`
2115         """
2116         return self.tlvlen
2117
2118     @property
2119     def expl_tlvlen(self):
2120         """.. seealso:: :ref:`decoding`
2121         """
2122         return self.expl_tlen + self.expl_llen + self.expl_vlen
2123
2124     @property
2125     def fulloffset(self):
2126         """.. seealso:: :ref:`decoding`
2127         """
2128         return self.expl_offset if self.expled else self.offset
2129
2130     @property
2131     def fulllen(self):
2132         """.. seealso:: :ref:`decoding`
2133         """
2134         return self.expl_tlvlen if self.expled else self.tlvlen
2135
2136     def pps_lenindef(self, decode_path):
2137         if self.lenindef and not (
2138                 getattr(self, "defined", None) is not None and
2139                 self.defined[1].lenindef
2140         ):
2141             yield _pp(
2142                 asn1_type_name="EOC",
2143                 obj_name="",
2144                 decode_path=decode_path,
2145                 offset=(
2146                     self.offset + self.tlvlen -
2147                     (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2148                 ),
2149                 tlen=1,
2150                 llen=1,
2151                 vlen=0,
2152                 ber_encoded=True,
2153                 bered=True,
2154             )
2155         if self.expl_lenindef:
2156             yield _pp(
2157                 asn1_type_name="EOC",
2158                 obj_name="EXPLICIT",
2159                 decode_path=decode_path,
2160                 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2161                 tlen=1,
2162                 llen=1,
2163                 vlen=0,
2164                 ber_encoded=True,
2165                 bered=True,
2166             )
2167
2168
2169 def encode_cer(obj):
2170     """Encode to CER in memory buffer
2171
2172     :returns bytes: memory buffer contents
2173     """
2174     buf = BytesIO()
2175     obj.encode_cer(buf.write)
2176     return buf.getvalue()
2177
2178
2179 def encode2pass(obj):
2180     """Encode (2-pass mode) to DER in memory buffer
2181
2182     :returns bytes: memory buffer contents
2183     """
2184     buf = BytesIO()
2185     _, state = obj.encode1st()
2186     obj.encode2nd(buf.write, iter(state))
2187     return buf.getvalue()
2188
2189
2190 class DecodePathDefBy(object):
2191     """DEFINED BY representation inside decode path
2192     """
2193     __slots__ = ("defined_by",)
2194
2195     def __init__(self, defined_by):
2196         self.defined_by = defined_by
2197
2198     def __ne__(self, their):
2199         return not(self == their)
2200
2201     def __eq__(self, their):
2202         if not isinstance(their, self.__class__):
2203             return False
2204         return self.defined_by == their.defined_by
2205
2206     def __str__(self):
2207         return "DEFINED BY " + str(self.defined_by)
2208
2209     def __repr__(self):
2210         return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2211
2212
2213 ########################################################################
2214 # Pretty printing
2215 ########################################################################
2216
2217 PP = namedtuple("PP", (
2218     "obj",
2219     "asn1_type_name",
2220     "obj_name",
2221     "decode_path",
2222     "value",
2223     "blob",
2224     "optional",
2225     "default",
2226     "impl",
2227     "expl",
2228     "offset",
2229     "tlen",
2230     "llen",
2231     "vlen",
2232     "expl_offset",
2233     "expl_tlen",
2234     "expl_llen",
2235     "expl_vlen",
2236     "expl_lenindef",
2237     "lenindef",
2238     "ber_encoded",
2239     "bered",
2240 ), **NAMEDTUPLE_KWARGS)
2241
2242
2243 def _pp(
2244         obj=None,
2245         asn1_type_name="unknown",
2246         obj_name="unknown",
2247         decode_path=(),
2248         value=None,
2249         blob=None,
2250         optional=False,
2251         default=False,
2252         impl=None,
2253         expl=None,
2254         offset=0,
2255         tlen=0,
2256         llen=0,
2257         vlen=0,
2258         expl_offset=None,
2259         expl_tlen=None,
2260         expl_llen=None,
2261         expl_vlen=None,
2262         expl_lenindef=False,
2263         lenindef=False,
2264         ber_encoded=False,
2265         bered=False,
2266 ):
2267     return PP(
2268         obj,
2269         asn1_type_name,
2270         obj_name,
2271         decode_path,
2272         value,
2273         blob,
2274         optional,
2275         default,
2276         impl,
2277         expl,
2278         offset,
2279         tlen,
2280         llen,
2281         vlen,
2282         expl_offset,
2283         expl_tlen,
2284         expl_llen,
2285         expl_vlen,
2286         expl_lenindef,
2287         lenindef,
2288         ber_encoded,
2289         bered,
2290     )
2291
2292
2293 def _colourize(what, colour, with_colours, attrs=("bold",)):
2294     return colored(what, colour, attrs=attrs) if with_colours else what
2295
2296
2297 def colonize_hex(hexed):
2298     """Separate hexadecimal string with colons
2299     """
2300     return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2301
2302
2303 def pp_console_row(
2304         pp,
2305         oid_maps=(),
2306         with_offsets=False,
2307         with_blob=True,
2308         with_colours=False,
2309         with_decode_path=False,
2310         decode_path_len_decrease=0,
2311 ):
2312     cols = []
2313     if with_offsets:
2314         col = "%5d%s%s" % (
2315             pp.offset,
2316             (
2317                 "  " if pp.expl_offset is None else
2318                 ("-%d" % (pp.offset - pp.expl_offset))
2319             ),
2320             LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2321         )
2322         col = _colourize(col, "red", with_colours, ())
2323         col += _colourize("B", "red", with_colours) if pp.bered else " "
2324         cols.append(col)
2325         col = "[%d,%d,%4d]%s" % (
2326             pp.tlen,
2327             pp.llen,
2328             pp.vlen,
2329             LENINDEF_PP_CHAR if pp.lenindef else " "
2330         )
2331         col = _colourize(col, "green", with_colours, ())
2332         cols.append(col)
2333     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2334     if decode_path_len > 0:
2335         cols.append(" ." * decode_path_len)
2336         ent = pp.decode_path[-1]
2337         if isinstance(ent, DecodePathDefBy):
2338             cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2339             value = str(ent.defined_by)
2340             oid_name = None
2341             if (
2342                     len(oid_maps) > 0 and
2343                     ent.defined_by.asn1_type_name ==
2344                     ObjectIdentifier.asn1_type_name
2345             ):
2346                 for oid_map in oid_maps:
2347                     oid_name = oid_map.get(value)
2348                     if oid_name is not None:
2349                         cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2350                         break
2351             if oid_name is None:
2352                 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2353         else:
2354             cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2355     if pp.expl is not None:
2356         klass, _, num = pp.expl
2357         col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2358         cols.append(_colourize(col, "blue", with_colours))
2359     if pp.impl is not None:
2360         klass, _, num = pp.impl
2361         col = "[%s%d]" % (TagClassReprs[klass], num)
2362         cols.append(_colourize(col, "blue", with_colours))
2363     if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2364         cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2365     if pp.ber_encoded:
2366         cols.append(_colourize("BER", "red", with_colours))
2367     cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2368     if pp.value is not None:
2369         value = pp.value
2370         cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2371         if (
2372                 len(oid_maps) > 0 and
2373                 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
2374         ):
2375             for oid_map in oid_maps:
2376                 oid_name = oid_map.get(value)
2377                 if oid_name is not None:
2378                     cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2379                     break
2380         if pp.asn1_type_name == Integer.asn1_type_name:
2381             hex_repr = hex(int(pp.obj._value))[2:].upper()
2382             if len(hex_repr) % 2 != 0:
2383                 hex_repr = "0" + hex_repr
2384             cols.append(_colourize(
2385                 "(%s)" % colonize_hex(hex_repr),
2386                 "green",
2387                 with_colours,
2388             ))
2389     if with_blob:
2390         if pp.blob.__class__ == binary_type:
2391             cols.append(hexenc(pp.blob))
2392         elif pp.blob.__class__ == tuple:
2393             cols.append(", ".join(pp.blob))
2394     if pp.optional:
2395         cols.append(_colourize("OPTIONAL", "red", with_colours))
2396     if pp.default:
2397         cols.append(_colourize("DEFAULT", "red", with_colours))
2398     if with_decode_path:
2399         cols.append(_colourize(
2400             "[%s]" % ":".join(str(p) for p in pp.decode_path),
2401             "grey",
2402             with_colours,
2403         ))
2404     return " ".join(cols)
2405
2406
2407 def pp_console_blob(pp, decode_path_len_decrease=0):
2408     cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2409     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2410     if decode_path_len > 0:
2411         cols.append(" ." * (decode_path_len + 1))
2412     if pp.blob.__class__ == binary_type:
2413         blob = hexenc(pp.blob).upper()
2414         for i in six_xrange(0, len(blob), 32):
2415             chunk = blob[i:i + 32]
2416             yield " ".join(cols + [colonize_hex(chunk)])
2417     elif pp.blob.__class__ == tuple:
2418         yield " ".join(cols + [", ".join(pp.blob)])
2419
2420
2421 def pprint(
2422         obj,
2423         oid_maps=(),
2424         big_blobs=False,
2425         with_colours=False,
2426         with_decode_path=False,
2427         decode_path_only=(),
2428         decode_path=(),
2429 ):
2430     """Pretty print object
2431
2432     :param Obj obj: object you want to pretty print
2433     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionary.
2434                      Its human readable form is printed when OID is met
2435     :param big_blobs: if large binary objects are met (like OctetString
2436                       values), do we need to print them too, on separate
2437                       lines
2438     :param with_colours: colourize output, if ``termcolor`` library
2439                          is available
2440     :param with_decode_path: print decode path
2441     :param decode_path_only: print only that specified decode path
2442     """
2443     def _pprint_pps(pps):
2444         for pp in pps:
2445             if hasattr(pp, "_fields"):
2446                 if (
2447                         decode_path_only != () and
2448                         tuple(
2449                             str(p) for p in pp.decode_path[:len(decode_path_only)]
2450                         ) != decode_path_only
2451                 ):
2452                     continue
2453                 if big_blobs:
2454                     yield pp_console_row(
2455                         pp,
2456                         oid_maps=oid_maps,
2457                         with_offsets=True,
2458                         with_blob=False,
2459                         with_colours=with_colours,
2460                         with_decode_path=with_decode_path,
2461                         decode_path_len_decrease=len(decode_path_only),
2462                     )
2463                     for row in pp_console_blob(
2464                             pp,
2465                             decode_path_len_decrease=len(decode_path_only),
2466                     ):
2467                         yield row
2468                 else:
2469                     yield pp_console_row(
2470                         pp,
2471                         oid_maps=oid_maps,
2472                         with_offsets=True,
2473                         with_blob=True,
2474                         with_colours=with_colours,
2475                         with_decode_path=with_decode_path,
2476                         decode_path_len_decrease=len(decode_path_only),
2477                     )
2478             else:
2479                 for row in _pprint_pps(pp):
2480                     yield row
2481     return "\n".join(_pprint_pps(obj.pps(decode_path)))
2482
2483
2484 ########################################################################
2485 # ASN.1 primitive types
2486 ########################################################################
2487
2488 BooleanState = namedtuple(
2489     "BooleanState",
2490     BasicState._fields + ("value",),
2491     **NAMEDTUPLE_KWARGS
2492 )
2493
2494
2495 class Boolean(Obj):
2496     """``BOOLEAN`` boolean type
2497
2498     >>> b = Boolean(True)
2499     BOOLEAN True
2500     >>> b == Boolean(True)
2501     True
2502     >>> bool(b)
2503     True
2504     """
2505     __slots__ = ()
2506     tag_default = tag_encode(1)
2507     asn1_type_name = "BOOLEAN"
2508
2509     def __init__(
2510             self,
2511             value=None,
2512             impl=None,
2513             expl=None,
2514             default=None,
2515             optional=False,
2516             _decoded=(0, 0, 0),
2517     ):
2518         """
2519         :param value: set the value. Either boolean type, or
2520                       :py:class:`pyderasn.Boolean` object
2521         :param bytes impl: override default tag with ``IMPLICIT`` one
2522         :param bytes expl: override default tag with ``EXPLICIT`` one
2523         :param default: set default value. Type same as in ``value``
2524         :param bool optional: is object ``OPTIONAL`` in sequence
2525         """
2526         super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2527         self._value = None if value is None else self._value_sanitize(value)
2528         if default is not None:
2529             default = self._value_sanitize(default)
2530             self.default = self.__class__(
2531                 value=default,
2532                 impl=self.tag,
2533                 expl=self._expl,
2534             )
2535             if value is None:
2536                 self._value = default
2537
2538     def _value_sanitize(self, value):
2539         if value.__class__ == bool:
2540             return value
2541         if issubclass(value.__class__, Boolean):
2542             return value._value
2543         raise InvalidValueType((self.__class__, bool))
2544
2545     @property
2546     def ready(self):
2547         return self._value is not None
2548
2549     def __getstate__(self):
2550         return BooleanState(
2551             __version__,
2552             self.tag,
2553             self._tag_order,
2554             self._expl,
2555             self.default,
2556             self.optional,
2557             self.offset,
2558             self.llen,
2559             self.vlen,
2560             self.expl_lenindef,
2561             self.lenindef,
2562             self.ber_encoded,
2563             self._value,
2564         )
2565
2566     def __setstate__(self, state):
2567         super(Boolean, self).__setstate__(state)
2568         self._value = state.value
2569
2570     def __nonzero__(self):
2571         self._assert_ready()
2572         return self._value
2573
2574     def __bool__(self):
2575         self._assert_ready()
2576         return self._value
2577
2578     def __eq__(self, their):
2579         if their.__class__ == bool:
2580             return self._value == their
2581         if not issubclass(their.__class__, Boolean):
2582             return False
2583         return (
2584             self._value == their._value and
2585             self.tag == their.tag and
2586             self._expl == their._expl
2587         )
2588
2589     def __call__(
2590             self,
2591             value=None,
2592             impl=None,
2593             expl=None,
2594             default=None,
2595             optional=None,
2596     ):
2597         return self.__class__(
2598             value=value,
2599             impl=self.tag if impl is None else impl,
2600             expl=self._expl if expl is None else expl,
2601             default=self.default if default is None else default,
2602             optional=self.optional if optional is None else optional,
2603         )
2604
2605     def _encode(self):
2606         self._assert_ready()
2607         return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2608
2609     def _encode1st(self, state):
2610         return len(self.tag) + 2, state
2611
2612     def _encode2nd(self, writer, state_iter):
2613         self._assert_ready()
2614         write_full(writer, self._encode())
2615
2616     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2617         try:
2618             t, _, lv = tag_strip(tlv)
2619         except DecodeError as err:
2620             raise err.__class__(
2621                 msg=err.msg,
2622                 klass=self.__class__,
2623                 decode_path=decode_path,
2624                 offset=offset,
2625             )
2626         if t != self.tag:
2627             raise TagMismatch(
2628                 klass=self.__class__,
2629                 decode_path=decode_path,
2630                 offset=offset,
2631             )
2632         if tag_only:
2633             yield None
2634             return
2635         try:
2636             l, _, v = len_decode(lv)
2637         except DecodeError as err:
2638             raise err.__class__(
2639                 msg=err.msg,
2640                 klass=self.__class__,
2641                 decode_path=decode_path,
2642                 offset=offset,
2643             )
2644         if l != 1:
2645             raise InvalidLength(
2646                 "Boolean's length must be equal to 1",
2647                 klass=self.__class__,
2648                 decode_path=decode_path,
2649                 offset=offset,
2650             )
2651         if l > len(v):
2652             raise NotEnoughData(
2653                 "encoded length is longer than data",
2654                 klass=self.__class__,
2655                 decode_path=decode_path,
2656                 offset=offset,
2657             )
2658         first_octet = byte2int(v)
2659         ber_encoded = False
2660         if first_octet == 0:
2661             value = False
2662         elif first_octet == 0xFF:
2663             value = True
2664         elif ctx.get("bered", False):
2665             value = True
2666             ber_encoded = True
2667         else:
2668             raise DecodeError(
2669                 "unacceptable Boolean value",
2670                 klass=self.__class__,
2671                 decode_path=decode_path,
2672                 offset=offset,
2673             )
2674         obj = self.__class__(
2675             value=value,
2676             impl=self.tag,
2677             expl=self._expl,
2678             default=self.default,
2679             optional=self.optional,
2680             _decoded=(offset, 1, 1),
2681         )
2682         obj.ber_encoded = ber_encoded
2683         yield decode_path, obj, v[1:]
2684
2685     def __repr__(self):
2686         return pp_console_row(next(self.pps()))
2687
2688     def pps(self, decode_path=()):
2689         yield _pp(
2690             obj=self,
2691             asn1_type_name=self.asn1_type_name,
2692             obj_name=self.__class__.__name__,
2693             decode_path=decode_path,
2694             value=str(self._value) if self.ready else None,
2695             optional=self.optional,
2696             default=self == self.default,
2697             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2698             expl=None if self._expl is None else tag_decode(self._expl),
2699             offset=self.offset,
2700             tlen=self.tlen,
2701             llen=self.llen,
2702             vlen=self.vlen,
2703             expl_offset=self.expl_offset if self.expled else None,
2704             expl_tlen=self.expl_tlen if self.expled else None,
2705             expl_llen=self.expl_llen if self.expled else None,
2706             expl_vlen=self.expl_vlen if self.expled else None,
2707             expl_lenindef=self.expl_lenindef,
2708             ber_encoded=self.ber_encoded,
2709             bered=self.bered,
2710         )
2711         for pp in self.pps_lenindef(decode_path):
2712             yield pp
2713
2714
2715 IntegerState = namedtuple(
2716     "IntegerState",
2717     BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2718     **NAMEDTUPLE_KWARGS
2719 )
2720
2721
2722 class Integer(Obj):
2723     """``INTEGER`` integer type
2724
2725     >>> b = Integer(-123)
2726     INTEGER -123
2727     >>> b == Integer(-123)
2728     True
2729     >>> int(b)
2730     -123
2731
2732     >>> Integer(2, bounds=(1, 3))
2733     INTEGER 2
2734     >>> Integer(5, bounds=(1, 3))
2735     Traceback (most recent call last):
2736     pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2737
2738     ::
2739
2740         class Version(Integer):
2741             schema = (
2742                 ("v1", 0),
2743                 ("v2", 1),
2744                 ("v3", 2),
2745             )
2746
2747     >>> v = Version("v1")
2748     Version INTEGER v1
2749     >>> int(v)
2750     0
2751     >>> v.named
2752     'v1'
2753     >>> v.specs
2754     {'v3': 2, 'v1': 0, 'v2': 1}
2755     """
2756     __slots__ = ("specs", "_bound_min", "_bound_max")
2757     tag_default = tag_encode(2)
2758     asn1_type_name = "INTEGER"
2759
2760     def __init__(
2761             self,
2762             value=None,
2763             bounds=None,
2764             impl=None,
2765             expl=None,
2766             default=None,
2767             optional=False,
2768             _specs=None,
2769             _decoded=(0, 0, 0),
2770     ):
2771         """
2772         :param value: set the value. Either integer type, named value
2773                       (if ``schema`` is specified in the class), or
2774                       :py:class:`pyderasn.Integer` object
2775         :param bounds: set ``(MIN, MAX)`` value constraint.
2776                        (-inf, +inf) by default
2777         :param bytes impl: override default tag with ``IMPLICIT`` one
2778         :param bytes expl: override default tag with ``EXPLICIT`` one
2779         :param default: set default value. Type same as in ``value``
2780         :param bool optional: is object ``OPTIONAL`` in sequence
2781         """
2782         super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2783         self._value = value
2784         specs = getattr(self, "schema", {}) if _specs is None else _specs
2785         self.specs = specs if specs.__class__ == dict else dict(specs)
2786         self._bound_min, self._bound_max = getattr(
2787             self,
2788             "bounds",
2789             (float("-inf"), float("+inf")),
2790         ) if bounds is None else bounds
2791         if value is not None:
2792             self._value = self._value_sanitize(value)
2793         if default is not None:
2794             default = self._value_sanitize(default)
2795             self.default = self.__class__(
2796                 value=default,
2797                 impl=self.tag,
2798                 expl=self._expl,
2799                 _specs=self.specs,
2800             )
2801             if self._value is None:
2802                 self._value = default
2803
2804     def _value_sanitize(self, value):
2805         if isinstance(value, integer_types):
2806             pass
2807         elif issubclass(value.__class__, Integer):
2808             value = value._value
2809         elif value.__class__ == str:
2810             value = self.specs.get(value)
2811             if value is None:
2812                 raise ObjUnknown("integer value: %s" % value)
2813         else:
2814             raise InvalidValueType((self.__class__, int, str))
2815         if not self._bound_min <= value <= self._bound_max:
2816             raise BoundsError(self._bound_min, value, self._bound_max)
2817         return value
2818
2819     @property
2820     def ready(self):
2821         return self._value is not None
2822
2823     def __getstate__(self):
2824         return IntegerState(
2825             __version__,
2826             self.tag,
2827             self._tag_order,
2828             self._expl,
2829             self.default,
2830             self.optional,
2831             self.offset,
2832             self.llen,
2833             self.vlen,
2834             self.expl_lenindef,
2835             self.lenindef,
2836             self.ber_encoded,
2837             self.specs,
2838             self._value,
2839             self._bound_min,
2840             self._bound_max,
2841         )
2842
2843     def __setstate__(self, state):
2844         super(Integer, self).__setstate__(state)
2845         self.specs = state.specs
2846         self._value = state.value
2847         self._bound_min = state.bound_min
2848         self._bound_max = state.bound_max
2849
2850     def __int__(self):
2851         self._assert_ready()
2852         return int(self._value)
2853
2854     def __hash__(self):
2855         self._assert_ready()
2856         return hash(b"".join((
2857             self.tag,
2858             bytes(self._expl or b""),
2859             str(self._value).encode("ascii"),
2860         )))
2861
2862     def __eq__(self, their):
2863         if isinstance(their, integer_types):
2864             return self._value == their
2865         if not issubclass(their.__class__, Integer):
2866             return False
2867         return (
2868             self._value == their._value and
2869             self.tag == their.tag and
2870             self._expl == their._expl
2871         )
2872
2873     def __lt__(self, their):
2874         return self._value < their._value
2875
2876     @property
2877     def named(self):
2878         """Return named representation (if exists) of the value
2879         """
2880         for name, value in iteritems(self.specs):
2881             if value == self._value:
2882                 return name
2883         return None
2884
2885     def __call__(
2886             self,
2887             value=None,
2888             bounds=None,
2889             impl=None,
2890             expl=None,
2891             default=None,
2892             optional=None,
2893     ):
2894         return self.__class__(
2895             value=value,
2896             bounds=(
2897                 (self._bound_min, self._bound_max)
2898                 if bounds is None else bounds
2899             ),
2900             impl=self.tag if impl is None else impl,
2901             expl=self._expl if expl is None else expl,
2902             default=self.default if default is None else default,
2903             optional=self.optional if optional is None else optional,
2904             _specs=self.specs,
2905         )
2906
2907     def _encode_payload(self):
2908         self._assert_ready()
2909         value = self._value
2910         if PY2:
2911             if value == 0:
2912                 octets = bytearray([0])
2913             elif value < 0:
2914                 value = -value
2915                 value -= 1
2916                 octets = bytearray()
2917                 while value > 0:
2918                     octets.append((value & 0xFF) ^ 0xFF)
2919                     value >>= 8
2920                 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2921                     octets.append(0xFF)
2922             else:
2923                 octets = bytearray()
2924                 while value > 0:
2925                     octets.append(value & 0xFF)
2926                     value >>= 8
2927                 if octets[-1] & 0x80 > 0:
2928                     octets.append(0x00)
2929             octets.reverse()
2930             octets = bytes(octets)
2931         else:
2932             bytes_len = ceil(value.bit_length() / 8) or 1
2933             while True:
2934                 try:
2935                     octets = value.to_bytes(
2936                         bytes_len,
2937                         byteorder="big",
2938                         signed=True,
2939                     )
2940                 except OverflowError:
2941                     bytes_len += 1
2942                 else:
2943                     break
2944         return octets
2945
2946     def _encode(self):
2947         octets = self._encode_payload()
2948         return b"".join((self.tag, len_encode(len(octets)), octets))
2949
2950     def _encode1st(self, state):
2951         l = len(self._encode_payload())
2952         return len(self.tag) + len_size(l) + l, state
2953
2954     def _encode2nd(self, writer, state_iter):
2955         write_full(writer, self._encode())
2956
2957     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2958         try:
2959             t, _, lv = tag_strip(tlv)
2960         except DecodeError as err:
2961             raise err.__class__(
2962                 msg=err.msg,
2963                 klass=self.__class__,
2964                 decode_path=decode_path,
2965                 offset=offset,
2966             )
2967         if t != self.tag:
2968             raise TagMismatch(
2969                 klass=self.__class__,
2970                 decode_path=decode_path,
2971                 offset=offset,
2972             )
2973         if tag_only:
2974             yield None
2975             return
2976         try:
2977             l, llen, v = len_decode(lv)
2978         except DecodeError as err:
2979             raise err.__class__(
2980                 msg=err.msg,
2981                 klass=self.__class__,
2982                 decode_path=decode_path,
2983                 offset=offset,
2984             )
2985         if l > len(v):
2986             raise NotEnoughData(
2987                 "encoded length is longer than data",
2988                 klass=self.__class__,
2989                 decode_path=decode_path,
2990                 offset=offset,
2991             )
2992         if l == 0:
2993             raise NotEnoughData(
2994                 "zero length",
2995                 klass=self.__class__,
2996                 decode_path=decode_path,
2997                 offset=offset,
2998             )
2999         v, tail = v[:l], v[l:]
3000         first_octet = byte2int(v)
3001         if l > 1:
3002             second_octet = byte2int(v[1:])
3003             if (
3004                     ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3005                     ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3006             ):
3007                 raise DecodeError(
3008                     "non normalized integer",
3009                     klass=self.__class__,
3010                     decode_path=decode_path,
3011                     offset=offset,
3012                 )
3013         if PY2:
3014             value = 0
3015             if first_octet & 0x80 > 0:
3016                 octets = bytearray()
3017                 for octet in bytearray(v):
3018                     octets.append(octet ^ 0xFF)
3019                 for octet in octets:
3020                     value = (value << 8) | octet
3021                 value += 1
3022                 value = -value
3023             else:
3024                 for octet in bytearray(v):
3025                     value = (value << 8) | octet
3026         else:
3027             value = int.from_bytes(v, byteorder="big", signed=True)
3028         try:
3029             obj = self.__class__(
3030                 value=value,
3031                 bounds=(self._bound_min, self._bound_max),
3032                 impl=self.tag,
3033                 expl=self._expl,
3034                 default=self.default,
3035                 optional=self.optional,
3036                 _specs=self.specs,
3037                 _decoded=(offset, llen, l),
3038             )
3039         except BoundsError as err:
3040             raise DecodeError(
3041                 msg=str(err),
3042                 klass=self.__class__,
3043                 decode_path=decode_path,
3044                 offset=offset,
3045             )
3046         yield decode_path, obj, tail
3047
3048     def __repr__(self):
3049         return pp_console_row(next(self.pps()))
3050
3051     def pps(self, decode_path=()):
3052         yield _pp(
3053             obj=self,
3054             asn1_type_name=self.asn1_type_name,
3055             obj_name=self.__class__.__name__,
3056             decode_path=decode_path,
3057             value=(self.named or str(self._value)) if self.ready else None,
3058             optional=self.optional,
3059             default=self == self.default,
3060             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3061             expl=None if self._expl is None else tag_decode(self._expl),
3062             offset=self.offset,
3063             tlen=self.tlen,
3064             llen=self.llen,
3065             vlen=self.vlen,
3066             expl_offset=self.expl_offset if self.expled else None,
3067             expl_tlen=self.expl_tlen if self.expled else None,
3068             expl_llen=self.expl_llen if self.expled else None,
3069             expl_vlen=self.expl_vlen if self.expled else None,
3070             expl_lenindef=self.expl_lenindef,
3071             bered=self.bered,
3072         )
3073         for pp in self.pps_lenindef(decode_path):
3074             yield pp
3075
3076
3077 BitStringState = namedtuple(
3078     "BitStringState",
3079     BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3080     **NAMEDTUPLE_KWARGS
3081 )
3082
3083
3084 class BitString(Obj):
3085     """``BIT STRING`` bit string type
3086
3087     >>> BitString(b"hello world")
3088     BIT STRING 88 bits 68656c6c6f20776f726c64
3089     >>> bytes(b)
3090     b'hello world'
3091     >>> b == b"hello world"
3092     True
3093     >>> b.bit_len
3094     88
3095
3096     >>> BitString("'0A3B5F291CD'H")
3097     BIT STRING 44 bits 0a3b5f291cd0
3098     >>> b = BitString("'010110000000'B")
3099     BIT STRING 12 bits 5800
3100     >>> b.bit_len
3101     12
3102     >>> b[0], b[1], b[2], b[3]
3103     (False, True, False, True)
3104     >>> b[1000]
3105     False
3106     >>> [v for v in b]
3107     [False, True, False, True, True, False, False, False, False, False, False, False]
3108
3109     ::
3110
3111         class KeyUsage(BitString):
3112             schema = (
3113                 ("digitalSignature", 0),
3114                 ("nonRepudiation", 1),
3115                 ("keyEncipherment", 2),
3116             )
3117
3118     >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3119     KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3120     >>> b.named
3121     ['nonRepudiation', 'keyEncipherment']
3122     >>> b.specs
3123     {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3124
3125     .. note::
3126
3127        Pay attention that BIT STRING can be encoded both in primitive
3128        and constructed forms. Decoder always checks constructed form tag
3129        additionally to specified primitive one. If BER decoding is
3130        :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3131        of DER restrictions.
3132     """
3133     __slots__ = ("tag_constructed", "specs", "defined")
3134     tag_default = tag_encode(3)
3135     asn1_type_name = "BIT STRING"
3136
3137     def __init__(
3138             self,
3139             value=None,
3140             impl=None,
3141             expl=None,
3142             default=None,
3143             optional=False,
3144             _specs=None,
3145             _decoded=(0, 0, 0),
3146     ):
3147         """
3148         :param value: set the value. Either binary type, tuple of named
3149                       values (if ``schema`` is specified in the class),
3150                       string in ``'XXX...'B`` form, or
3151                       :py:class:`pyderasn.BitString` object
3152         :param bytes impl: override default tag with ``IMPLICIT`` one
3153         :param bytes expl: override default tag with ``EXPLICIT`` one
3154         :param default: set default value. Type same as in ``value``
3155         :param bool optional: is object ``OPTIONAL`` in sequence
3156         """
3157         super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3158         specs = getattr(self, "schema", {}) if _specs is None else _specs
3159         self.specs = specs if specs.__class__ == dict else dict(specs)
3160         self._value = None if value is None else self._value_sanitize(value)
3161         if default is not None:
3162             default = self._value_sanitize(default)
3163             self.default = self.__class__(
3164                 value=default,
3165                 impl=self.tag,
3166                 expl=self._expl,
3167             )
3168             if value is None:
3169                 self._value = default
3170         self.defined = None
3171         tag_klass, _, tag_num = tag_decode(self.tag)
3172         self.tag_constructed = tag_encode(
3173             klass=tag_klass,
3174             form=TagFormConstructed,
3175             num=tag_num,
3176         )
3177
3178     def _bits2octets(self, bits):
3179         if len(self.specs) > 0:
3180             bits = bits.rstrip("0")
3181         bit_len = len(bits)
3182         bits += "0" * ((8 - (bit_len % 8)) % 8)
3183         octets = bytearray(len(bits) // 8)
3184         for i in six_xrange(len(octets)):
3185             octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3186         return bit_len, bytes(octets)
3187
3188     def _value_sanitize(self, value):
3189         if isinstance(value, (string_types, binary_type)):
3190             if (
3191                     isinstance(value, string_types) and
3192                     value.startswith("'")
3193             ):
3194                 if value.endswith("'B"):
3195                     value = value[1:-2]
3196                     if not frozenset(value) <= SET01:
3197                         raise ValueError("B's coding contains unacceptable chars")
3198                     return self._bits2octets(value)
3199                 if value.endswith("'H"):
3200                     value = value[1:-2]
3201                     return (
3202                         len(value) * 4,
3203                         hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3204                     )
3205             if value.__class__ == binary_type:
3206                 return (len(value) * 8, value)
3207             raise InvalidValueType((self.__class__, string_types, binary_type))
3208         if value.__class__ == tuple:
3209             if (
3210                     len(value) == 2 and
3211                     isinstance(value[0], integer_types) and
3212                     value[1].__class__ == binary_type
3213             ):
3214                 return value
3215             bits = []
3216             for name in value:
3217                 bit = self.specs.get(name)
3218                 if bit is None:
3219                     raise ObjUnknown("BitString value: %s" % name)
3220                 bits.append(bit)
3221             if len(bits) == 0:
3222                 return self._bits2octets("")
3223             bits = frozenset(bits)
3224             return self._bits2octets("".join(
3225                 ("1" if bit in bits else "0")
3226                 for bit in six_xrange(max(bits) + 1)
3227             ))
3228         if issubclass(value.__class__, BitString):
3229             return value._value
3230         raise InvalidValueType((self.__class__, binary_type, string_types))
3231
3232     @property
3233     def ready(self):
3234         return self._value is not None
3235
3236     def __getstate__(self):
3237         return BitStringState(
3238             __version__,
3239             self.tag,
3240             self._tag_order,
3241             self._expl,
3242             self.default,
3243             self.optional,
3244             self.offset,
3245             self.llen,
3246             self.vlen,
3247             self.expl_lenindef,
3248             self.lenindef,
3249             self.ber_encoded,
3250             self.specs,
3251             self._value,
3252             self.tag_constructed,
3253             self.defined,
3254         )
3255
3256     def __setstate__(self, state):
3257         super(BitString, self).__setstate__(state)
3258         self.specs = state.specs
3259         self._value = state.value
3260         self.tag_constructed = state.tag_constructed
3261         self.defined = state.defined
3262
3263     def __iter__(self):
3264         self._assert_ready()
3265         for i in six_xrange(self._value[0]):
3266             yield self[i]
3267
3268     @property
3269     def bit_len(self):
3270         """Returns number of bits in the string
3271         """
3272         self._assert_ready()
3273         return self._value[0]
3274
3275     def __bytes__(self):
3276         self._assert_ready()
3277         return self._value[1]
3278
3279     def __eq__(self, their):
3280         if their.__class__ == bytes:
3281             return self._value[1] == their
3282         if not issubclass(their.__class__, BitString):
3283             return False
3284         return (
3285             self._value == their._value and
3286             self.tag == their.tag and
3287             self._expl == their._expl
3288         )
3289
3290     @property
3291     def named(self):
3292         """Named representation (if exists) of the bits
3293
3294         :returns: [str(name), ...]
3295         """
3296         return [name for name, bit in iteritems(self.specs) if self[bit]]
3297
3298     def __call__(
3299             self,
3300             value=None,
3301             impl=None,
3302             expl=None,
3303             default=None,
3304             optional=None,
3305     ):
3306         return self.__class__(
3307             value=value,
3308             impl=self.tag if impl is None else impl,
3309             expl=self._expl if expl is None else expl,
3310             default=self.default if default is None else default,
3311             optional=self.optional if optional is None else optional,
3312             _specs=self.specs,
3313         )
3314
3315     def __getitem__(self, key):
3316         if key.__class__ == int:
3317             bit_len, octets = self._value
3318             if key >= bit_len:
3319                 return False
3320             return (
3321                 byte2int(memoryview(octets)[key // 8:]) >>
3322                 (7 - (key % 8))
3323             ) & 1 == 1
3324         if isinstance(key, string_types):
3325             value = self.specs.get(key)
3326             if value is None:
3327                 raise ObjUnknown("BitString value: %s" % key)
3328             return self[value]
3329         raise InvalidValueType((int, str))
3330
3331     def _encode(self):
3332         self._assert_ready()
3333         bit_len, octets = self._value
3334         return b"".join((
3335             self.tag,
3336             len_encode(len(octets) + 1),
3337             int2byte((8 - bit_len % 8) % 8),
3338             octets,
3339         ))
3340
3341     def _encode1st(self, state):
3342         self._assert_ready()
3343         _, octets = self._value
3344         l = len(octets) + 1
3345         return len(self.tag) + len_size(l) + l, state
3346
3347     def _encode2nd(self, writer, state_iter):
3348         bit_len, octets = self._value
3349         write_full(writer, b"".join((
3350             self.tag,
3351             len_encode(len(octets) + 1),
3352             int2byte((8 - bit_len % 8) % 8),
3353         )))
3354         write_full(writer, octets)
3355
3356     def _encode_cer(self, writer):
3357         bit_len, octets = self._value
3358         if len(octets) + 1 <= 1000:
3359             write_full(writer, self._encode())
3360             return
3361         write_full(writer, self.tag_constructed)
3362         write_full(writer, LENINDEF)
3363         for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3364             write_full(writer, b"".join((
3365                 BitString.tag_default,
3366                 LEN1K,
3367                 int2byte(0),
3368                 octets[offset:offset + 999],
3369             )))
3370         tail = octets[offset+999:]
3371         if len(tail) > 0:
3372             tail = int2byte((8 - bit_len % 8) % 8) + tail
3373             write_full(writer, b"".join((
3374                 BitString.tag_default,
3375                 len_encode(len(tail)),
3376                 tail,
3377             )))
3378         write_full(writer, EOC)
3379
3380     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3381         try:
3382             t, tlen, lv = tag_strip(tlv)
3383         except DecodeError as err:
3384             raise err.__class__(
3385                 msg=err.msg,
3386                 klass=self.__class__,
3387                 decode_path=decode_path,
3388                 offset=offset,
3389             )
3390         if t == self.tag:
3391             if tag_only:  # pragma: no cover
3392                 yield None
3393                 return
3394             try:
3395                 l, llen, v = len_decode(lv)
3396             except DecodeError as err:
3397                 raise err.__class__(
3398                     msg=err.msg,
3399                     klass=self.__class__,
3400                     decode_path=decode_path,
3401                     offset=offset,
3402                 )
3403             if l > len(v):
3404                 raise NotEnoughData(
3405                     "encoded length is longer than data",
3406                     klass=self.__class__,
3407                     decode_path=decode_path,
3408                     offset=offset,
3409                 )
3410             if l == 0:
3411                 raise NotEnoughData(
3412                     "zero length",
3413                     klass=self.__class__,
3414                     decode_path=decode_path,
3415                     offset=offset,
3416                 )
3417             pad_size = byte2int(v)
3418             if l == 1 and pad_size != 0:
3419                 raise DecodeError(
3420                     "invalid empty value",
3421                     klass=self.__class__,
3422                     decode_path=decode_path,
3423                     offset=offset,
3424                 )
3425             if pad_size > 7:
3426                 raise DecodeError(
3427                     "too big pad",
3428                     klass=self.__class__,
3429                     decode_path=decode_path,
3430                     offset=offset,
3431                 )
3432             if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3433                 raise DecodeError(
3434                     "invalid pad",
3435                     klass=self.__class__,
3436                     decode_path=decode_path,
3437                     offset=offset,
3438                 )
3439             v, tail = v[:l], v[l:]
3440             bit_len = (len(v) - 1) * 8 - pad_size
3441             obj = self.__class__(
3442                 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3443                 impl=self.tag,
3444                 expl=self._expl,
3445                 default=self.default,
3446                 optional=self.optional,
3447                 _specs=self.specs,
3448                 _decoded=(offset, llen, l),
3449             )
3450             if evgen_mode:
3451                 obj._value = (bit_len, None)
3452             yield decode_path, obj, tail
3453             return
3454         if t != self.tag_constructed:
3455             raise TagMismatch(
3456                 klass=self.__class__,
3457                 decode_path=decode_path,
3458                 offset=offset,
3459             )
3460         if not ctx.get("bered", False):
3461             raise DecodeError(
3462                 "unallowed BER constructed encoding",
3463                 klass=self.__class__,
3464                 decode_path=decode_path,
3465                 offset=offset,
3466             )
3467         if tag_only:  # pragma: no cover
3468             yield None
3469             return
3470         lenindef = False
3471         try:
3472             l, llen, v = len_decode(lv)
3473         except LenIndefForm:
3474             llen, l, v = 1, 0, lv[1:]
3475             lenindef = True
3476         except DecodeError as err:
3477             raise err.__class__(
3478                 msg=err.msg,
3479                 klass=self.__class__,
3480                 decode_path=decode_path,
3481                 offset=offset,
3482             )
3483         if l > len(v):
3484             raise NotEnoughData(
3485                 "encoded length is longer than data",
3486                 klass=self.__class__,
3487                 decode_path=decode_path,
3488                 offset=offset,
3489             )
3490         if not lenindef and l == 0:
3491             raise NotEnoughData(
3492                 "zero length",
3493                 klass=self.__class__,
3494                 decode_path=decode_path,
3495                 offset=offset,
3496             )
3497         chunks = []
3498         sub_offset = offset + tlen + llen
3499         vlen = 0
3500         while True:
3501             if lenindef:
3502                 if v[:EOC_LEN].tobytes() == EOC:
3503                     break
3504             else:
3505                 if vlen == l:
3506                     break
3507                 if vlen > l:
3508                     raise DecodeError(
3509                         "chunk out of bounds",
3510                         klass=self.__class__,
3511                         decode_path=decode_path + (str(len(chunks) - 1),),
3512                         offset=chunks[-1].offset,
3513                     )
3514             sub_decode_path = decode_path + (str(len(chunks)),)
3515             try:
3516                 if evgen_mode:
3517                     for _decode_path, chunk, v_tail in BitString().decode_evgen(
3518                             v,
3519                             offset=sub_offset,
3520                             decode_path=sub_decode_path,
3521                             leavemm=True,
3522                             ctx=ctx,
3523                             _ctx_immutable=False,
3524                     ):
3525                         yield _decode_path, chunk, v_tail
3526                 else:
3527                     _, chunk, v_tail = next(BitString().decode_evgen(
3528                         v,
3529                         offset=sub_offset,
3530                         decode_path=sub_decode_path,
3531                         leavemm=True,
3532                         ctx=ctx,
3533                         _ctx_immutable=False,
3534                         _evgen_mode=False,
3535                     ))
3536             except TagMismatch:
3537                 raise DecodeError(
3538                     "expected BitString encoded chunk",
3539                     klass=self.__class__,
3540                     decode_path=sub_decode_path,
3541                     offset=sub_offset,
3542                 )
3543             chunks.append(chunk)
3544             sub_offset += chunk.tlvlen
3545             vlen += chunk.tlvlen
3546             v = v_tail
3547         if len(chunks) == 0:
3548             raise DecodeError(
3549                 "no chunks",
3550                 klass=self.__class__,
3551                 decode_path=decode_path,
3552                 offset=offset,
3553             )
3554         values = []
3555         bit_len = 0
3556         for chunk_i, chunk in enumerate(chunks[:-1]):
3557             if chunk.bit_len % 8 != 0:
3558                 raise DecodeError(
3559                     "BitString chunk is not multiple of 8 bits",
3560                     klass=self.__class__,
3561                     decode_path=decode_path + (str(chunk_i),),
3562                     offset=chunk.offset,
3563                 )
3564             if not evgen_mode:
3565                 values.append(bytes(chunk))
3566             bit_len += chunk.bit_len
3567         chunk_last = chunks[-1]
3568         if not evgen_mode:
3569             values.append(bytes(chunk_last))
3570         bit_len += chunk_last.bit_len
3571         obj = self.__class__(
3572             value=None if evgen_mode else (bit_len, b"".join(values)),
3573             impl=self.tag,
3574             expl=self._expl,
3575             default=self.default,
3576             optional=self.optional,
3577             _specs=self.specs,
3578             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3579         )
3580         if evgen_mode:
3581             obj._value = (bit_len, None)
3582         obj.lenindef = lenindef
3583         obj.ber_encoded = True
3584         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3585
3586     def __repr__(self):
3587         return pp_console_row(next(self.pps()))
3588
3589     def pps(self, decode_path=()):
3590         value = None
3591         blob = None
3592         if self.ready:
3593             bit_len, blob = self._value
3594             value = "%d bits" % bit_len
3595             if len(self.specs) > 0 and blob is not None:
3596                 blob = tuple(self.named)
3597         yield _pp(
3598             obj=self,
3599             asn1_type_name=self.asn1_type_name,
3600             obj_name=self.__class__.__name__,
3601             decode_path=decode_path,
3602             value=value,
3603             blob=blob,
3604             optional=self.optional,
3605             default=self == self.default,
3606             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3607             expl=None if self._expl is None else tag_decode(self._expl),
3608             offset=self.offset,
3609             tlen=self.tlen,
3610             llen=self.llen,
3611             vlen=self.vlen,
3612             expl_offset=self.expl_offset if self.expled else None,
3613             expl_tlen=self.expl_tlen if self.expled else None,
3614             expl_llen=self.expl_llen if self.expled else None,
3615             expl_vlen=self.expl_vlen if self.expled else None,
3616             expl_lenindef=self.expl_lenindef,
3617             lenindef=self.lenindef,
3618             ber_encoded=self.ber_encoded,
3619             bered=self.bered,
3620         )
3621         defined_by, defined = self.defined or (None, None)
3622         if defined_by is not None:
3623             yield defined.pps(
3624                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3625             )
3626         for pp in self.pps_lenindef(decode_path):
3627             yield pp
3628
3629
3630 OctetStringState = namedtuple(
3631     "OctetStringState",
3632     BasicState._fields + (
3633         "value",
3634         "bound_min",
3635         "bound_max",
3636         "tag_constructed",
3637         "defined",
3638     ),
3639     **NAMEDTUPLE_KWARGS
3640 )
3641
3642
3643 class OctetString(Obj):
3644     """``OCTET STRING`` binary string type
3645
3646     >>> s = OctetString(b"hello world")
3647     OCTET STRING 11 bytes 68656c6c6f20776f726c64
3648     >>> s == OctetString(b"hello world")
3649     True
3650     >>> bytes(s)
3651     b'hello world'
3652
3653     >>> OctetString(b"hello", bounds=(4, 4))
3654     Traceback (most recent call last):
3655     pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3656     >>> OctetString(b"hell", bounds=(4, 4))
3657     OCTET STRING 4 bytes 68656c6c
3658
3659     Memoryviews can be used as a values. If memoryview is made on
3660     mmap-ed file, then it does not take storage inside OctetString
3661     itself. In CER encoding mode it will be streamed to the specified
3662     writer, copying 1 KB chunks.
3663     """
3664     __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3665     tag_default = tag_encode(4)
3666     asn1_type_name = "OCTET STRING"
3667     evgen_mode_skip_value = True
3668
3669     def __init__(
3670             self,
3671             value=None,
3672             bounds=None,
3673             impl=None,
3674             expl=None,
3675             default=None,
3676             optional=False,
3677             _decoded=(0, 0, 0),
3678             ctx=None,
3679     ):
3680         """
3681         :param value: set the value. Either binary type, or
3682                       :py:class:`pyderasn.OctetString` object
3683         :param bounds: set ``(MIN, MAX)`` value size constraint.
3684                        (-inf, +inf) by default
3685         :param bytes impl: override default tag with ``IMPLICIT`` one
3686         :param bytes expl: override default tag with ``EXPLICIT`` one
3687         :param default: set default value. Type same as in ``value``
3688         :param bool optional: is object ``OPTIONAL`` in sequence
3689         """
3690         super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3691         self._value = value
3692         self._bound_min, self._bound_max = getattr(
3693             self,
3694             "bounds",
3695             (0, float("+inf")),
3696         ) if bounds is None else bounds
3697         if value is not None:
3698             self._value = self._value_sanitize(value)
3699         if default is not None:
3700             default = self._value_sanitize(default)
3701             self.default = self.__class__(
3702                 value=default,
3703                 impl=self.tag,
3704                 expl=self._expl,
3705             )
3706             if self._value is None:
3707                 self._value = default
3708         self.defined = None
3709         tag_klass, _, tag_num = tag_decode(self.tag)
3710         self.tag_constructed = tag_encode(
3711             klass=tag_klass,
3712             form=TagFormConstructed,
3713             num=tag_num,
3714         )
3715
3716     def _value_sanitize(self, value):
3717         if value.__class__ == binary_type or value.__class__ == memoryview:
3718             pass
3719         elif issubclass(value.__class__, OctetString):
3720             value = value._value
3721         else:
3722             raise InvalidValueType((self.__class__, bytes, memoryview))
3723         if not self._bound_min <= len(value) <= self._bound_max:
3724             raise BoundsError(self._bound_min, len(value), self._bound_max)
3725         return value
3726
3727     @property
3728     def ready(self):
3729         return self._value is not None
3730
3731     def __getstate__(self):
3732         return OctetStringState(
3733             __version__,
3734             self.tag,
3735             self._tag_order,
3736             self._expl,
3737             self.default,
3738             self.optional,
3739             self.offset,
3740             self.llen,
3741             self.vlen,
3742             self.expl_lenindef,
3743             self.lenindef,
3744             self.ber_encoded,
3745             self._value,
3746             self._bound_min,
3747             self._bound_max,
3748             self.tag_constructed,
3749             self.defined,
3750         )
3751
3752     def __setstate__(self, state):
3753         super(OctetString, self).__setstate__(state)
3754         self._value = state.value
3755         self._bound_min = state.bound_min
3756         self._bound_max = state.bound_max
3757         self.tag_constructed = state.tag_constructed
3758         self.defined = state.defined
3759
3760     def __bytes__(self):
3761         self._assert_ready()
3762         return bytes(self._value)
3763
3764     def __eq__(self, their):
3765         if their.__class__ == binary_type:
3766             return self._value == their
3767         if not issubclass(their.__class__, OctetString):
3768             return False
3769         return (
3770             self._value == their._value and
3771             self.tag == their.tag and
3772             self._expl == their._expl
3773         )
3774
3775     def __lt__(self, their):
3776         return self._value < their._value
3777
3778     def __call__(
3779             self,
3780             value=None,
3781             bounds=None,
3782             impl=None,
3783             expl=None,
3784             default=None,
3785             optional=None,
3786     ):
3787         return self.__class__(
3788             value=value,
3789             bounds=(
3790                 (self._bound_min, self._bound_max)
3791                 if bounds is None else bounds
3792             ),
3793             impl=self.tag if impl is None else impl,
3794             expl=self._expl if expl is None else expl,
3795             default=self.default if default is None else default,
3796             optional=self.optional if optional is None else optional,
3797         )
3798
3799     def _encode(self):
3800         self._assert_ready()
3801         return b"".join((
3802             self.tag,
3803             len_encode(len(self._value)),
3804             self._value,
3805         ))
3806
3807     def _encode1st(self, state):
3808         self._assert_ready()
3809         l = len(self._value)
3810         return len(self.tag) + len_size(l) + l, state
3811
3812     def _encode2nd(self, writer, state_iter):
3813         value = self._value
3814         write_full(writer, self.tag + len_encode(len(value)))
3815         write_full(writer, value)
3816
3817     def _encode_cer(self, writer):
3818         octets = self._value
3819         if len(octets) <= 1000:
3820             write_full(writer, self._encode())
3821             return
3822         write_full(writer, self.tag_constructed)
3823         write_full(writer, LENINDEF)
3824         for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3825             write_full(writer, b"".join((
3826                 OctetString.tag_default,
3827                 LEN1K,
3828                 octets[offset:offset + 1000],
3829             )))
3830         tail = octets[offset+1000:]
3831         if len(tail) > 0:
3832             write_full(writer, b"".join((
3833                 OctetString.tag_default,
3834                 len_encode(len(tail)),
3835                 tail,
3836             )))
3837         write_full(writer, EOC)
3838
3839     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3840         try:
3841             t, tlen, lv = tag_strip(tlv)
3842         except DecodeError as err:
3843             raise err.__class__(
3844                 msg=err.msg,
3845                 klass=self.__class__,
3846                 decode_path=decode_path,
3847                 offset=offset,
3848             )
3849         if t == self.tag:
3850             if tag_only:
3851                 yield None
3852                 return
3853             try:
3854                 l, llen, v = len_decode(lv)
3855             except DecodeError as err:
3856                 raise err.__class__(
3857                     msg=err.msg,
3858                     klass=self.__class__,
3859                     decode_path=decode_path,
3860                     offset=offset,
3861                 )
3862             if l > len(v):
3863                 raise NotEnoughData(
3864                     "encoded length is longer than data",
3865                     klass=self.__class__,
3866                     decode_path=decode_path,
3867                     offset=offset,
3868                 )
3869             v, tail = v[:l], v[l:]
3870             if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3871                 raise DecodeError(
3872                     msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3873                     klass=self.__class__,
3874                     decode_path=decode_path,
3875                     offset=offset,
3876                 )
3877             try:
3878                 obj = self.__class__(
3879                     value=(
3880                         None if (evgen_mode and self.evgen_mode_skip_value)
3881                         else v.tobytes()
3882                     ),
3883                     bounds=(self._bound_min, self._bound_max),
3884                     impl=self.tag,
3885                     expl=self._expl,
3886                     default=self.default,
3887                     optional=self.optional,
3888                     _decoded=(offset, llen, l),
3889                     ctx=ctx,
3890                 )
3891             except DecodeError as err:
3892                 raise DecodeError(
3893                     msg=err.msg,
3894                     klass=self.__class__,
3895                     decode_path=decode_path,
3896                     offset=offset,
3897                 )
3898             except BoundsError as err:
3899                 raise DecodeError(
3900                     msg=str(err),
3901                     klass=self.__class__,
3902                     decode_path=decode_path,
3903                     offset=offset,
3904                 )
3905             yield decode_path, obj, tail
3906             return
3907         if t != self.tag_constructed:
3908             raise TagMismatch(
3909                 klass=self.__class__,
3910                 decode_path=decode_path,
3911                 offset=offset,
3912             )
3913         if not ctx.get("bered", False):
3914             raise DecodeError(
3915                 "unallowed BER constructed encoding",
3916                 klass=self.__class__,
3917                 decode_path=decode_path,
3918                 offset=offset,
3919             )
3920         if tag_only:
3921             yield None
3922             return
3923         lenindef = False
3924         try:
3925             l, llen, v = len_decode(lv)
3926         except LenIndefForm:
3927             llen, l, v = 1, 0, lv[1:]
3928             lenindef = True
3929         except DecodeError as err:
3930             raise err.__class__(
3931                 msg=err.msg,
3932                 klass=self.__class__,
3933                 decode_path=decode_path,
3934                 offset=offset,
3935             )
3936         if l > len(v):
3937             raise NotEnoughData(
3938                 "encoded length is longer than data",
3939                 klass=self.__class__,
3940                 decode_path=decode_path,
3941                 offset=offset,
3942             )
3943         chunks = []
3944         chunks_count = 0
3945         sub_offset = offset + tlen + llen
3946         vlen = 0
3947         payload_len = 0
3948         while True:
3949             if lenindef:
3950                 if v[:EOC_LEN].tobytes() == EOC:
3951                     break
3952             else:
3953                 if vlen == l:
3954                     break
3955                 if vlen > l:
3956                     raise DecodeError(
3957                         "chunk out of bounds",
3958                         klass=self.__class__,
3959                         decode_path=decode_path + (str(len(chunks) - 1),),
3960                         offset=chunks[-1].offset,
3961                     )
3962             try:
3963                 if evgen_mode:
3964                     sub_decode_path = decode_path + (str(chunks_count),)
3965                     for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3966                             v,
3967                             offset=sub_offset,
3968                             decode_path=sub_decode_path,
3969                             leavemm=True,
3970                             ctx=ctx,
3971                             _ctx_immutable=False,
3972                     ):
3973                         yield _decode_path, chunk, v_tail
3974                         if not chunk.ber_encoded:
3975                             payload_len += chunk.vlen
3976                     chunks_count += 1
3977                 else:
3978                     sub_decode_path = decode_path + (str(len(chunks)),)
3979                     _, chunk, v_tail = next(OctetString().decode_evgen(
3980                         v,
3981                         offset=sub_offset,
3982                         decode_path=sub_decode_path,
3983                         leavemm=True,
3984                         ctx=ctx,
3985                         _ctx_immutable=False,
3986                         _evgen_mode=False,
3987                     ))
3988                     chunks.append(chunk)
3989             except TagMismatch:
3990                 raise DecodeError(
3991                     "expected OctetString encoded chunk",
3992                     klass=self.__class__,
3993                     decode_path=sub_decode_path,
3994                     offset=sub_offset,
3995                 )
3996             sub_offset += chunk.tlvlen
3997             vlen += chunk.tlvlen
3998             v = v_tail
3999         if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
4000             raise DecodeError(
4001                 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
4002                 klass=self.__class__,
4003                 decode_path=decode_path,
4004                 offset=offset,
4005             )
4006         try:
4007             obj = self.__class__(
4008                 value=(
4009                     None if evgen_mode else
4010                     b"".join(bytes(chunk) for chunk in chunks)
4011                 ),
4012                 bounds=(self._bound_min, self._bound_max),
4013                 impl=self.tag,
4014                 expl=self._expl,
4015                 default=self.default,
4016                 optional=self.optional,
4017                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4018                 ctx=ctx,
4019             )
4020         except DecodeError as err:
4021             raise DecodeError(
4022                 msg=err.msg,
4023                 klass=self.__class__,
4024                 decode_path=decode_path,
4025                 offset=offset,
4026             )
4027         except BoundsError as err:
4028             raise DecodeError(
4029                 msg=str(err),
4030                 klass=self.__class__,
4031                 decode_path=decode_path,
4032                 offset=offset,
4033             )
4034         obj.lenindef = lenindef
4035         obj.ber_encoded = True
4036         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4037
4038     def __repr__(self):
4039         return pp_console_row(next(self.pps()))
4040
4041     def pps(self, decode_path=()):
4042         yield _pp(
4043             obj=self,
4044             asn1_type_name=self.asn1_type_name,
4045             obj_name=self.__class__.__name__,
4046             decode_path=decode_path,
4047             value=("%d bytes" % len(self._value)) if self.ready else None,
4048             blob=self._value if self.ready else None,
4049             optional=self.optional,
4050             default=self == self.default,
4051             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4052             expl=None if self._expl is None else tag_decode(self._expl),
4053             offset=self.offset,
4054             tlen=self.tlen,
4055             llen=self.llen,
4056             vlen=self.vlen,
4057             expl_offset=self.expl_offset if self.expled else None,
4058             expl_tlen=self.expl_tlen if self.expled else None,
4059             expl_llen=self.expl_llen if self.expled else None,
4060             expl_vlen=self.expl_vlen if self.expled else None,
4061             expl_lenindef=self.expl_lenindef,
4062             lenindef=self.lenindef,
4063             ber_encoded=self.ber_encoded,
4064             bered=self.bered,
4065         )
4066         defined_by, defined = self.defined or (None, None)
4067         if defined_by is not None:
4068             yield defined.pps(
4069                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4070             )
4071         for pp in self.pps_lenindef(decode_path):
4072             yield pp
4073
4074
4075 def agg_octet_string(evgens, decode_path, raw, writer):
4076     """Aggregate constructed string (OctetString and its derivatives)
4077
4078     :param evgens: iterator of generated events
4079     :param decode_path: points to the string we want to decode
4080     :param raw: slicebable (memoryview, bytearray, etc) with
4081                 the data evgens are generated on
4082     :param writer: buffer.write where string is going to be saved
4083     :param writer: where string is going to be saved. Must comply
4084                    with ``io.RawIOBase.write`` behaviour
4085
4086     .. seealso:: :ref:`agg_octet_string`
4087     """
4088     decode_path_len = len(decode_path)
4089     for dp, obj, _ in evgens:
4090         if dp[:decode_path_len] != decode_path:
4091             continue
4092         if not obj.ber_encoded:
4093             write_full(writer, raw[
4094                 obj.offset + obj.tlen + obj.llen:
4095                 obj.offset + obj.tlen + obj.llen + obj.vlen -
4096                 (EOC_LEN if obj.expl_lenindef else 0)
4097             ])
4098         if len(dp) == decode_path_len:
4099             break
4100
4101
4102 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4103
4104
4105 class Null(Obj):
4106     """``NULL`` null object
4107
4108     >>> n = Null()
4109     NULL
4110     >>> n.ready
4111     True
4112     """
4113     __slots__ = ()
4114     tag_default = tag_encode(5)
4115     asn1_type_name = "NULL"
4116
4117     def __init__(
4118             self,
4119             value=None,  # unused, but Sequence passes it
4120             impl=None,
4121             expl=None,
4122             optional=False,
4123             _decoded=(0, 0, 0),
4124     ):
4125         """
4126         :param bytes impl: override default tag with ``IMPLICIT`` one
4127         :param bytes expl: override default tag with ``EXPLICIT`` one
4128         :param bool optional: is object ``OPTIONAL`` in sequence
4129         """
4130         super(Null, self).__init__(impl, expl, None, optional, _decoded)
4131         self.default = None
4132
4133     @property
4134     def ready(self):
4135         return True
4136
4137     def __getstate__(self):
4138         return NullState(
4139             __version__,
4140             self.tag,
4141             self._tag_order,
4142             self._expl,
4143             self.default,
4144             self.optional,
4145             self.offset,
4146             self.llen,
4147             self.vlen,
4148             self.expl_lenindef,
4149             self.lenindef,
4150             self.ber_encoded,
4151         )
4152
4153     def __eq__(self, their):
4154         if not issubclass(their.__class__, Null):
4155             return False
4156         return (
4157             self.tag == their.tag and
4158             self._expl == their._expl
4159         )
4160
4161     def __call__(
4162             self,
4163             value=None,
4164             impl=None,
4165             expl=None,
4166             optional=None,
4167     ):
4168         return self.__class__(
4169             impl=self.tag if impl is None else impl,
4170             expl=self._expl if expl is None else expl,
4171             optional=self.optional if optional is None else optional,
4172         )
4173
4174     def _encode(self):
4175         return self.tag + LEN0
4176
4177     def _encode1st(self, state):
4178         return len(self.tag) + 1, state
4179
4180     def _encode2nd(self, writer, state_iter):
4181         write_full(writer, self.tag + LEN0)
4182
4183     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4184         try:
4185             t, _, lv = tag_strip(tlv)
4186         except DecodeError as err:
4187             raise err.__class__(
4188                 msg=err.msg,
4189                 klass=self.__class__,
4190                 decode_path=decode_path,
4191                 offset=offset,
4192             )
4193         if t != self.tag:
4194             raise TagMismatch(
4195                 klass=self.__class__,
4196                 decode_path=decode_path,
4197                 offset=offset,
4198             )
4199         if tag_only:  # pragma: no cover
4200             yield None
4201             return
4202         try:
4203             l, _, v = len_decode(lv)
4204         except DecodeError as err:
4205             raise err.__class__(
4206                 msg=err.msg,
4207                 klass=self.__class__,
4208                 decode_path=decode_path,
4209                 offset=offset,
4210             )
4211         if l != 0:
4212             raise InvalidLength(
4213                 "Null must have zero length",
4214                 klass=self.__class__,
4215                 decode_path=decode_path,
4216                 offset=offset,
4217             )
4218         obj = self.__class__(
4219             impl=self.tag,
4220             expl=self._expl,
4221             optional=self.optional,
4222             _decoded=(offset, 1, 0),
4223         )
4224         yield decode_path, obj, v
4225
4226     def __repr__(self):
4227         return pp_console_row(next(self.pps()))
4228
4229     def pps(self, decode_path=()):
4230         yield _pp(
4231             obj=self,
4232             asn1_type_name=self.asn1_type_name,
4233             obj_name=self.__class__.__name__,
4234             decode_path=decode_path,
4235             optional=self.optional,
4236             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4237             expl=None if self._expl is None else tag_decode(self._expl),
4238             offset=self.offset,
4239             tlen=self.tlen,
4240             llen=self.llen,
4241             vlen=self.vlen,
4242             expl_offset=self.expl_offset if self.expled else None,
4243             expl_tlen=self.expl_tlen if self.expled else None,
4244             expl_llen=self.expl_llen if self.expled else None,
4245             expl_vlen=self.expl_vlen if self.expled else None,
4246             expl_lenindef=self.expl_lenindef,
4247             bered=self.bered,
4248         )
4249         for pp in self.pps_lenindef(decode_path):
4250             yield pp
4251
4252
4253 ObjectIdentifierState = namedtuple(
4254     "ObjectIdentifierState",
4255     BasicState._fields + ("value", "defines"),
4256     **NAMEDTUPLE_KWARGS
4257 )
4258
4259
4260 class ObjectIdentifier(Obj):
4261     """``OBJECT IDENTIFIER`` OID type
4262
4263     >>> oid = ObjectIdentifier((1, 2, 3))
4264     OBJECT IDENTIFIER 1.2.3
4265     >>> oid == ObjectIdentifier("1.2.3")
4266     True
4267     >>> tuple(oid)
4268     (1, 2, 3)
4269     >>> str(oid)
4270     '1.2.3'
4271     >>> oid + (4, 5) + ObjectIdentifier("1.7")
4272     OBJECT IDENTIFIER 1.2.3.4.5.1.7
4273
4274     >>> str(ObjectIdentifier((3, 1)))
4275     Traceback (most recent call last):
4276     pyderasn.InvalidOID: unacceptable first arc value
4277     """
4278     __slots__ = ("defines",)
4279     tag_default = tag_encode(6)
4280     asn1_type_name = "OBJECT IDENTIFIER"
4281
4282     def __init__(
4283             self,
4284             value=None,
4285             defines=(),
4286             impl=None,
4287             expl=None,
4288             default=None,
4289             optional=False,
4290             _decoded=(0, 0, 0),
4291     ):
4292         """
4293         :param value: set the value. Either tuples of integers,
4294                       string of "."-concatenated integers, or
4295                       :py:class:`pyderasn.ObjectIdentifier` object
4296         :param defines: sequence of tuples. Each tuple has two elements.
4297                         First one is relative to current one decode
4298                         path, aiming to the field defined by that OID.
4299                         Read about relative path in
4300                         :py:func:`pyderasn.abs_decode_path`. Second
4301                         tuple element is ``{OID: pyderasn.Obj()}``
4302                         dictionary, mapping between current OID value
4303                         and structure applied to defined field.
4304
4305                         .. seealso:: :ref:`definedby`
4306
4307         :param bytes impl: override default tag with ``IMPLICIT`` one
4308         :param bytes expl: override default tag with ``EXPLICIT`` one
4309         :param default: set default value. Type same as in ``value``
4310         :param bool optional: is object ``OPTIONAL`` in sequence
4311         """
4312         super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4313         self._value = value
4314         if value is not None:
4315             self._value = self._value_sanitize(value)
4316         if default is not None:
4317             default = self._value_sanitize(default)
4318             self.default = self.__class__(
4319                 value=default,
4320                 impl=self.tag,
4321                 expl=self._expl,
4322             )
4323             if self._value is None:
4324                 self._value = default
4325         self.defines = defines
4326
4327     def __add__(self, their):
4328         if their.__class__ == tuple:
4329             return self.__class__(self._value + array("L", their))
4330         if isinstance(their, self.__class__):
4331             return self.__class__(self._value + their._value)
4332         raise InvalidValueType((self.__class__, tuple))
4333
4334     def _value_sanitize(self, value):
4335         if issubclass(value.__class__, ObjectIdentifier):
4336             return value._value
4337         if isinstance(value, string_types):
4338             try:
4339                 value = array("L", (pureint(arc) for arc in value.split(".")))
4340             except ValueError:
4341                 raise InvalidOID("unacceptable arcs values")
4342         if value.__class__ == tuple:
4343             try:
4344                 value = array("L", value)
4345             except OverflowError as err:
4346                 raise InvalidOID(repr(err))
4347         if value.__class__ is array:
4348             if len(value) < 2:
4349                 raise InvalidOID("less than 2 arcs")
4350             first_arc = value[0]
4351             if first_arc in (0, 1):
4352                 if not (0 <= value[1] <= 39):
4353                     raise InvalidOID("second arc is too wide")
4354             elif first_arc == 2:
4355                 pass
4356             else:
4357                 raise InvalidOID("unacceptable first arc value")
4358             if not all(arc >= 0 for arc in value):
4359                 raise InvalidOID("negative arc value")
4360             return value
4361         raise InvalidValueType((self.__class__, str, tuple))
4362
4363     @property
4364     def ready(self):
4365         return self._value is not None
4366
4367     def __getstate__(self):
4368         return ObjectIdentifierState(
4369             __version__,
4370             self.tag,
4371             self._tag_order,
4372             self._expl,
4373             self.default,
4374             self.optional,
4375             self.offset,
4376             self.llen,
4377             self.vlen,
4378             self.expl_lenindef,
4379             self.lenindef,
4380             self.ber_encoded,
4381             self._value,
4382             self.defines,
4383         )
4384
4385     def __setstate__(self, state):
4386         super(ObjectIdentifier, self).__setstate__(state)
4387         self._value = state.value
4388         self.defines = state.defines
4389
4390     def __iter__(self):
4391         self._assert_ready()
4392         return iter(self._value)
4393
4394     def __str__(self):
4395         return ".".join(str(arc) for arc in self._value or ())
4396
4397     def __hash__(self):
4398         self._assert_ready()
4399         return hash(b"".join((
4400             self.tag,
4401             bytes(self._expl or b""),
4402             str(self._value).encode("ascii"),
4403         )))
4404
4405     def __eq__(self, their):
4406         if their.__class__ == tuple:
4407             return self._value == array("L", their)
4408         if not issubclass(their.__class__, ObjectIdentifier):
4409             return False
4410         return (
4411             self.tag == their.tag and
4412             self._expl == their._expl and
4413             self._value == their._value
4414         )
4415
4416     def __lt__(self, their):
4417         return self._value < their._value
4418
4419     def __call__(
4420             self,
4421             value=None,
4422             defines=None,
4423             impl=None,
4424             expl=None,
4425             default=None,
4426             optional=None,
4427     ):
4428         return self.__class__(
4429             value=value,
4430             defines=self.defines if defines is None else defines,
4431             impl=self.tag if impl is None else impl,
4432             expl=self._expl if expl is None else expl,
4433             default=self.default if default is None else default,
4434             optional=self.optional if optional is None else optional,
4435         )
4436
4437     def _encode_octets(self):
4438         self._assert_ready()
4439         value = self._value
4440         first_value = value[1]
4441         first_arc = value[0]
4442         if first_arc == 0:
4443             pass
4444         elif first_arc == 1:
4445             first_value += 40
4446         elif first_arc == 2:
4447             first_value += 80
4448         else:  # pragma: no cover
4449             raise RuntimeError("invalid arc is stored")
4450         octets = [zero_ended_encode(first_value)]
4451         for arc in value[2:]:
4452             octets.append(zero_ended_encode(arc))
4453         return b"".join(octets)
4454
4455     def _encode(self):
4456         v = self._encode_octets()
4457         return b"".join((self.tag, len_encode(len(v)), v))
4458
4459     def _encode1st(self, state):
4460         l = len(self._encode_octets())
4461         return len(self.tag) + len_size(l) + l, state
4462
4463     def _encode2nd(self, writer, state_iter):
4464         write_full(writer, self._encode())
4465
4466     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4467         try:
4468             t, _, lv = tag_strip(tlv)
4469         except DecodeError as err:
4470             raise err.__class__(
4471                 msg=err.msg,
4472                 klass=self.__class__,
4473                 decode_path=decode_path,
4474                 offset=offset,
4475             )
4476         if t != self.tag:
4477             raise TagMismatch(
4478                 klass=self.__class__,
4479                 decode_path=decode_path,
4480                 offset=offset,
4481             )
4482         if tag_only:  # pragma: no cover
4483             yield None
4484             return
4485         try:
4486             l, llen, v = len_decode(lv)
4487         except DecodeError as err:
4488             raise err.__class__(
4489                 msg=err.msg,
4490                 klass=self.__class__,
4491                 decode_path=decode_path,
4492                 offset=offset,
4493             )
4494         if l > len(v):
4495             raise NotEnoughData(
4496                 "encoded length is longer than data",
4497                 klass=self.__class__,
4498                 decode_path=decode_path,
4499                 offset=offset,
4500             )
4501         if l == 0:
4502             raise NotEnoughData(
4503                 "zero length",
4504                 klass=self.__class__,
4505                 decode_path=decode_path,
4506                 offset=offset,
4507             )
4508         v, tail = v[:l], v[l:]
4509         arcs = array("L")
4510         ber_encoded = False
4511         while len(v) > 0:
4512             i = 0
4513             arc = 0
4514             while True:
4515                 octet = indexbytes(v, i)
4516                 if i == 0 and octet == 0x80:
4517                     if ctx.get("bered", False):
4518                         ber_encoded = True
4519                     else:
4520                         raise DecodeError(
4521                             "non normalized arc encoding",
4522                             klass=self.__class__,
4523                             decode_path=decode_path,
4524                             offset=offset,
4525                         )
4526                 arc = (arc << 7) | (octet & 0x7F)
4527                 if octet & 0x80 == 0:
4528                     try:
4529                         arcs.append(arc)
4530                     except OverflowError:
4531                         raise DecodeError(
4532                             "too huge value for local unsigned long",
4533                             klass=self.__class__,
4534                             decode_path=decode_path,
4535                             offset=offset,
4536                         )
4537                     v = v[i + 1:]
4538                     break
4539                 i += 1
4540                 if i == len(v):
4541                     raise DecodeError(
4542                         "unfinished OID",
4543                         klass=self.__class__,
4544                         decode_path=decode_path,
4545                         offset=offset,
4546                     )
4547         first_arc = 0
4548         second_arc = arcs[0]
4549         if 0 <= second_arc <= 39:
4550             first_arc = 0
4551         elif 40 <= second_arc <= 79:
4552             first_arc = 1
4553             second_arc -= 40
4554         else:
4555             first_arc = 2
4556             second_arc -= 80
4557         obj = self.__class__(
4558             value=array("L", (first_arc, second_arc)) + arcs[1:],
4559             impl=self.tag,
4560             expl=self._expl,
4561             default=self.default,
4562             optional=self.optional,
4563             _decoded=(offset, llen, l),
4564         )
4565         if ber_encoded:
4566             obj.ber_encoded = True
4567         yield decode_path, obj, tail
4568
4569     def __repr__(self):
4570         return pp_console_row(next(self.pps()))
4571
4572     def pps(self, decode_path=()):
4573         yield _pp(
4574             obj=self,
4575             asn1_type_name=self.asn1_type_name,
4576             obj_name=self.__class__.__name__,
4577             decode_path=decode_path,
4578             value=str(self) if self.ready else None,
4579             optional=self.optional,
4580             default=self == self.default,
4581             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4582             expl=None if self._expl is None else tag_decode(self._expl),
4583             offset=self.offset,
4584             tlen=self.tlen,
4585             llen=self.llen,
4586             vlen=self.vlen,
4587             expl_offset=self.expl_offset if self.expled else None,
4588             expl_tlen=self.expl_tlen if self.expled else None,
4589             expl_llen=self.expl_llen if self.expled else None,
4590             expl_vlen=self.expl_vlen if self.expled else None,
4591             expl_lenindef=self.expl_lenindef,
4592             ber_encoded=self.ber_encoded,
4593             bered=self.bered,
4594         )
4595         for pp in self.pps_lenindef(decode_path):
4596             yield pp
4597
4598
4599 class Enumerated(Integer):
4600     """``ENUMERATED`` integer type
4601
4602     This type is identical to :py:class:`pyderasn.Integer`, but requires
4603     schema to be specified and does not accept values missing from it.
4604     """
4605     __slots__ = ()
4606     tag_default = tag_encode(10)
4607     asn1_type_name = "ENUMERATED"
4608
4609     def __init__(
4610             self,
4611             value=None,
4612             impl=None,
4613             expl=None,
4614             default=None,
4615             optional=False,
4616             _specs=None,
4617             _decoded=(0, 0, 0),
4618             bounds=None,  # dummy argument, workability for Integer.decode
4619     ):
4620         super(Enumerated, self).__init__(
4621             value, bounds, impl, expl, default, optional, _specs, _decoded,
4622         )
4623         if len(self.specs) == 0:
4624             raise ValueError("schema must be specified")
4625
4626     def _value_sanitize(self, value):
4627         if isinstance(value, self.__class__):
4628             value = value._value
4629         elif isinstance(value, integer_types):
4630             for _value in itervalues(self.specs):
4631                 if _value == value:
4632                     break
4633             else:
4634                 raise DecodeError(
4635                     "unknown integer value: %s" % value,
4636                     klass=self.__class__,
4637                 )
4638         elif isinstance(value, string_types):
4639             value = self.specs.get(value)
4640             if value is None:
4641                 raise ObjUnknown("integer value: %s" % value)
4642         else:
4643             raise InvalidValueType((self.__class__, int, str))
4644         return value
4645
4646     def __call__(
4647             self,
4648             value=None,
4649             impl=None,
4650             expl=None,
4651             default=None,
4652             optional=None,
4653             _specs=None,
4654     ):
4655         return self.__class__(
4656             value=value,
4657             impl=self.tag if impl is None else impl,
4658             expl=self._expl if expl is None else expl,
4659             default=self.default if default is None else default,
4660             optional=self.optional if optional is None else optional,
4661             _specs=self.specs,
4662         )
4663
4664
4665 def escape_control_unicode(c):
4666     if unicat(c)[0] == "C":
4667         c = repr(c).lstrip("u").strip("'")
4668     return c
4669
4670
4671 class CommonString(OctetString):
4672     """Common class for all strings
4673
4674     Everything resembles :py:class:`pyderasn.OctetString`, except
4675     ability to deal with unicode text strings.
4676
4677     >>> hexenc("привет Ð¼Ð¸Ñ€".encode("utf-8"))
4678     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4679     >>> UTF8String("привет Ð¼Ð¸Ñ€") == UTF8String(hexdec("d0...80"))
4680     True
4681     >>> s = UTF8String("привет Ð¼Ð¸Ñ€")
4682     UTF8String UTF8String Ð¿Ñ€Ð¸Ð²ÐµÑ‚ Ð¼Ð¸Ñ€
4683     >>> str(s)
4684     'привет Ð¼Ð¸Ñ€'
4685     >>> hexenc(bytes(s))
4686     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4687
4688     >>> PrintableString("привет Ð¼Ð¸Ñ€")
4689     Traceback (most recent call last):
4690     pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4691
4692     >>> BMPString("ада", bounds=(2, 2))
4693     Traceback (most recent call last):
4694     pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4695     >>> s = BMPString("ад", bounds=(2, 2))
4696     >>> s.encoding
4697     'utf-16-be'
4698     >>> hexenc(bytes(s))
4699     '04300434'
4700
4701     .. list-table::
4702        :header-rows: 1
4703
4704        * - Class
4705          - Text Encoding
4706        * - :py:class:`pyderasn.UTF8String`
4707          - utf-8
4708        * - :py:class:`pyderasn.NumericString`
4709          - ascii
4710        * - :py:class:`pyderasn.PrintableString`
4711          - ascii
4712        * - :py:class:`pyderasn.TeletexString`
4713          - ascii
4714        * - :py:class:`pyderasn.T61String`
4715          - ascii
4716        * - :py:class:`pyderasn.VideotexString`
4717          - iso-8859-1
4718        * - :py:class:`pyderasn.IA5String`
4719          - ascii
4720        * - :py:class:`pyderasn.GraphicString`
4721          - iso-8859-1
4722        * - :py:class:`pyderasn.VisibleString`
4723          - ascii
4724        * - :py:class:`pyderasn.ISO646String`
4725          - ascii
4726        * - :py:class:`pyderasn.GeneralString`
4727          - iso-8859-1
4728        * - :py:class:`pyderasn.UniversalString`
4729          - utf-32-be
4730        * - :py:class:`pyderasn.BMPString`
4731          - utf-16-be
4732     """
4733     __slots__ = ()
4734
4735     def _value_sanitize(self, value):
4736         value_raw = None
4737         value_decoded = None
4738         if isinstance(value, self.__class__):
4739             value_raw = value._value
4740         elif value.__class__ == text_type:
4741             value_decoded = value
4742         elif value.__class__ == binary_type:
4743             value_raw = value
4744         else:
4745             raise InvalidValueType((self.__class__, text_type, binary_type))
4746         try:
4747             value_raw = (
4748                 value_decoded.encode(self.encoding)
4749                 if value_raw is None else value_raw
4750             )
4751             value_decoded = (
4752                 value_raw.decode(self.encoding)
4753                 if value_decoded is None else value_decoded
4754             )
4755         except (UnicodeEncodeError, UnicodeDecodeError) as err:
4756             raise DecodeError(str(err))
4757         if not self._bound_min <= len(value_decoded) <= self._bound_max:
4758             raise BoundsError(
4759                 self._bound_min,
4760                 len(value_decoded),
4761                 self._bound_max,
4762             )
4763         return value_raw
4764
4765     def __eq__(self, their):
4766         if their.__class__ == binary_type:
4767             return self._value == their
4768         if their.__class__ == text_type:
4769             return self._value == their.encode(self.encoding)
4770         if not isinstance(their, self.__class__):
4771             return False
4772         return (
4773             self._value == their._value and
4774             self.tag == their.tag and
4775             self._expl == their._expl
4776         )
4777
4778     def __unicode__(self):
4779         if self.ready:
4780             return self._value.decode(self.encoding)
4781         return text_type(self._value)
4782
4783     def __repr__(self):
4784         return pp_console_row(next(self.pps(no_unicode=PY2)))
4785
4786     def pps(self, decode_path=(), no_unicode=False):
4787         value = None
4788         if self.ready:
4789             value = (
4790                 hexenc(bytes(self)) if no_unicode else
4791                 "".join(escape_control_unicode(c) for c in self.__unicode__())
4792             )
4793         yield _pp(
4794             obj=self,
4795             asn1_type_name=self.asn1_type_name,
4796             obj_name=self.__class__.__name__,
4797             decode_path=decode_path,
4798             value=value,
4799             optional=self.optional,
4800             default=self == self.default,
4801             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4802             expl=None if self._expl is None else tag_decode(self._expl),
4803             offset=self.offset,
4804             tlen=self.tlen,
4805             llen=self.llen,
4806             vlen=self.vlen,
4807             expl_offset=self.expl_offset if self.expled else None,
4808             expl_tlen=self.expl_tlen if self.expled else None,
4809             expl_llen=self.expl_llen if self.expled else None,
4810             expl_vlen=self.expl_vlen if self.expled else None,
4811             expl_lenindef=self.expl_lenindef,
4812             ber_encoded=self.ber_encoded,
4813             bered=self.bered,
4814         )
4815         for pp in self.pps_lenindef(decode_path):
4816             yield pp
4817
4818
4819 class UTF8String(CommonString):
4820     __slots__ = ()
4821     tag_default = tag_encode(12)
4822     encoding = "utf-8"
4823     asn1_type_name = "UTF8String"
4824
4825
4826 class AllowableCharsMixin(object):
4827     @property
4828     def allowable_chars(self):
4829         if PY2:
4830             return self._allowable_chars
4831         return frozenset(six_unichr(c) for c in self._allowable_chars)
4832
4833
4834 class NumericString(AllowableCharsMixin, CommonString):
4835     """Numeric string
4836
4837     Its value is properly sanitized: only ASCII digits with spaces can
4838     be stored.
4839
4840     >>> NumericString().allowable_chars
4841     frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4842     """
4843     __slots__ = ()
4844     tag_default = tag_encode(18)
4845     encoding = "ascii"
4846     asn1_type_name = "NumericString"
4847     _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4848
4849     def _value_sanitize(self, value):
4850         value = super(NumericString, self)._value_sanitize(value)
4851         if not frozenset(value) <= self._allowable_chars:
4852             raise DecodeError("non-numeric value")
4853         return value
4854
4855
4856 PrintableStringState = namedtuple(
4857     "PrintableStringState",
4858     OctetStringState._fields + ("allowable_chars",),
4859     **NAMEDTUPLE_KWARGS
4860 )
4861
4862
4863 class PrintableString(AllowableCharsMixin, CommonString):
4864     """Printable string
4865
4866     Its value is properly sanitized: see X.680 41.4 table 10.
4867
4868     >>> PrintableString().allowable_chars
4869     frozenset([' ', "'", ..., 'z'])
4870     >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4871     PrintableString PrintableString foo*bar
4872     >>> obj.allow_asterisk, obj.allow_ampersand
4873     (True, False)
4874     """
4875     __slots__ = ()
4876     tag_default = tag_encode(19)
4877     encoding = "ascii"
4878     asn1_type_name = "PrintableString"
4879     _allowable_chars = frozenset(
4880         (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4881     )
4882     _asterisk = frozenset("*".encode("ascii"))
4883     _ampersand = frozenset("&".encode("ascii"))
4884
4885     def __init__(
4886             self,
4887             value=None,
4888             bounds=None,
4889             impl=None,
4890             expl=None,
4891             default=None,
4892             optional=False,
4893             _decoded=(0, 0, 0),
4894             ctx=None,
4895             allow_asterisk=False,
4896             allow_ampersand=False,
4897     ):
4898         """
4899         :param allow_asterisk: allow asterisk character
4900         :param allow_ampersand: allow ampersand character
4901         """
4902         if allow_asterisk:
4903             self._allowable_chars |= self._asterisk
4904         if allow_ampersand:
4905             self._allowable_chars |= self._ampersand
4906         super(PrintableString, self).__init__(
4907             value, bounds, impl, expl, default, optional, _decoded, ctx,
4908         )
4909
4910     @property
4911     def allow_asterisk(self):
4912         """Is asterisk character allowed?
4913         """
4914         return self._asterisk <= self._allowable_chars
4915
4916     @property
4917     def allow_ampersand(self):
4918         """Is ampersand character allowed?
4919         """
4920         return self._ampersand <= self._allowable_chars
4921
4922     def _value_sanitize(self, value):
4923         value = super(PrintableString, self)._value_sanitize(value)
4924         if not frozenset(value) <= self._allowable_chars:
4925             raise DecodeError("non-printable value")
4926         return value
4927
4928     def __getstate__(self):
4929         return PrintableStringState(
4930             *super(PrintableString, self).__getstate__(),
4931             **{"allowable_chars": self._allowable_chars}
4932         )
4933
4934     def __setstate__(self, state):
4935         super(PrintableString, self).__setstate__(state)
4936         self._allowable_chars = state.allowable_chars
4937
4938     def __call__(
4939             self,
4940             value=None,
4941             bounds=None,
4942             impl=None,
4943             expl=None,
4944             default=None,
4945             optional=None,
4946     ):
4947         return self.__class__(
4948             value=value,
4949             bounds=(
4950                 (self._bound_min, self._bound_max)
4951                 if bounds is None else bounds
4952             ),
4953             impl=self.tag if impl is None else impl,
4954             expl=self._expl if expl is None else expl,
4955             default=self.default if default is None else default,
4956             optional=self.optional if optional is None else optional,
4957             allow_asterisk=self.allow_asterisk,
4958             allow_ampersand=self.allow_ampersand,
4959         )
4960
4961
4962 class TeletexString(CommonString):
4963     __slots__ = ()
4964     tag_default = tag_encode(20)
4965     encoding = "ascii"
4966     asn1_type_name = "TeletexString"
4967
4968
4969 class T61String(TeletexString):
4970     __slots__ = ()
4971     asn1_type_name = "T61String"
4972
4973
4974 class VideotexString(CommonString):
4975     __slots__ = ()
4976     tag_default = tag_encode(21)
4977     encoding = "iso-8859-1"
4978     asn1_type_name = "VideotexString"
4979
4980
4981 class IA5String(CommonString):
4982     __slots__ = ()
4983     tag_default = tag_encode(22)
4984     encoding = "ascii"
4985     asn1_type_name = "IA5"
4986
4987
4988 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
4989 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
4990 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
4991 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
4992 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
4993 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
4994
4995
4996 class VisibleString(CommonString):
4997     __slots__ = ()
4998     tag_default = tag_encode(26)
4999     encoding = "ascii"
5000     asn1_type_name = "VisibleString"
5001
5002
5003 UTCTimeState = namedtuple(
5004     "UTCTimeState",
5005     OctetStringState._fields + ("ber_raw",),
5006     **NAMEDTUPLE_KWARGS
5007 )
5008
5009
5010 def str_to_time_fractions(value):
5011     v = pureint(value)
5012     year, v = (v // 10**10), (v % 10**10)
5013     month, v = (v // 10**8), (v % 10**8)
5014     day, v = (v // 10**6), (v % 10**6)
5015     hour, v = (v // 10**4), (v % 10**4)
5016     minute, second = (v // 100), (v % 100)
5017     return year, month, day, hour, minute, second
5018
5019
5020 class UTCTime(VisibleString):
5021     """``UTCTime`` datetime type
5022
5023     >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5024     UTCTime UTCTime 2017-09-30T22:07:50
5025     >>> str(t)
5026     '170930220750Z'
5027     >>> bytes(t)
5028     b'170930220750Z'
5029     >>> t.todatetime()
5030     datetime.datetime(2017, 9, 30, 22, 7, 50)
5031     >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5032     datetime.datetime(1957, 9, 30, 22, 7, 50)
5033
5034     If BER encoded value was met, then ``ber_raw`` attribute will hold
5035     its raw representation.
5036
5037     .. warning::
5038
5039        Pay attention that UTCTime can not hold full year, so all years
5040        having < 50 years are treated as 20xx, 19xx otherwise, according
5041        to X.509 recommendation.
5042
5043     .. warning::
5044
5045        No strict validation of UTC offsets are made, but very crude:
5046
5047        * minutes are not exceeding 60
5048        * offset value is not exceeding 14 hours
5049     """
5050     __slots__ = ("ber_raw",)
5051     tag_default = tag_encode(23)
5052     encoding = "ascii"
5053     asn1_type_name = "UTCTime"
5054     evgen_mode_skip_value = False
5055
5056     def __init__(
5057             self,
5058             value=None,
5059             impl=None,
5060             expl=None,
5061             default=None,
5062             optional=False,
5063             _decoded=(0, 0, 0),
5064             bounds=None,  # dummy argument, workability for OctetString.decode
5065             ctx=None,
5066     ):
5067         """
5068         :param value: set the value. Either datetime type, or
5069                       :py:class:`pyderasn.UTCTime` object
5070         :param bytes impl: override default tag with ``IMPLICIT`` one
5071         :param bytes expl: override default tag with ``EXPLICIT`` one
5072         :param default: set default value. Type same as in ``value``
5073         :param bool optional: is object ``OPTIONAL`` in sequence
5074         """
5075         super(UTCTime, self).__init__(
5076             None, None, impl, expl, None, optional, _decoded, ctx,
5077         )
5078         self._value = value
5079         self.ber_raw = None
5080         if value is not None:
5081             self._value, self.ber_raw = self._value_sanitize(value, ctx)
5082             self.ber_encoded = self.ber_raw is not None
5083         if default is not None:
5084             default, _ = self._value_sanitize(default)
5085             self.default = self.__class__(
5086                 value=default,
5087                 impl=self.tag,
5088                 expl=self._expl,
5089             )
5090             if self._value is None:
5091                 self._value = default
5092             optional = True
5093         self.optional = optional
5094
5095     def _strptime_bered(self, value):
5096         year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5097         value = value[10:]
5098         if len(value) == 0:
5099             raise ValueError("no timezone")
5100         year += 2000 if year < 50 else 1900
5101         decoded = datetime(year, month, day, hour, minute)
5102         offset = 0
5103         if value[-1] == "Z":
5104             value = value[:-1]
5105         else:
5106             if len(value) < 5:
5107                 raise ValueError("invalid UTC offset")
5108             if value[-5] == "-":
5109                 sign = -1
5110             elif value[-5] == "+":
5111                 sign = 1
5112             else:
5113                 raise ValueError("invalid UTC offset")
5114             v = pureint(value[-4:])
5115             offset, v = (60 * (v % 100)), v // 100
5116             if offset >= 3600:
5117                 raise ValueError("invalid UTC offset minutes")
5118             offset += 3600 * v
5119             if offset > 14 * 3600:
5120                 raise ValueError("too big UTC offset")
5121             offset *= sign
5122             value = value[:-5]
5123         if len(value) == 0:
5124             return offset, decoded
5125         if len(value) != 2:
5126             raise ValueError("invalid UTC offset seconds")
5127         seconds = pureint(value)
5128         if seconds >= 60:
5129             raise ValueError("invalid seconds value")
5130         return offset, decoded + timedelta(seconds=seconds)
5131
5132     def _strptime(self, value):
5133         # datetime.strptime's format: %y%m%d%H%M%SZ
5134         if len(value) != LEN_YYMMDDHHMMSSZ:
5135             raise ValueError("invalid UTCTime length")
5136         if value[-1] != "Z":
5137             raise ValueError("non UTC timezone")
5138         year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
5139         year += 2000 if year < 50 else 1900
5140         return datetime(year, month, day, hour, minute, second)
5141
5142     def _dt_sanitize(self, value):
5143         if value.year < 1950 or value.year > 2049:
5144             raise ValueError("UTCTime can hold only 1950-2049 years")
5145         return value.replace(microsecond=0)
5146
5147     def _value_sanitize(self, value, ctx=None):
5148         if value.__class__ == binary_type:
5149             try:
5150                 value_decoded = value.decode("ascii")
5151             except (UnicodeEncodeError, UnicodeDecodeError) as err:
5152                 raise DecodeError("invalid UTCTime encoding: %r" % err)
5153             err = None
5154             try:
5155                 return self._strptime(value_decoded), None
5156             except (TypeError, ValueError) as _err:
5157                 err = _err
5158                 if (ctx is not None) and ctx.get("bered", False):
5159                     try:
5160                         offset, _value = self._strptime_bered(value_decoded)
5161                         _value = _value - timedelta(seconds=offset)
5162                         return self._dt_sanitize(_value), value
5163                     except (TypeError, ValueError, OverflowError) as _err:
5164                         err = _err
5165             raise DecodeError(
5166                 "invalid %s format: %r" % (self.asn1_type_name, err),
5167                 klass=self.__class__,
5168             )
5169         if isinstance(value, self.__class__):
5170             return value._value, None
5171         if value.__class__ == datetime:
5172             return self._dt_sanitize(value), None
5173         raise InvalidValueType((self.__class__, datetime))
5174
5175     def _pp_value(self):
5176         if self.ready:
5177             value = self._value.isoformat()
5178             if self.ber_encoded:
5179                 value += " (%s)" % self.ber_raw
5180             return value
5181         return None
5182
5183     def __unicode__(self):
5184         if self.ready:
5185             value = self._value.isoformat()
5186             if self.ber_encoded:
5187                 value += " (%s)" % self.ber_raw
5188             return value
5189         return text_type(self._pp_value())
5190
5191     def __getstate__(self):
5192         return UTCTimeState(
5193             *super(UTCTime, self).__getstate__(),
5194             **{"ber_raw": self.ber_raw}
5195         )
5196
5197     def __setstate__(self, state):
5198         super(UTCTime, self).__setstate__(state)
5199         self.ber_raw = state.ber_raw
5200
5201     def __bytes__(self):
5202         self._assert_ready()
5203         return self._encode_time()
5204
5205     def __eq__(self, their):
5206         if their.__class__ == binary_type:
5207             return self._encode_time() == their
5208         if their.__class__ == datetime:
5209             return self.todatetime() == their
5210         if not isinstance(their, self.__class__):
5211             return False
5212         return (
5213             self._value == their._value and
5214             self.tag == their.tag and
5215             self._expl == their._expl
5216         )
5217
5218     def _encode_time(self):
5219         return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5220
5221     def _encode(self):
5222         self._assert_ready()
5223         return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5224
5225     def _encode1st(self, state):
5226         return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
5227
5228     def _encode2nd(self, writer, state_iter):
5229         self._assert_ready()
5230         write_full(writer, self._encode())
5231
5232     def _encode_cer(self, writer):
5233         write_full(writer, self._encode())
5234
5235     def todatetime(self):
5236         return self._value
5237
5238     def __repr__(self):
5239         return pp_console_row(next(self.pps()))
5240
5241     def pps(self, decode_path=()):
5242         yield _pp(
5243             obj=self,
5244             asn1_type_name=self.asn1_type_name,
5245             obj_name=self.__class__.__name__,
5246             decode_path=decode_path,
5247             value=self._pp_value(),
5248             optional=self.optional,
5249             default=self == self.default,
5250             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5251             expl=None if self._expl is None else tag_decode(self._expl),
5252             offset=self.offset,
5253             tlen=self.tlen,
5254             llen=self.llen,
5255             vlen=self.vlen,
5256             expl_offset=self.expl_offset if self.expled else None,
5257             expl_tlen=self.expl_tlen if self.expled else None,
5258             expl_llen=self.expl_llen if self.expled else None,
5259             expl_vlen=self.expl_vlen if self.expled else None,
5260             expl_lenindef=self.expl_lenindef,
5261             ber_encoded=self.ber_encoded,
5262             bered=self.bered,
5263         )
5264         for pp in self.pps_lenindef(decode_path):
5265             yield pp
5266
5267
5268 class GeneralizedTime(UTCTime):
5269     """``GeneralizedTime`` datetime type
5270
5271     This type is similar to :py:class:`pyderasn.UTCTime`.
5272
5273     >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5274     GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5275     >>> str(t)
5276     '20170930220750.000123Z'
5277     >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5278     GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5279
5280     .. warning::
5281
5282        Only microsecond fractions are supported in DER encoding.
5283        :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5284        higher precision values.
5285
5286     .. warning::
5287
5288        BER encoded data can loss information (accuracy) during decoding
5289        because of float transformations.
5290
5291     .. warning::
5292
5293        Local times (without explicit timezone specification) are treated
5294        as UTC one, no transformations are made.
5295
5296     .. warning::
5297
5298        Zero year is unsupported.
5299     """
5300     __slots__ = ()
5301     tag_default = tag_encode(24)
5302     asn1_type_name = "GeneralizedTime"
5303
5304     def _dt_sanitize(self, value):
5305         return value
5306
5307     def _strptime_bered(self, value):
5308         if len(value) < 4 + 3 * 2:
5309             raise ValueError("invalid GeneralizedTime")
5310         year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5311         decoded = datetime(year, month, day, hour)
5312         offset, value = 0, value[10:]
5313         if len(value) == 0:
5314             return offset, decoded
5315         if value[-1] == "Z":
5316             value = value[:-1]
5317         else:
5318             for char, sign in (("-", -1), ("+", 1)):
5319                 idx = value.rfind(char)
5320                 if idx == -1:
5321                     continue
5322                 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5323                 v = pureint(offset_raw)
5324                 if len(offset_raw) == 4:
5325                     offset, v = (60 * (v % 100)), v // 100
5326                     if offset >= 3600:
5327                         raise ValueError("invalid UTC offset minutes")
5328                 elif len(offset_raw) == 2:
5329                     pass
5330                 else:
5331                     raise ValueError("invalid UTC offset")
5332                 offset += 3600 * v
5333                 if offset > 14 * 3600:
5334                     raise ValueError("too big UTC offset")
5335                 offset *= sign
5336                 break
5337         if len(value) == 0:
5338             return offset, decoded
5339         if value[0] in DECIMAL_SIGNS:
5340             return offset, (
5341                 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5342             )
5343         if len(value) < 2:
5344             raise ValueError("stripped minutes")
5345         decoded += timedelta(seconds=60 * pureint(value[:2]))
5346         value = value[2:]
5347         if len(value) == 0:
5348             return offset, decoded
5349         if value[0] in DECIMAL_SIGNS:
5350             return offset, (
5351                 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5352             )
5353         if len(value) < 2:
5354             raise ValueError("stripped seconds")
5355         decoded += timedelta(seconds=pureint(value[:2]))
5356         value = value[2:]
5357         if len(value) == 0:
5358             return offset, decoded
5359         if value[0] not in DECIMAL_SIGNS:
5360             raise ValueError("invalid format after seconds")
5361         return offset, (
5362             decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5363         )
5364
5365     def _strptime(self, value):
5366         l = len(value)
5367         if l == LEN_YYYYMMDDHHMMSSZ:
5368             # datetime.strptime's format: %Y%m%d%H%M%SZ
5369             if value[-1] != "Z":
5370                 raise ValueError("non UTC timezone")
5371             return datetime(*str_to_time_fractions(value[:-1]))
5372         if l >= LEN_YYYYMMDDHHMMSSDMZ:
5373             # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5374             if value[-1] != "Z":
5375                 raise ValueError("non UTC timezone")
5376             if value[14] != ".":
5377                 raise ValueError("no fractions separator")
5378             us = value[15:-1]
5379             if us[-1] == "0":
5380                 raise ValueError("trailing zero")
5381             us_len = len(us)
5382             if us_len > 6:
5383                 raise ValueError("only microsecond fractions are supported")
5384             us = pureint(us + ("0" * (6 - us_len)))
5385             year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5386             return datetime(year, month, day, hour, minute, second, us)
5387         raise ValueError("invalid GeneralizedTime length")
5388
5389     def _encode_time(self):
5390         value = self._value
5391         encoded = value.strftime("%Y%m%d%H%M%S")
5392         if value.microsecond > 0:
5393             encoded += (".%06d" % value.microsecond).rstrip("0")
5394         return (encoded + "Z").encode("ascii")
5395
5396     def _encode(self):
5397         self._assert_ready()
5398         value = self._value
5399         if value.microsecond > 0:
5400             encoded = self._encode_time()
5401             return b"".join((self.tag, len_encode(len(encoded)), encoded))
5402         return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5403
5404     def _encode1st(self, state):
5405         self._assert_ready()
5406         vlen = len(self._encode_time())
5407         return len(self.tag) + len_size(vlen) + vlen, state
5408
5409     def _encode2nd(self, writer, state_iter):
5410         write_full(writer, self._encode())
5411
5412
5413 class GraphicString(CommonString):
5414     __slots__ = ()
5415     tag_default = tag_encode(25)
5416     encoding = "iso-8859-1"
5417     asn1_type_name = "GraphicString"
5418
5419
5420 class ISO646String(VisibleString):
5421     __slots__ = ()
5422     asn1_type_name = "ISO646String"
5423
5424
5425 class GeneralString(CommonString):
5426     __slots__ = ()
5427     tag_default = tag_encode(27)
5428     encoding = "iso-8859-1"
5429     asn1_type_name = "GeneralString"
5430
5431
5432 class UniversalString(CommonString):
5433     __slots__ = ()
5434     tag_default = tag_encode(28)
5435     encoding = "utf-32-be"
5436     asn1_type_name = "UniversalString"
5437
5438
5439 class BMPString(CommonString):
5440     __slots__ = ()
5441     tag_default = tag_encode(30)
5442     encoding = "utf-16-be"
5443     asn1_type_name = "BMPString"
5444
5445
5446 ChoiceState = namedtuple(
5447     "ChoiceState",
5448     BasicState._fields + ("specs", "value",),
5449     **NAMEDTUPLE_KWARGS
5450 )
5451
5452
5453 class Choice(Obj):
5454     """``CHOICE`` special type
5455
5456     ::
5457
5458         class GeneralName(Choice):
5459             schema = (
5460                 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5461                 ("dNSName", IA5String(impl=tag_ctxp(2))),
5462             )
5463
5464     >>> gn = GeneralName()
5465     GeneralName CHOICE
5466     >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5467     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5468     >>> gn["dNSName"] = IA5String("bar.baz")
5469     GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5470     >>> gn["rfc822Name"]
5471     None
5472     >>> gn["dNSName"]
5473     [2] IA5String IA5 bar.baz
5474     >>> gn.choice
5475     'dNSName'
5476     >>> gn.value == gn["dNSName"]
5477     True
5478     >>> gn.specs
5479     OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5480
5481     >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5482     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5483     """
5484     __slots__ = ("specs",)
5485     tag_default = None
5486     asn1_type_name = "CHOICE"
5487
5488     def __init__(
5489             self,
5490             value=None,
5491             schema=None,
5492             impl=None,
5493             expl=None,
5494             default=None,
5495             optional=False,
5496             _decoded=(0, 0, 0),
5497     ):
5498         """
5499         :param value: set the value. Either ``(choice, value)`` tuple, or
5500                       :py:class:`pyderasn.Choice` object
5501         :param bytes impl: can not be set, do **not** use it
5502         :param bytes expl: override default tag with ``EXPLICIT`` one
5503         :param default: set default value. Type same as in ``value``
5504         :param bool optional: is object ``OPTIONAL`` in sequence
5505         """
5506         if impl is not None:
5507             raise ValueError("no implicit tag allowed for CHOICE")
5508         super(Choice, self).__init__(None, expl, default, optional, _decoded)
5509         if schema is None:
5510             schema = getattr(self, "schema", ())
5511         if len(schema) == 0:
5512             raise ValueError("schema must be specified")
5513         self.specs = (
5514             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5515         )
5516         self._value = None
5517         if value is not None:
5518             self._value = self._value_sanitize(value)
5519         if default is not None:
5520             default_value = self._value_sanitize(default)
5521             default_obj = self.__class__(impl=self.tag, expl=self._expl)
5522             default_obj.specs = self.specs
5523             default_obj._value = default_value
5524             self.default = default_obj
5525             if value is None:
5526                 self._value = copy(default_obj._value)
5527         if self._expl is not None:
5528             tag_class, _, tag_num = tag_decode(self._expl)
5529             self._tag_order = (tag_class, tag_num)
5530
5531     def _value_sanitize(self, value):
5532         if (value.__class__ == tuple) and len(value) == 2:
5533             choice, obj = value
5534             spec = self.specs.get(choice)
5535             if spec is None:
5536                 raise ObjUnknown(choice)
5537             if not isinstance(obj, spec.__class__):
5538                 raise InvalidValueType((spec,))
5539             return (choice, spec(obj))
5540         if isinstance(value, self.__class__):
5541             return value._value
5542         raise InvalidValueType((self.__class__, tuple))
5543
5544     @property
5545     def ready(self):
5546         return self._value is not None and self._value[1].ready
5547
5548     @property
5549     def bered(self):
5550         return self.expl_lenindef or (
5551             (self._value is not None) and
5552             self._value[1].bered
5553         )
5554
5555     def __getstate__(self):
5556         return ChoiceState(
5557             __version__,
5558             self.tag,
5559             self._tag_order,
5560             self._expl,
5561             self.default,
5562             self.optional,
5563             self.offset,
5564             self.llen,
5565             self.vlen,
5566             self.expl_lenindef,
5567             self.lenindef,
5568             self.ber_encoded,
5569             self.specs,
5570             copy(self._value),
5571         )
5572
5573     def __setstate__(self, state):
5574         super(Choice, self).__setstate__(state)
5575         self.specs = state.specs
5576         self._value = state.value
5577
5578     def __eq__(self, their):
5579         if (their.__class__ == tuple) and len(their) == 2:
5580             return self._value == their
5581         if not isinstance(their, self.__class__):
5582             return False
5583         return (
5584             self.specs == their.specs and
5585             self._value == their._value
5586         )
5587
5588     def __call__(
5589             self,
5590             value=None,
5591             expl=None,
5592             default=None,
5593             optional=None,
5594     ):
5595         return self.__class__(
5596             value=value,
5597             schema=self.specs,
5598             expl=self._expl if expl is None else expl,
5599             default=self.default if default is None else default,
5600             optional=self.optional if optional is None else optional,
5601         )
5602
5603     @property
5604     def choice(self):
5605         """Name of the choice
5606         """
5607         self._assert_ready()
5608         return self._value[0]
5609
5610     @property
5611     def value(self):
5612         """Value of underlying choice
5613         """
5614         self._assert_ready()
5615         return self._value[1]
5616
5617     @property
5618     def tag_order(self):
5619         self._assert_ready()
5620         return self._value[1].tag_order if self._tag_order is None else self._tag_order
5621
5622     @property
5623     def tag_order_cer(self):
5624         return min(v.tag_order_cer for v in itervalues(self.specs))
5625
5626     def __getitem__(self, key):
5627         if key not in self.specs:
5628             raise ObjUnknown(key)
5629         if self._value is None:
5630             return None
5631         choice, value = self._value
5632         if choice != key:
5633             return None
5634         return value
5635
5636     def __setitem__(self, key, value):
5637         spec = self.specs.get(key)
5638         if spec is None:
5639             raise ObjUnknown(key)
5640         if not isinstance(value, spec.__class__):
5641             raise InvalidValueType((spec.__class__,))
5642         self._value = (key, spec(value))
5643
5644     @property
5645     def tlen(self):
5646         return 0
5647
5648     @property
5649     def decoded(self):
5650         return self._value[1].decoded if self.ready else False
5651
5652     def _encode(self):
5653         self._assert_ready()
5654         return self._value[1].encode()
5655
5656     def _encode1st(self, state):
5657         self._assert_ready()
5658         return self._value[1].encode1st(state)
5659
5660     def _encode2nd(self, writer, state_iter):
5661         self._value[1].encode2nd(writer, state_iter)
5662
5663     def _encode_cer(self, writer):
5664         self._assert_ready()
5665         self._value[1].encode_cer(writer)
5666
5667     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5668         for choice, spec in iteritems(self.specs):
5669             sub_decode_path = decode_path + (choice,)
5670             try:
5671                 spec.decode(
5672                     tlv,
5673                     offset=offset,
5674                     leavemm=True,
5675                     decode_path=sub_decode_path,
5676                     ctx=ctx,
5677                     tag_only=True,
5678                     _ctx_immutable=False,
5679                 )
5680             except TagMismatch:
5681                 continue
5682             break
5683         else:
5684             raise TagMismatch(
5685                 klass=self.__class__,
5686                 decode_path=decode_path,
5687                 offset=offset,
5688             )
5689         if tag_only:  # pragma: no cover
5690             yield None
5691             return
5692         if evgen_mode:
5693             for _decode_path, value, tail in spec.decode_evgen(
5694                     tlv,
5695                     offset=offset,
5696                     leavemm=True,
5697                     decode_path=sub_decode_path,
5698                     ctx=ctx,
5699                     _ctx_immutable=False,
5700             ):
5701                 yield _decode_path, value, tail
5702         else:
5703             _, value, tail = next(spec.decode_evgen(
5704                 tlv,
5705                 offset=offset,
5706                 leavemm=True,
5707                 decode_path=sub_decode_path,
5708                 ctx=ctx,
5709                 _ctx_immutable=False,
5710                 _evgen_mode=False,
5711             ))
5712         obj = self.__class__(
5713             schema=self.specs,
5714             expl=self._expl,
5715             default=self.default,
5716             optional=self.optional,
5717             _decoded=(offset, 0, value.fulllen),
5718         )
5719         obj._value = (choice, value)
5720         yield decode_path, obj, tail
5721
5722     def __repr__(self):
5723         value = pp_console_row(next(self.pps()))
5724         if self.ready:
5725             value = "%s[%r]" % (value, self.value)
5726         return value
5727
5728     def pps(self, decode_path=()):
5729         yield _pp(
5730             obj=self,
5731             asn1_type_name=self.asn1_type_name,
5732             obj_name=self.__class__.__name__,
5733             decode_path=decode_path,
5734             value=self.choice if self.ready else None,
5735             optional=self.optional,
5736             default=self == self.default,
5737             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5738             expl=None if self._expl is None else tag_decode(self._expl),
5739             offset=self.offset,
5740             tlen=self.tlen,
5741             llen=self.llen,
5742             vlen=self.vlen,
5743             expl_lenindef=self.expl_lenindef,
5744             bered=self.bered,
5745         )
5746         if self.ready:
5747             yield self.value.pps(decode_path=decode_path + (self.choice,))
5748         for pp in self.pps_lenindef(decode_path):
5749             yield pp
5750
5751
5752 class PrimitiveTypes(Choice):
5753     """Predefined ``CHOICE`` for all generic primitive types
5754
5755     It could be useful for general decoding of some unspecified values:
5756
5757     >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5758     OCTET STRING 3 bytes 666f6f
5759     >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5760     INTEGER 1193046
5761     """
5762     __slots__ = ()
5763     schema = tuple((klass.__name__, klass()) for klass in (
5764         Boolean,
5765         Integer,
5766         BitString,
5767         OctetString,
5768         Null,
5769         ObjectIdentifier,
5770         UTF8String,
5771         NumericString,
5772         PrintableString,
5773         TeletexString,
5774         VideotexString,
5775         IA5String,
5776         UTCTime,
5777         GeneralizedTime,
5778         GraphicString,
5779         VisibleString,
5780         ISO646String,
5781         GeneralString,
5782         UniversalString,
5783         BMPString,
5784     ))
5785
5786
5787 AnyState = namedtuple(
5788     "AnyState",
5789     BasicState._fields + ("value", "defined"),
5790     **NAMEDTUPLE_KWARGS
5791 )
5792
5793
5794 class Any(Obj):
5795     """``ANY`` special type
5796
5797     >>> Any(Integer(-123))
5798     ANY INTEGER -123 (0X:7B)
5799     >>> a = Any(OctetString(b"hello world").encode())
5800     ANY 040b68656c6c6f20776f726c64
5801     >>> hexenc(bytes(a))
5802     b'0x040x0bhello world'
5803     """
5804     __slots__ = ("defined",)
5805     tag_default = tag_encode(0)
5806     asn1_type_name = "ANY"
5807
5808     def __init__(
5809             self,
5810             value=None,
5811             expl=None,
5812             optional=False,
5813             _decoded=(0, 0, 0),
5814     ):
5815         """
5816         :param value: set the value. Either any kind of pyderasn's
5817                       **ready** object, or bytes. Pay attention that
5818                       **no** validation is performed if raw binary value
5819                       is valid TLV, except just tag decoding
5820         :param bytes expl: override default tag with ``EXPLICIT`` one
5821         :param bool optional: is object ``OPTIONAL`` in sequence
5822         """
5823         super(Any, self).__init__(None, expl, None, optional, _decoded)
5824         if value is None:
5825             self._value = None
5826         else:
5827             value = self._value_sanitize(value)
5828             self._value = value
5829             if self._expl is None:
5830                 if value.__class__ == binary_type:
5831                     tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5832                 else:
5833                     tag_class, tag_num = value.tag_order
5834             else:
5835                 tag_class, _, tag_num = tag_decode(self._expl)
5836             self._tag_order = (tag_class, tag_num)
5837         self.defined = None
5838
5839     def _value_sanitize(self, value):
5840         if value.__class__ == binary_type:
5841             if len(value) == 0:
5842                 raise ValueError("Any value can not be empty")
5843             return value
5844         if isinstance(value, self.__class__):
5845             return value._value
5846         if not isinstance(value, Obj):
5847             raise InvalidValueType((self.__class__, Obj, binary_type))
5848         return value
5849
5850     @property
5851     def ready(self):
5852         return self._value is not None
5853
5854     @property
5855     def tag_order(self):
5856         self._assert_ready()
5857         return self._tag_order
5858
5859     @property
5860     def bered(self):
5861         if self.expl_lenindef or self.lenindef:
5862             return True
5863         if self.defined is None:
5864             return False
5865         return self.defined[1].bered
5866
5867     def __getstate__(self):
5868         return AnyState(
5869             __version__,
5870             self.tag,
5871             self._tag_order,
5872             self._expl,
5873             None,
5874             self.optional,
5875             self.offset,
5876             self.llen,
5877             self.vlen,
5878             self.expl_lenindef,
5879             self.lenindef,
5880             self.ber_encoded,
5881             self._value,
5882             self.defined,
5883         )
5884
5885     def __setstate__(self, state):
5886         super(Any, self).__setstate__(state)
5887         self._value = state.value
5888         self.defined = state.defined
5889
5890     def __eq__(self, their):
5891         if their.__class__ == binary_type:
5892             if self._value.__class__ == binary_type:
5893                 return self._value == their
5894             return self._value.encode() == their
5895         if issubclass(their.__class__, Any):
5896             if self.ready and their.ready:
5897                 return bytes(self) == bytes(their)
5898             return self.ready == their.ready
5899         return False
5900
5901     def __call__(
5902             self,
5903             value=None,
5904             expl=None,
5905             optional=None,
5906     ):
5907         return self.__class__(
5908             value=value,
5909             expl=self._expl if expl is None else expl,
5910             optional=self.optional if optional is None else optional,
5911         )
5912
5913     def __bytes__(self):
5914         self._assert_ready()
5915         value = self._value
5916         if value.__class__ == binary_type:
5917             return value
5918         return self._value.encode()
5919
5920     @property
5921     def tlen(self):
5922         return 0
5923
5924     def _encode(self):
5925         self._assert_ready()
5926         value = self._value
5927         if value.__class__ == binary_type:
5928             return value
5929         return value.encode()
5930
5931     def _encode1st(self, state):
5932         self._assert_ready()
5933         value = self._value
5934         if value.__class__ == binary_type:
5935             return len(value), state
5936         return value.encode1st(state)
5937
5938     def _encode2nd(self, writer, state_iter):
5939         value = self._value
5940         if value.__class__ == binary_type:
5941             write_full(writer, value)
5942         else:
5943             value.encode2nd(writer, state_iter)
5944
5945     def _encode_cer(self, writer):
5946         self._assert_ready()
5947         value = self._value
5948         if value.__class__ == binary_type:
5949             write_full(writer, value)
5950         else:
5951             value.encode_cer(writer)
5952
5953     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5954         try:
5955             t, tlen, lv = tag_strip(tlv)
5956         except DecodeError as err:
5957             raise err.__class__(
5958                 msg=err.msg,
5959                 klass=self.__class__,
5960                 decode_path=decode_path,
5961                 offset=offset,
5962             )
5963         try:
5964             l, llen, v = len_decode(lv)
5965         except LenIndefForm as err:
5966             if not ctx.get("bered", False):
5967                 raise err.__class__(
5968                     msg=err.msg,
5969                     klass=self.__class__,
5970                     decode_path=decode_path,
5971                     offset=offset,
5972                 )
5973             llen, vlen, v = 1, 0, lv[1:]
5974             sub_offset = offset + tlen + llen
5975             chunk_i = 0
5976             while v[:EOC_LEN].tobytes() != EOC:
5977                 chunk, v = Any().decode(
5978                     v,
5979                     offset=sub_offset,
5980                     decode_path=decode_path + (str(chunk_i),),
5981                     leavemm=True,
5982                     ctx=ctx,
5983                     _ctx_immutable=False,
5984                 )
5985                 vlen += chunk.tlvlen
5986                 sub_offset += chunk.tlvlen
5987                 chunk_i += 1
5988             tlvlen = tlen + llen + vlen + EOC_LEN
5989             obj = self.__class__(
5990                 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
5991                 expl=self._expl,
5992                 optional=self.optional,
5993                 _decoded=(offset, 0, tlvlen),
5994             )
5995             obj.lenindef = True
5996             obj.tag = t.tobytes()
5997             yield decode_path, obj, v[EOC_LEN:]
5998             return
5999         except DecodeError as err:
6000             raise err.__class__(
6001                 msg=err.msg,
6002                 klass=self.__class__,
6003                 decode_path=decode_path,
6004                 offset=offset,
6005             )
6006         if l > len(v):
6007             raise NotEnoughData(
6008                 "encoded length is longer than data",
6009                 klass=self.__class__,
6010                 decode_path=decode_path,
6011                 offset=offset,
6012             )
6013         tlvlen = tlen + llen + l
6014         v, tail = tlv[:tlvlen], v[l:]
6015         obj = self.__class__(
6016             value=None if evgen_mode else v.tobytes(),
6017             expl=self._expl,
6018             optional=self.optional,
6019             _decoded=(offset, 0, tlvlen),
6020         )
6021         obj.tag = t.tobytes()
6022         yield decode_path, obj, tail
6023
6024     def __repr__(self):
6025         return pp_console_row(next(self.pps()))
6026
6027     def pps(self, decode_path=()):
6028         value = self._value
6029         if value is None:
6030             pass
6031         elif value.__class__ == binary_type:
6032             value = None
6033         else:
6034             value = repr(value)
6035         yield _pp(
6036             obj=self,
6037             asn1_type_name=self.asn1_type_name,
6038             obj_name=self.__class__.__name__,
6039             decode_path=decode_path,
6040             value=value,
6041             blob=self._value if self._value.__class__ == binary_type else None,
6042             optional=self.optional,
6043             default=self == self.default,
6044             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6045             expl=None if self._expl is None else tag_decode(self._expl),
6046             offset=self.offset,
6047             tlen=self.tlen,
6048             llen=self.llen,
6049             vlen=self.vlen,
6050             expl_offset=self.expl_offset if self.expled else None,
6051             expl_tlen=self.expl_tlen if self.expled else None,
6052             expl_llen=self.expl_llen if self.expled else None,
6053             expl_vlen=self.expl_vlen if self.expled else None,
6054             expl_lenindef=self.expl_lenindef,
6055             lenindef=self.lenindef,
6056             bered=self.bered,
6057         )
6058         defined_by, defined = self.defined or (None, None)
6059         if defined_by is not None:
6060             yield defined.pps(
6061                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6062             )
6063         for pp in self.pps_lenindef(decode_path):
6064             yield pp
6065
6066
6067 ########################################################################
6068 # ASN.1 constructed types
6069 ########################################################################
6070
6071 def abs_decode_path(decode_path, rel_path):
6072     """Create an absolute decode path from current and relative ones
6073
6074     :param decode_path: current decode path, starting point. Tuple of strings
6075     :param rel_path: relative path to ``decode_path``. Tuple of strings.
6076                      If first tuple's element is "/", then treat it as
6077                      an absolute path, ignoring ``decode_path`` as
6078                      starting point. Also this tuple can contain ".."
6079                      elements, stripping the leading element from
6080                      ``decode_path``
6081
6082     >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
6083     ("foo", "bar", "baz", "whatever")
6084     >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
6085     ("foo", "whatever")
6086     >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
6087     ("baz", "whatever")
6088     """
6089     if rel_path[0] == "/":
6090         return rel_path[1:]
6091     if rel_path[0] == "..":
6092         return abs_decode_path(decode_path[:-1], rel_path[1:])
6093     return decode_path + rel_path
6094
6095
6096 SequenceState = namedtuple(
6097     "SequenceState",
6098     BasicState._fields + ("specs", "value",),
6099     **NAMEDTUPLE_KWARGS
6100 )
6101
6102
6103 class SequenceEncode1stMixing(object):
6104     def _encode1st(self, state):
6105         state.append(0)
6106         idx = len(state) - 1
6107         vlen = 0
6108         for v in self._values_for_encoding():
6109             l, _ = v.encode1st(state)
6110             vlen += l
6111         state[idx] = vlen
6112         return len(self.tag) + len_size(vlen) + vlen, state
6113
6114
6115 class Sequence(SequenceEncode1stMixing, Obj):
6116     """``SEQUENCE`` structure type
6117
6118     You have to make specification of sequence::
6119
6120         class Extension(Sequence):
6121             schema = (
6122                 ("extnID", ObjectIdentifier()),
6123                 ("critical", Boolean(default=False)),
6124                 ("extnValue", OctetString()),
6125             )
6126
6127     Then, you can work with it as with dictionary.
6128
6129     >>> ext = Extension()
6130     >>> Extension().specs
6131     OrderedDict([
6132         ('extnID', OBJECT IDENTIFIER),
6133         ('critical', BOOLEAN False OPTIONAL DEFAULT),
6134         ('extnValue', OCTET STRING),
6135     ])
6136     >>> ext["extnID"] = "1.2.3"
6137     Traceback (most recent call last):
6138     pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6139     >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6140
6141     You can determine if sequence is ready to be encoded:
6142
6143     >>> ext.ready
6144     False
6145     >>> ext.encode()
6146     Traceback (most recent call last):
6147     pyderasn.ObjNotReady: object is not ready: extnValue
6148     >>> ext["extnValue"] = OctetString(b"foobar")
6149     >>> ext.ready
6150     True
6151
6152     Value you want to assign, must have the same **type** as in
6153     corresponding specification, but it can have different tags,
6154     optional/default attributes -- they will be taken from specification
6155     automatically::
6156
6157         class TBSCertificate(Sequence):
6158             schema = (
6159                 ("version", Version(expl=tag_ctxc(0), default="v1")),
6160             [...]
6161
6162     >>> tbs = TBSCertificate()
6163     >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6164
6165     Assign ``None`` to remove value from sequence.
6166
6167     You can set values in Sequence during its initialization:
6168
6169     >>> AlgorithmIdentifier((
6170         ("algorithm", ObjectIdentifier("1.2.3")),
6171         ("parameters", Any(Null()))
6172     ))
6173     AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6174
6175     You can determine if value exists/set in the sequence and take its value:
6176
6177     >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6178     (True, True, False)
6179     >>> ext["extnID"]
6180     OBJECT IDENTIFIER 1.2.3
6181
6182     But pay attention that if value has default, then it won't be (not
6183     in) in the sequence (because ``DEFAULT`` must not be encoded in
6184     DER), but you can read its value:
6185
6186     >>> "critical" in ext, ext["critical"]
6187     (False, BOOLEAN False)
6188     >>> ext["critical"] = Boolean(True)
6189     >>> "critical" in ext, ext["critical"]
6190     (True, BOOLEAN True)
6191
6192     All defaulted values are always optional.
6193
6194     .. _allow_default_values_ctx:
6195
6196     DER prohibits default value encoding and will raise an error if
6197     default value is unexpectedly met during decode.
6198     If :ref:`bered <bered_ctx>` context option is set, then no error
6199     will be raised, but ``bered`` attribute set. You can disable strict
6200     defaulted values existence validation by setting
6201     ``"allow_default_values": True`` :ref:`context <ctx>` option.
6202
6203     All values with DEFAULT specified are decoded atomically in
6204     :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6205     SEQUENCE, then it will be yielded as a single element, not
6206     disassembled. That is required for DEFAULT existence check.
6207
6208     Two sequences are equal if they have equal specification (schema),
6209     implicit/explicit tagging and the same values.
6210     """
6211     __slots__ = ("specs",)
6212     tag_default = tag_encode(form=TagFormConstructed, num=16)
6213     asn1_type_name = "SEQUENCE"
6214
6215     def __init__(
6216             self,
6217             value=None,
6218             schema=None,
6219             impl=None,
6220             expl=None,
6221             default=None,
6222             optional=False,
6223             _decoded=(0, 0, 0),
6224     ):
6225         super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6226         if schema is None:
6227             schema = getattr(self, "schema", ())
6228         self.specs = (
6229             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6230         )
6231         self._value = {}
6232         if value is not None:
6233             if issubclass(value.__class__, Sequence):
6234                 self._value = value._value
6235             elif hasattr(value, "__iter__"):
6236                 for seq_key, seq_value in value:
6237                     self[seq_key] = seq_value
6238             else:
6239                 raise InvalidValueType((Sequence,))
6240         if default is not None:
6241             if not issubclass(default.__class__, Sequence):
6242                 raise InvalidValueType((Sequence,))
6243             default_value = default._value
6244             default_obj = self.__class__(impl=self.tag, expl=self._expl)
6245             default_obj.specs = self.specs
6246             default_obj._value = default_value
6247             self.default = default_obj
6248             if value is None:
6249                 self._value = copy(default_obj._value)
6250
6251     @property
6252     def ready(self):
6253         for name, spec in iteritems(self.specs):
6254             value = self._value.get(name)
6255             if value is None:
6256                 if spec.optional:
6257                     continue
6258                 return False
6259             if not value.ready:
6260                 return False
6261         return True
6262
6263     @property
6264     def bered(self):
6265         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6266             return True
6267         return any(value.bered for value in itervalues(self._value))
6268
6269     def __getstate__(self):
6270         return SequenceState(
6271             __version__,
6272             self.tag,
6273             self._tag_order,
6274             self._expl,
6275             self.default,
6276             self.optional,
6277             self.offset,
6278             self.llen,
6279             self.vlen,
6280             self.expl_lenindef,
6281             self.lenindef,
6282             self.ber_encoded,
6283             self.specs,
6284             {k: copy(v) for k, v in iteritems(self._value)},
6285         )
6286
6287     def __setstate__(self, state):
6288         super(Sequence, self).__setstate__(state)
6289         self.specs = state.specs
6290         self._value = state.value
6291
6292     def __eq__(self, their):
6293         if not isinstance(their, self.__class__):
6294             return False
6295         return (
6296             self.specs == their.specs and
6297             self.tag == their.tag and
6298             self._expl == their._expl and
6299             self._value == their._value
6300         )
6301
6302     def __call__(
6303             self,
6304             value=None,
6305             impl=None,
6306             expl=None,
6307             default=None,
6308             optional=None,
6309     ):
6310         return self.__class__(
6311             value=value,
6312             schema=self.specs,
6313             impl=self.tag if impl is None else impl,
6314             expl=self._expl if expl is None else expl,
6315             default=self.default if default is None else default,
6316             optional=self.optional if optional is None else optional,
6317         )
6318
6319     def __contains__(self, key):
6320         return key in self._value
6321
6322     def __setitem__(self, key, value):
6323         spec = self.specs.get(key)
6324         if spec is None:
6325             raise ObjUnknown(key)
6326         if value is None:
6327             self._value.pop(key, None)
6328             return
6329         if not isinstance(value, spec.__class__):
6330             raise InvalidValueType((spec.__class__,))
6331         value = spec(value=value)
6332         if spec.default is not None and value == spec.default:
6333             self._value.pop(key, None)
6334             return
6335         self._value[key] = value
6336
6337     def __getitem__(self, key):
6338         value = self._value.get(key)
6339         if value is not None:
6340             return value
6341         spec = self.specs.get(key)
6342         if spec is None:
6343             raise ObjUnknown(key)
6344         if spec.default is not None:
6345             return spec.default
6346         return None
6347
6348     def _values_for_encoding(self):
6349         for name, spec in iteritems(self.specs):
6350             value = self._value.get(name)
6351             if value is None:
6352                 if spec.optional:
6353                     continue
6354                 raise ObjNotReady(name)
6355             yield value
6356
6357     def _encode(self):
6358         v = b"".join(v.encode() for v in self._values_for_encoding())
6359         return b"".join((self.tag, len_encode(len(v)), v))
6360
6361     def _encode2nd(self, writer, state_iter):
6362         write_full(writer, self.tag + len_encode(next(state_iter)))
6363         for v in self._values_for_encoding():
6364             v.encode2nd(writer, state_iter)
6365
6366     def _encode_cer(self, writer):
6367         write_full(writer, self.tag + LENINDEF)
6368         for v in self._values_for_encoding():
6369             v.encode_cer(writer)
6370         write_full(writer, EOC)
6371
6372     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6373         try:
6374             t, tlen, lv = tag_strip(tlv)
6375         except DecodeError as err:
6376             raise err.__class__(
6377                 msg=err.msg,
6378                 klass=self.__class__,
6379                 decode_path=decode_path,
6380                 offset=offset,
6381             )
6382         if t != self.tag:
6383             raise TagMismatch(
6384                 klass=self.__class__,
6385                 decode_path=decode_path,
6386                 offset=offset,
6387             )
6388         if tag_only:  # pragma: no cover
6389             yield None
6390             return
6391         lenindef = False
6392         ctx_bered = ctx.get("bered", False)
6393         try:
6394             l, llen, v = len_decode(lv)
6395         except LenIndefForm as err:
6396             if not ctx_bered:
6397                 raise err.__class__(
6398                     msg=err.msg,
6399                     klass=self.__class__,
6400                     decode_path=decode_path,
6401                     offset=offset,
6402                 )
6403             l, llen, v = 0, 1, lv[1:]
6404             lenindef = True
6405         except DecodeError as err:
6406             raise err.__class__(
6407                 msg=err.msg,
6408                 klass=self.__class__,
6409                 decode_path=decode_path,
6410                 offset=offset,
6411             )
6412         if l > len(v):
6413             raise NotEnoughData(
6414                 "encoded length is longer than data",
6415                 klass=self.__class__,
6416                 decode_path=decode_path,
6417                 offset=offset,
6418             )
6419         if not lenindef:
6420             v, tail = v[:l], v[l:]
6421         vlen = 0
6422         sub_offset = offset + tlen + llen
6423         values = {}
6424         ber_encoded = False
6425         ctx_allow_default_values = ctx.get("allow_default_values", False)
6426         for name, spec in iteritems(self.specs):
6427             if spec.optional and (
6428                     (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6429                     len(v) == 0
6430             ):
6431                 continue
6432             spec_defaulted = spec.default is not None
6433             sub_decode_path = decode_path + (name,)
6434             try:
6435                 if evgen_mode and not spec_defaulted:
6436                     for _decode_path, value, v_tail in spec.decode_evgen(
6437                             v,
6438                             sub_offset,
6439                             leavemm=True,
6440                             decode_path=sub_decode_path,
6441                             ctx=ctx,
6442                             _ctx_immutable=False,
6443                     ):
6444                         yield _decode_path, value, v_tail
6445                 else:
6446                     _, value, v_tail = next(spec.decode_evgen(
6447                         v,
6448                         sub_offset,
6449                         leavemm=True,
6450                         decode_path=sub_decode_path,
6451                         ctx=ctx,
6452                         _ctx_immutable=False,
6453                         _evgen_mode=False,
6454                     ))
6455             except TagMismatch as err:
6456                 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6457                     continue
6458                 raise
6459
6460             defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6461             if not evgen_mode and defined is not None:
6462                 defined_by, defined_spec = defined
6463                 if issubclass(value.__class__, SequenceOf):
6464                     for i, _value in enumerate(value):
6465                         sub_sub_decode_path = sub_decode_path + (
6466                             str(i),
6467                             DecodePathDefBy(defined_by),
6468                         )
6469                         defined_value, defined_tail = defined_spec.decode(
6470                             memoryview(bytes(_value)),
6471                             sub_offset + (
6472                                 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6473                                 if value.expled else (value.tlen + value.llen)
6474                             ),
6475                             leavemm=True,
6476                             decode_path=sub_sub_decode_path,
6477                             ctx=ctx,
6478                             _ctx_immutable=False,
6479                         )
6480                         if len(defined_tail) > 0:
6481                             raise DecodeError(
6482                                 "remaining data",
6483                                 klass=self.__class__,
6484                                 decode_path=sub_sub_decode_path,
6485                                 offset=offset,
6486                             )
6487                         _value.defined = (defined_by, defined_value)
6488                 else:
6489                     defined_value, defined_tail = defined_spec.decode(
6490                         memoryview(bytes(value)),
6491                         sub_offset + (
6492                             (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6493                             if value.expled else (value.tlen + value.llen)
6494                         ),
6495                         leavemm=True,
6496                         decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6497                         ctx=ctx,
6498                         _ctx_immutable=False,
6499                     )
6500                     if len(defined_tail) > 0:
6501                         raise DecodeError(
6502                             "remaining data",
6503                             klass=self.__class__,
6504                             decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6505                             offset=offset,
6506                         )
6507                     value.defined = (defined_by, defined_value)
6508
6509             value_len = value.fulllen
6510             vlen += value_len
6511             sub_offset += value_len
6512             v = v_tail
6513             if spec_defaulted:
6514                 if evgen_mode:
6515                     yield sub_decode_path, value, v_tail
6516                 if value == spec.default:
6517                     if ctx_bered or ctx_allow_default_values:
6518                         ber_encoded = True
6519                     else:
6520                         raise DecodeError(
6521                             "DEFAULT value met",
6522                             klass=self.__class__,
6523                             decode_path=sub_decode_path,
6524                             offset=sub_offset,
6525                         )
6526             if not evgen_mode:
6527                 values[name] = value
6528                 spec_defines = getattr(spec, "defines", ())
6529                 if len(spec_defines) == 0:
6530                     defines_by_path = ctx.get("defines_by_path", ())
6531                     if len(defines_by_path) > 0:
6532                         spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6533                 if spec_defines is not None and len(spec_defines) > 0:
6534                     for rel_path, schema in spec_defines:
6535                         defined = schema.get(value, None)
6536                         if defined is not None:
6537                             ctx.setdefault("_defines", []).append((
6538                                 abs_decode_path(sub_decode_path[:-1], rel_path),
6539                                 (value, defined),
6540                             ))
6541         if lenindef:
6542             if v[:EOC_LEN].tobytes() != EOC:
6543                 raise DecodeError(
6544                     "no EOC",
6545                     klass=self.__class__,
6546                     decode_path=decode_path,
6547                     offset=offset,
6548                 )
6549             tail = v[EOC_LEN:]
6550             vlen += EOC_LEN
6551         elif len(v) > 0:
6552             raise DecodeError(
6553                 "remaining data",
6554                 klass=self.__class__,
6555                 decode_path=decode_path,
6556                 offset=offset,
6557             )
6558         obj = self.__class__(
6559             schema=self.specs,
6560             impl=self.tag,
6561             expl=self._expl,
6562             default=self.default,
6563             optional=self.optional,
6564             _decoded=(offset, llen, vlen),
6565         )
6566         obj._value = values
6567         obj.lenindef = lenindef
6568         obj.ber_encoded = ber_encoded
6569         yield decode_path, obj, tail
6570
6571     def __repr__(self):
6572         value = pp_console_row(next(self.pps()))
6573         cols = []
6574         for name in self.specs:
6575             _value = self._value.get(name)
6576             if _value is None:
6577                 continue
6578             cols.append("%s: %s" % (name, repr(_value)))
6579         return "%s[%s]" % (value, "; ".join(cols))
6580
6581     def pps(self, decode_path=()):
6582         yield _pp(
6583             obj=self,
6584             asn1_type_name=self.asn1_type_name,
6585             obj_name=self.__class__.__name__,
6586             decode_path=decode_path,
6587             optional=self.optional,
6588             default=self == self.default,
6589             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6590             expl=None if self._expl is None else tag_decode(self._expl),
6591             offset=self.offset,
6592             tlen=self.tlen,
6593             llen=self.llen,
6594             vlen=self.vlen,
6595             expl_offset=self.expl_offset if self.expled else None,
6596             expl_tlen=self.expl_tlen if self.expled else None,
6597             expl_llen=self.expl_llen if self.expled else None,
6598             expl_vlen=self.expl_vlen if self.expled else None,
6599             expl_lenindef=self.expl_lenindef,
6600             lenindef=self.lenindef,
6601             ber_encoded=self.ber_encoded,
6602             bered=self.bered,
6603         )
6604         for name in self.specs:
6605             value = self._value.get(name)
6606             if value is None:
6607                 continue
6608             yield value.pps(decode_path=decode_path + (name,))
6609         for pp in self.pps_lenindef(decode_path):
6610             yield pp
6611
6612
6613 class Set(Sequence, SequenceEncode1stMixing):
6614     """``SET`` structure type
6615
6616     Its usage is identical to :py:class:`pyderasn.Sequence`.
6617
6618     .. _allow_unordered_set_ctx:
6619
6620     DER prohibits unordered values encoding and will raise an error
6621     during decode. If :ref:`bered <bered_ctx>` context option is set,
6622     then no error will occur. Also you can disable strict values
6623     ordering check by setting ``"allow_unordered_set": True``
6624     :ref:`context <ctx>` option.
6625     """
6626     __slots__ = ()
6627     tag_default = tag_encode(form=TagFormConstructed, num=17)
6628     asn1_type_name = "SET"
6629
6630     def _values_for_encoding(self):
6631         return sorted(
6632             super(Set, self)._values_for_encoding(),
6633             key=attrgetter("tag_order"),
6634         )
6635
6636     def _encode_cer(self, writer):
6637         write_full(writer, self.tag + LENINDEF)
6638         for v in sorted(
6639                 super(Set, self)._values_for_encoding(),
6640                 key=attrgetter("tag_order_cer"),
6641         ):
6642             v.encode_cer(writer)
6643         write_full(writer, EOC)
6644
6645     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6646         try:
6647             t, tlen, lv = tag_strip(tlv)
6648         except DecodeError as err:
6649             raise err.__class__(
6650                 msg=err.msg,
6651                 klass=self.__class__,
6652                 decode_path=decode_path,
6653                 offset=offset,
6654             )
6655         if t != self.tag:
6656             raise TagMismatch(
6657                 klass=self.__class__,
6658                 decode_path=decode_path,
6659                 offset=offset,
6660             )
6661         if tag_only:
6662             yield None
6663             return
6664         lenindef = False
6665         ctx_bered = ctx.get("bered", False)
6666         try:
6667             l, llen, v = len_decode(lv)
6668         except LenIndefForm as err:
6669             if not ctx_bered:
6670                 raise err.__class__(
6671                     msg=err.msg,
6672                     klass=self.__class__,
6673                     decode_path=decode_path,
6674                     offset=offset,
6675                 )
6676             l, llen, v = 0, 1, lv[1:]
6677             lenindef = True
6678         except DecodeError as err:
6679             raise err.__class__(
6680                 msg=err.msg,
6681                 klass=self.__class__,
6682                 decode_path=decode_path,
6683                 offset=offset,
6684             )
6685         if l > len(v):
6686             raise NotEnoughData(
6687                 "encoded length is longer than data",
6688                 klass=self.__class__,
6689                 offset=offset,
6690             )
6691         if not lenindef:
6692             v, tail = v[:l], v[l:]
6693         vlen = 0
6694         sub_offset = offset + tlen + llen
6695         values = {}
6696         ber_encoded = False
6697         ctx_allow_default_values = ctx.get("allow_default_values", False)
6698         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6699         tag_order_prev = (0, 0)
6700         _specs_items = copy(self.specs)
6701
6702         while len(v) > 0:
6703             if lenindef and v[:EOC_LEN].tobytes() == EOC:
6704                 break
6705             for name, spec in iteritems(_specs_items):
6706                 sub_decode_path = decode_path + (name,)
6707                 try:
6708                     spec.decode(
6709                         v,
6710                         sub_offset,
6711                         leavemm=True,
6712                         decode_path=sub_decode_path,
6713                         ctx=ctx,
6714                         tag_only=True,
6715                         _ctx_immutable=False,
6716                     )
6717                 except TagMismatch:
6718                     continue
6719                 break
6720             else:
6721                 raise TagMismatch(
6722                     klass=self.__class__,
6723                     decode_path=decode_path,
6724                     offset=offset,
6725                 )
6726             spec_defaulted = spec.default is not None
6727             if evgen_mode and not spec_defaulted:
6728                 for _decode_path, value, v_tail in spec.decode_evgen(
6729                         v,
6730                         sub_offset,
6731                         leavemm=True,
6732                         decode_path=sub_decode_path,
6733                         ctx=ctx,
6734                         _ctx_immutable=False,
6735                 ):
6736                     yield _decode_path, value, v_tail
6737             else:
6738                 _, value, v_tail = next(spec.decode_evgen(
6739                     v,
6740                     sub_offset,
6741                     leavemm=True,
6742                     decode_path=sub_decode_path,
6743                     ctx=ctx,
6744                     _ctx_immutable=False,
6745                     _evgen_mode=False,
6746                 ))
6747             value_tag_order = value.tag_order
6748             value_len = value.fulllen
6749             if tag_order_prev >= value_tag_order:
6750                 if ctx_bered or ctx_allow_unordered_set:
6751                     ber_encoded = True
6752                 else:
6753                     raise DecodeError(
6754                         "unordered " + self.asn1_type_name,
6755                         klass=self.__class__,
6756                         decode_path=sub_decode_path,
6757                         offset=sub_offset,
6758                     )
6759             if spec_defaulted:
6760                 if evgen_mode:
6761                     yield sub_decode_path, value, v_tail
6762                 if value != spec.default:
6763                     pass
6764                 elif ctx_bered or ctx_allow_default_values:
6765                     ber_encoded = True
6766                 else:
6767                     raise DecodeError(
6768                         "DEFAULT value met",
6769                         klass=self.__class__,
6770                         decode_path=sub_decode_path,
6771                         offset=sub_offset,
6772                     )
6773             values[name] = value
6774             del _specs_items[name]
6775             tag_order_prev = value_tag_order
6776             sub_offset += value_len
6777             vlen += value_len
6778             v = v_tail
6779
6780         obj = self.__class__(
6781             schema=self.specs,
6782             impl=self.tag,
6783             expl=self._expl,
6784             default=self.default,
6785             optional=self.optional,
6786             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6787         )
6788         if lenindef:
6789             if v[:EOC_LEN].tobytes() != EOC:
6790                 raise DecodeError(
6791                     "no EOC",
6792                     klass=self.__class__,
6793                     decode_path=decode_path,
6794                     offset=offset,
6795                 )
6796             tail = v[EOC_LEN:]
6797             obj.lenindef = True
6798         for name, spec in iteritems(self.specs):
6799             if name not in values and not spec.optional:
6800                 raise DecodeError(
6801                     "%s value is not ready" % name,
6802                     klass=self.__class__,
6803                     decode_path=decode_path,
6804                     offset=offset,
6805                 )
6806         if not evgen_mode:
6807             obj._value = values
6808         obj.ber_encoded = ber_encoded
6809         yield decode_path, obj, tail
6810
6811
6812 SequenceOfState = namedtuple(
6813     "SequenceOfState",
6814     BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6815     **NAMEDTUPLE_KWARGS
6816 )
6817
6818
6819 class SequenceOf(SequenceEncode1stMixing, Obj):
6820     """``SEQUENCE OF`` sequence type
6821
6822     For that kind of type you must specify the object it will carry on
6823     (bounds are for example here, not required)::
6824
6825         class Ints(SequenceOf):
6826             schema = Integer()
6827             bounds = (0, 2)
6828
6829     >>> ints = Ints()
6830     >>> ints.append(Integer(123))
6831     >>> ints.append(Integer(234))
6832     >>> ints
6833     Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6834     >>> [int(i) for i in ints]
6835     [123, 234]
6836     >>> ints.append(Integer(345))
6837     Traceback (most recent call last):
6838     pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6839     >>> ints[1]
6840     INTEGER 234
6841     >>> ints[1] = Integer(345)
6842     >>> ints
6843     Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6844
6845     You can initialize sequence with preinitialized values:
6846
6847     >>> ints = Ints([Integer(123), Integer(234)])
6848
6849     Also you can use iterator as a value:
6850
6851     >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6852
6853     And it won't be iterated until encoding process. Pay attention that
6854     bounds and required schema checks are done only during the encoding
6855     process in that case! After encode was called, then value is zeroed
6856     back to empty list and you have to set it again. That mode is useful
6857     mainly with CER encoding mode, where all objects from the iterable
6858     will be streamed to the buffer, without copying all of them to
6859     memory first.
6860     """
6861     __slots__ = ("spec", "_bound_min", "_bound_max")
6862     tag_default = tag_encode(form=TagFormConstructed, num=16)
6863     asn1_type_name = "SEQUENCE OF"
6864
6865     def __init__(
6866             self,
6867             value=None,
6868             schema=None,
6869             bounds=None,
6870             impl=None,
6871             expl=None,
6872             default=None,
6873             optional=False,
6874             _decoded=(0, 0, 0),
6875     ):
6876         super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6877         if schema is None:
6878             schema = getattr(self, "schema", None)
6879         if schema is None:
6880             raise ValueError("schema must be specified")
6881         self.spec = schema
6882         self._bound_min, self._bound_max = getattr(
6883             self,
6884             "bounds",
6885             (0, float("+inf")),
6886         ) if bounds is None else bounds
6887         self._value = []
6888         if value is not None:
6889             self._value = self._value_sanitize(value)
6890         if default is not None:
6891             default_value = self._value_sanitize(default)
6892             default_obj = self.__class__(
6893                 schema=schema,
6894                 impl=self.tag,
6895                 expl=self._expl,
6896             )
6897             default_obj._value = default_value
6898             self.default = default_obj
6899             if value is None:
6900                 self._value = copy(default_obj._value)
6901
6902     def _value_sanitize(self, value):
6903         iterator = False
6904         if issubclass(value.__class__, SequenceOf):
6905             value = value._value
6906         elif hasattr(value, NEXT_ATTR_NAME):
6907             iterator = True
6908         elif hasattr(value, "__iter__"):
6909             value = list(value)
6910         else:
6911             raise InvalidValueType((self.__class__, iter, "iterator"))
6912         if not iterator:
6913             if not self._bound_min <= len(value) <= self._bound_max:
6914                 raise BoundsError(self._bound_min, len(value), self._bound_max)
6915             class_expected = self.spec.__class__
6916             for v in value:
6917                 if not isinstance(v, class_expected):
6918                     raise InvalidValueType((class_expected,))
6919         return value
6920
6921     @property
6922     def ready(self):
6923         if hasattr(self._value, NEXT_ATTR_NAME):
6924             return True
6925         if self._bound_min > 0 and len(self._value) == 0:
6926             return False
6927         return all(v.ready for v in self._value)
6928
6929     @property
6930     def bered(self):
6931         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6932             return True
6933         return any(v.bered for v in self._value)
6934
6935     def __getstate__(self):
6936         if hasattr(self._value, NEXT_ATTR_NAME):
6937             raise ValueError("can not pickle SequenceOf with iterator")
6938         return SequenceOfState(
6939             __version__,
6940             self.tag,
6941             self._tag_order,
6942             self._expl,
6943             self.default,
6944             self.optional,
6945             self.offset,
6946             self.llen,
6947             self.vlen,
6948             self.expl_lenindef,
6949             self.lenindef,
6950             self.ber_encoded,
6951             self.spec,
6952             [copy(v) for v in self._value],
6953             self._bound_min,
6954             self._bound_max,
6955         )
6956
6957     def __setstate__(self, state):
6958         super(SequenceOf, self).__setstate__(state)
6959         self.spec = state.spec
6960         self._value = state.value
6961         self._bound_min = state.bound_min
6962         self._bound_max = state.bound_max
6963
6964     def __eq__(self, their):
6965         if isinstance(their, self.__class__):
6966             return (
6967                 self.spec == their.spec and
6968                 self.tag == their.tag and
6969                 self._expl == their._expl and
6970                 self._value == their._value
6971             )
6972         if hasattr(their, "__iter__"):
6973             return self._value == list(their)
6974         return False
6975
6976     def __call__(
6977             self,
6978             value=None,
6979             bounds=None,
6980             impl=None,
6981             expl=None,
6982             default=None,
6983             optional=None,
6984     ):
6985         return self.__class__(
6986             value=value,
6987             schema=self.spec,
6988             bounds=(
6989                 (self._bound_min, self._bound_max)
6990                 if bounds is None else bounds
6991             ),
6992             impl=self.tag if impl is None else impl,
6993             expl=self._expl if expl is None else expl,
6994             default=self.default if default is None else default,
6995             optional=self.optional if optional is None else optional,
6996         )
6997
6998     def __contains__(self, key):
6999         return key in self._value
7000
7001     def append(self, value):
7002         if not isinstance(value, self.spec.__class__):
7003             raise InvalidValueType((self.spec.__class__,))
7004         if len(self._value) + 1 > self._bound_max:
7005             raise BoundsError(
7006                 self._bound_min,
7007                 len(self._value) + 1,
7008                 self._bound_max,
7009             )
7010         self._value.append(value)
7011
7012     def __iter__(self):
7013         return iter(self._value)
7014
7015     def __len__(self):
7016         return len(self._value)
7017
7018     def __setitem__(self, key, value):
7019         if not isinstance(value, self.spec.__class__):
7020             raise InvalidValueType((self.spec.__class__,))
7021         self._value[key] = self.spec(value=value)
7022
7023     def __getitem__(self, key):
7024         return self._value[key]
7025
7026     def _values_for_encoding(self):
7027         return iter(self._value)
7028
7029     def _encode(self):
7030         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7031         if iterator:
7032             values = []
7033             values_append = values.append
7034             class_expected = self.spec.__class__
7035             values_for_encoding = self._values_for_encoding()
7036             self._value = []
7037             for v in values_for_encoding:
7038                 if not isinstance(v, class_expected):
7039                     raise InvalidValueType((class_expected,))
7040                 values_append(v.encode())
7041             if not self._bound_min <= len(values) <= self._bound_max:
7042                 raise BoundsError(self._bound_min, len(values), self._bound_max)
7043             value = b"".join(values)
7044         else:
7045             value = b"".join(v.encode() for v in self._values_for_encoding())
7046         return b"".join((self.tag, len_encode(len(value)), value))
7047
7048     def _encode1st(self, state):
7049         state = super(SequenceOf, self)._encode1st(state)
7050         if hasattr(self._value, NEXT_ATTR_NAME):
7051             self._value = []
7052         return state
7053
7054     def _encode2nd(self, writer, state_iter):
7055         write_full(writer, self.tag + len_encode(next(state_iter)))
7056         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7057         if iterator:
7058             values_count = 0
7059             class_expected = self.spec.__class__
7060             values_for_encoding = self._values_for_encoding()
7061             self._value = []
7062             for v in values_for_encoding:
7063                 if not isinstance(v, class_expected):
7064                     raise InvalidValueType((class_expected,))
7065                 v.encode2nd(writer, state_iter)
7066                 values_count += 1
7067             if not self._bound_min <= values_count <= self._bound_max:
7068                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7069         else:
7070             for v in self._values_for_encoding():
7071                 v.encode2nd(writer, state_iter)
7072
7073     def _encode_cer(self, writer):
7074         write_full(writer, self.tag + LENINDEF)
7075         iterator = hasattr(self._value, NEXT_ATTR_NAME)
7076         if iterator:
7077             class_expected = self.spec.__class__
7078             values_count = 0
7079             values_for_encoding = self._values_for_encoding()
7080             self._value = []
7081             for v in values_for_encoding:
7082                 if not isinstance(v, class_expected):
7083                     raise InvalidValueType((class_expected,))
7084                 v.encode_cer(writer)
7085                 values_count += 1
7086             if not self._bound_min <= values_count <= self._bound_max:
7087                 raise BoundsError(self._bound_min, values_count, self._bound_max)
7088         else:
7089             for v in self._values_for_encoding():
7090                 v.encode_cer(writer)
7091         write_full(writer, EOC)
7092
7093     def _decode(
7094             self,
7095             tlv,
7096             offset,
7097             decode_path,
7098             ctx,
7099             tag_only,
7100             evgen_mode,
7101             ordering_check=False,
7102     ):
7103         try:
7104             t, tlen, lv = tag_strip(tlv)
7105         except DecodeError as err:
7106             raise err.__class__(
7107                 msg=err.msg,
7108                 klass=self.__class__,
7109                 decode_path=decode_path,
7110                 offset=offset,
7111             )
7112         if t != self.tag:
7113             raise TagMismatch(
7114                 klass=self.__class__,
7115                 decode_path=decode_path,
7116                 offset=offset,
7117             )
7118         if tag_only:
7119             yield None
7120             return
7121         lenindef = False
7122         ctx_bered = ctx.get("bered", False)
7123         try:
7124             l, llen, v = len_decode(lv)
7125         except LenIndefForm as err:
7126             if not ctx_bered:
7127                 raise err.__class__(
7128                     msg=err.msg,
7129                     klass=self.__class__,
7130                     decode_path=decode_path,
7131                     offset=offset,
7132                 )
7133             l, llen, v = 0, 1, lv[1:]
7134             lenindef = True
7135         except DecodeError as err:
7136             raise err.__class__(
7137                 msg=err.msg,
7138                 klass=self.__class__,
7139                 decode_path=decode_path,
7140                 offset=offset,
7141             )
7142         if l > len(v):
7143             raise NotEnoughData(
7144                 "encoded length is longer than data",
7145                 klass=self.__class__,
7146                 decode_path=decode_path,
7147                 offset=offset,
7148             )
7149         if not lenindef:
7150             v, tail = v[:l], v[l:]
7151         vlen = 0
7152         sub_offset = offset + tlen + llen
7153         _value = []
7154         _value_count = 0
7155         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7156         value_prev = memoryview(v[:0])
7157         ber_encoded = False
7158         spec = self.spec
7159         while len(v) > 0:
7160             if lenindef and v[:EOC_LEN].tobytes() == EOC:
7161                 break
7162             sub_decode_path = decode_path + (str(_value_count),)
7163             if evgen_mode:
7164                 for _decode_path, value, v_tail in spec.decode_evgen(
7165                         v,
7166                         sub_offset,
7167                         leavemm=True,
7168                         decode_path=sub_decode_path,
7169                         ctx=ctx,
7170                         _ctx_immutable=False,
7171                 ):
7172                     yield _decode_path, value, v_tail
7173             else:
7174                 _, value, v_tail = next(spec.decode_evgen(
7175                     v,
7176                     sub_offset,
7177                     leavemm=True,
7178                     decode_path=sub_decode_path,
7179                     ctx=ctx,
7180                     _ctx_immutable=False,
7181                     _evgen_mode=False,
7182                 ))
7183             value_len = value.fulllen
7184             if ordering_check:
7185                 if value_prev.tobytes() > v[:value_len].tobytes():
7186                     if ctx_bered or ctx_allow_unordered_set:
7187                         ber_encoded = True
7188                     else:
7189                         raise DecodeError(
7190                             "unordered " + self.asn1_type_name,
7191                             klass=self.__class__,
7192                             decode_path=sub_decode_path,
7193                             offset=sub_offset,
7194                         )
7195                 value_prev = v[:value_len]
7196             _value_count += 1
7197             if not evgen_mode:
7198                 _value.append(value)
7199             sub_offset += value_len
7200             vlen += value_len
7201             v = v_tail
7202         if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7203             raise DecodeError(
7204                 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7205                 klass=self.__class__,
7206                 decode_path=decode_path,
7207                 offset=offset,
7208             )
7209         try:
7210             obj = self.__class__(
7211                 value=None if evgen_mode else _value,
7212                 schema=spec,
7213                 bounds=(self._bound_min, self._bound_max),
7214                 impl=self.tag,
7215                 expl=self._expl,
7216                 default=self.default,
7217                 optional=self.optional,
7218                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7219             )
7220         except BoundsError as err:
7221             raise DecodeError(
7222                 msg=str(err),
7223                 klass=self.__class__,
7224                 decode_path=decode_path,
7225                 offset=offset,
7226             )
7227         if lenindef:
7228             if v[:EOC_LEN].tobytes() != EOC:
7229                 raise DecodeError(
7230                     "no EOC",
7231                     klass=self.__class__,
7232                     decode_path=decode_path,
7233                     offset=offset,
7234                 )
7235             obj.lenindef = True
7236             tail = v[EOC_LEN:]
7237         obj.ber_encoded = ber_encoded
7238         yield decode_path, obj, tail
7239
7240     def __repr__(self):
7241         return "%s[%s]" % (
7242             pp_console_row(next(self.pps())),
7243             ", ".join(repr(v) for v in self._value),
7244         )
7245
7246     def pps(self, decode_path=()):
7247         yield _pp(
7248             obj=self,
7249             asn1_type_name=self.asn1_type_name,
7250             obj_name=self.__class__.__name__,
7251             decode_path=decode_path,
7252             optional=self.optional,
7253             default=self == self.default,
7254             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7255             expl=None if self._expl is None else tag_decode(self._expl),
7256             offset=self.offset,
7257             tlen=self.tlen,
7258             llen=self.llen,
7259             vlen=self.vlen,
7260             expl_offset=self.expl_offset if self.expled else None,
7261             expl_tlen=self.expl_tlen if self.expled else None,
7262             expl_llen=self.expl_llen if self.expled else None,
7263             expl_vlen=self.expl_vlen if self.expled else None,
7264             expl_lenindef=self.expl_lenindef,
7265             lenindef=self.lenindef,
7266             ber_encoded=self.ber_encoded,
7267             bered=self.bered,
7268         )
7269         for i, value in enumerate(self._value):
7270             yield value.pps(decode_path=decode_path + (str(i),))
7271         for pp in self.pps_lenindef(decode_path):
7272             yield pp
7273
7274
7275 class SetOf(SequenceOf):
7276     """``SET OF`` sequence type
7277
7278     Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7279     """
7280     __slots__ = ()
7281     tag_default = tag_encode(form=TagFormConstructed, num=17)
7282     asn1_type_name = "SET OF"
7283
7284     def _value_sanitize(self, value):
7285         value = super(SetOf, self)._value_sanitize(value)
7286         if hasattr(value, NEXT_ATTR_NAME):
7287             raise ValueError(
7288                 "SetOf does not support iterator values, as no sense in them"
7289             )
7290         return value
7291
7292     def _encode(self):
7293         v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7294         return b"".join((self.tag, len_encode(len(v)), v))
7295
7296     def _encode2nd(self, writer, state_iter):
7297         write_full(writer, self.tag + len_encode(next(state_iter)))
7298         values = []
7299         for v in self._values_for_encoding():
7300             buf = BytesIO()
7301             v.encode2nd(buf.write, state_iter)
7302             values.append(buf.getvalue())
7303         values.sort()
7304         for v in values:
7305             write_full(writer, v)
7306
7307     def _encode_cer(self, writer):
7308         write_full(writer, self.tag + LENINDEF)
7309         for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7310             write_full(writer, v)
7311         write_full(writer, EOC)
7312
7313     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7314         return super(SetOf, self)._decode(
7315             tlv,
7316             offset,
7317             decode_path,
7318             ctx,
7319             tag_only,
7320             evgen_mode,
7321             ordering_check=True,
7322         )
7323
7324
7325 def obj_by_path(pypath):  # pragma: no cover
7326     """Import object specified as string Python path
7327
7328     Modules must be separated from classes/functions with ``:``.
7329
7330     >>> obj_by_path("foo.bar:Baz")
7331     <class 'foo.bar.Baz'>
7332     >>> obj_by_path("foo.bar:Baz.boo")
7333     <classmethod 'foo.bar.Baz.boo'>
7334     """
7335     mod, objs = pypath.rsplit(":", 1)
7336     from importlib import import_module
7337     obj = import_module(mod)
7338     for obj_name in objs.split("."):
7339         obj = getattr(obj, obj_name)
7340     return obj
7341
7342
7343 def generic_decoder():  # pragma: no cover
7344     # All of this below is a big hack with self references
7345     choice = PrimitiveTypes()
7346     choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7347     choice.specs["SetOf"] = SetOf(schema=choice)
7348     for i in six_xrange(31):
7349         choice.specs["SequenceOf%d" % i] = SequenceOf(
7350             schema=choice,
7351             expl=tag_ctxc(i),
7352         )
7353     choice.specs["Any"] = Any()
7354
7355     # Class name equals to type name, to omit it from output
7356     class SEQUENCEOF(SequenceOf):
7357         __slots__ = ()
7358         schema = choice
7359
7360     def pprint_any(
7361             obj,
7362             oid_maps=(),
7363             with_colours=False,
7364             with_decode_path=False,
7365             decode_path_only=(),
7366             decode_path=(),
7367     ):
7368         def _pprint_pps(pps):
7369             for pp in pps:
7370                 if hasattr(pp, "_fields"):
7371                     if (
7372                             decode_path_only != () and
7373                             pp.decode_path[:len(decode_path_only)] != decode_path_only
7374                     ):
7375                         continue
7376                     if pp.asn1_type_name == Choice.asn1_type_name:
7377                         continue
7378                     pp_kwargs = pp._asdict()
7379                     pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7380                     pp = _pp(**pp_kwargs)
7381                     yield pp_console_row(
7382                         pp,
7383                         oid_maps=oid_maps,
7384                         with_offsets=True,
7385                         with_blob=False,
7386                         with_colours=with_colours,
7387                         with_decode_path=with_decode_path,
7388                         decode_path_len_decrease=len(decode_path_only),
7389                     )
7390                     for row in pp_console_blob(
7391                             pp,
7392                             decode_path_len_decrease=len(decode_path_only),
7393                     ):
7394                         yield row
7395                 else:
7396                     for row in _pprint_pps(pp):
7397                         yield row
7398         return "\n".join(_pprint_pps(obj.pps(decode_path)))
7399     return SEQUENCEOF(), pprint_any
7400
7401
7402 def main():  # pragma: no cover
7403     import argparse
7404     parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7405     parser.add_argument(
7406         "--skip",
7407         type=int,
7408         default=0,
7409         help="Skip that number of bytes from the beginning",
7410     )
7411     parser.add_argument(
7412         "--oids",
7413         help="Python paths to dictionary with OIDs, comma separated",
7414     )
7415     parser.add_argument(
7416         "--schema",
7417         help="Python path to schema definition to use",
7418     )
7419     parser.add_argument(
7420         "--defines-by-path",
7421         help="Python path to decoder's defines_by_path",
7422     )
7423     parser.add_argument(
7424         "--nobered",
7425         action="store_true",
7426         help="Disallow BER encoding",
7427     )
7428     parser.add_argument(
7429         "--print-decode-path",
7430         action="store_true",
7431         help="Print decode paths",
7432     )
7433     parser.add_argument(
7434         "--decode-path-only",
7435         help="Print only specified decode path",
7436     )
7437     parser.add_argument(
7438         "--allow-expl-oob",
7439         action="store_true",
7440         help="Allow explicit tag out-of-bound",
7441     )
7442     parser.add_argument(
7443         "--evgen",
7444         action="store_true",
7445         help="Turn on event generation mode",
7446     )
7447     parser.add_argument(
7448         "RAWFile",
7449         type=argparse.FileType("rb"),
7450         help="Path to BER/CER/DER file you want to decode",
7451     )
7452     args = parser.parse_args()
7453     if PY2:
7454         args.RAWFile.seek(args.skip)
7455         raw = memoryview(args.RAWFile.read())
7456         args.RAWFile.close()
7457     else:
7458         raw = file_mmaped(args.RAWFile)[args.skip:]
7459     oid_maps = (
7460         [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7461         if args.oids else ()
7462     )
7463     from functools import partial
7464     if args.schema:
7465         schema = obj_by_path(args.schema)
7466         pprinter = partial(pprint, big_blobs=True)
7467     else:
7468         schema, pprinter = generic_decoder()
7469     ctx = {
7470         "bered": not args.nobered,
7471         "allow_expl_oob": args.allow_expl_oob,
7472     }
7473     if args.defines_by_path is not None:
7474         ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7475     from os import environ
7476     pprinter = partial(
7477         pprinter,
7478         oid_maps=oid_maps,
7479         with_colours=environ.get("NO_COLOR") is None,
7480         with_decode_path=args.print_decode_path,
7481         decode_path_only=(
7482             () if args.decode_path_only is None else
7483             tuple(args.decode_path_only.split(":"))
7484         ),
7485     )
7486     if args.evgen:
7487         for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7488             print(pprinter(obj, decode_path=decode_path))
7489     else:
7490         obj, tail = schema().decode(raw, ctx=ctx)
7491         print(pprinter(obj))
7492     if tail != b"":
7493         print("\nTrailing data: %s" % hexenc(tail))
7494
7495
7496 if __name__ == "__main__":
7497     from pyderasn import *
7498     main()