]> Cypherpunks.ru repositories - pyderasn.git/blob - pyderasn.py
README update and CER/BER mentioning
[pyderasn.git] / pyderasn.py
1 #!/usr/bin/env python
2 # coding: utf-8
3 # cython: language_level=3
4 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
5 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
6 #
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Lesser General Public License as
9 # published by the Free Software Foundation, version 3 of the License.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU Lesser General Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 """Python ASN.1 DER/BER codec with abstract structures
19
20 This library allows you to marshal various structures in ASN.1 DER
21 format, unmarshal them in BER/CER/DER ones.
22
23     >>> i = Integer(123)
24     >>> raw = i.encode()
25     >>> Integer().decod(raw) == i
26     True
27
28 There are primitive types, holding single values
29 (:py:class:`pyderasn.BitString`,
30 :py:class:`pyderasn.Boolean`,
31 :py:class:`pyderasn.Enumerated`,
32 :py:class:`pyderasn.GeneralizedTime`,
33 :py:class:`pyderasn.Integer`,
34 :py:class:`pyderasn.Null`,
35 :py:class:`pyderasn.ObjectIdentifier`,
36 :py:class:`pyderasn.OctetString`,
37 :py:class:`pyderasn.UTCTime`,
38 :py:class:`various strings <pyderasn.CommonString>`
39 (:py:class:`pyderasn.BMPString`,
40 :py:class:`pyderasn.GeneralString`,
41 :py:class:`pyderasn.GraphicString`,
42 :py:class:`pyderasn.IA5String`,
43 :py:class:`pyderasn.ISO646String`,
44 :py:class:`pyderasn.NumericString`,
45 :py:class:`pyderasn.PrintableString`,
46 :py:class:`pyderasn.T61String`,
47 :py:class:`pyderasn.TeletexString`,
48 :py:class:`pyderasn.UniversalString`,
49 :py:class:`pyderasn.UTF8String`,
50 :py:class:`pyderasn.VideotexString`,
51 :py:class:`pyderasn.VisibleString`)),
52 constructed types, holding multiple primitive types
53 (:py:class:`pyderasn.Sequence`,
54 :py:class:`pyderasn.SequenceOf`,
55 :py:class:`pyderasn.Set`,
56 :py:class:`pyderasn.SetOf`),
57 and special types like
58 :py:class:`pyderasn.Any` and
59 :py:class:`pyderasn.Choice`.
60
61 Common for most types
62 ---------------------
63
64 Tags
65 ____
66
67 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
68 the default tag used during coding process. You can override it with
69 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
70 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
71 argument or ``expl`` class attribute). Both arguments take raw binary
72 string, containing that tag. You can **not** set implicit and explicit
73 tags simultaneously.
74
75 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
76 functions, allowing you to easily create ``CONTEXT``
77 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
78 number.
79
80 .. note::
81
82    EXPLICIT tags always have **constructed** tag. PyDERASN does not
83    explicitly check correctness of schema input here.
84
85 .. note::
86
87    Implicit tags have **primitive** (``tag_ctxp``) encoding for
88    primitive values.
89
90 ::
91
92     >>> Integer(impl=tag_ctxp(1))
93     [1] INTEGER
94     >>> Integer(expl=tag_ctxc(2))
95     [2] EXPLICIT INTEGER
96
97 Implicit tag is not explicitly shown.
98
99 Two objects of the same type, but with different implicit/explicit tags
100 are **not** equal.
101
102 You can get object's effective tag (either default or implicited) through
103 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
104 function::
105
106     >>> tag_decode(tag_ctxc(123))
107     (128, 32, 123)
108     >>> klass, form, num = tag_decode(tag_ctxc(123))
109     >>> klass == TagClassContext
110     True
111     >>> form == TagFormConstructed
112     True
113
114 To determine if object has explicit tag, use ``expled`` boolean property
115 and ``expl_tag`` property, returning explicit tag's value.
116
117 Default/optional
118 ________________
119
120 Many objects in sequences could be ``OPTIONAL`` and could have
121 ``DEFAULT`` value. You can specify that object's property using
122 corresponding keyword arguments.
123
124     >>> Integer(optional=True, default=123)
125     INTEGER 123 OPTIONAL DEFAULT
126
127 Those specifications do not play any role in primitive value encoding,
128 but are taken into account when dealing with sequences holding them. For
129 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
130 ``version`` field::
131
132     class Version(Integer):
133         schema = (
134             ("v1", 0),
135             ("v2", 1),
136             ("v3", 2),
137         )
138     class TBSCertificate(Sequence):
139         schema = (
140             ("version", Version(expl=tag_ctxc(0), default="v1")),
141         [...]
142
143 When default argument is used and value is not specified, then it equals
144 to default one.
145
146 .. _bounds:
147
148 Size constraints
149 ________________
150
151 Some objects give ability to set value size constraints. This is either
152 possible integer value, or allowed length of various strings and
153 sequences. Constraints are set in the following way::
154
155     class X(...):
156         bounds = (MIN, MAX)
157
158 And values satisfaction is checked as: ``MIN <= X <= MAX``.
159
160 For simplicity you can also set bounds the following way::
161
162     bounded_x = X(bounds=(MIN, MAX))
163
164 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
165 raised.
166
167 Common methods
168 ______________
169
170 All objects have ``ready`` boolean property, that tells if object is
171 ready to be encoded. If that kind of action is performed on unready
172 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
173
174 All objects are friendly to ``copy.copy()`` and copied objects can be
175 safely mutated.
176
177 Also all objects can be safely ``pickle``-d, but pay attention that
178 pickling among different PyDERASN versions is prohibited.
179
180 .. _decoding:
181
182 Decoding
183 --------
184
185 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
186 ``offset`` optional argument could be used to set initial object's
187 offset in the binary data, for convenience. It returns decoded object
188 and remaining unmarshalled data (tail). Internally all work is done on
189 ``memoryview(data)``, and you can leave returning tail as a memoryview,
190 by specifying ``leavemm=True`` argument.
191
192 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
193 immediately checks and raises if there is non-empty tail.
194
195 When object is decoded, ``decoded`` property is true and you can safely
196 use following properties:
197
198 * ``offset`` -- position including initial offset where object's tag starts
199 * ``tlen`` -- length of object's tag
200 * ``llen`` -- length of object's length value
201 * ``vlen`` -- length of object's value
202 * ``tlvlen`` -- length of the whole object
203
204 Pay attention that those values do **not** include anything related to
205 explicit tag. If you want to know information about it, then use:
206
207 * ``expled`` -- to know if explicit tag is set
208 * ``expl_offset`` (it is lesser than ``offset``)
209 * ``expl_tlen``,
210 * ``expl_llen``
211 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
212 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
213   ``offset`` otherwise
214 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
215   ``tlvlen`` otherwise
216
217 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
218
219 .. _ctx:
220
221 Context
222 _______
223
224 You can specify so called context keyword argument during
225 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
226 various options governing decoding process.
227
228 Currently available context options:
229
230 * :ref:`allow_default_values <allow_default_values_ctx>`
231 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
232 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
233 * :ref:`bered <bered_ctx>`
234 * :ref:`defines_by_path <defines_by_path_ctx>`
235 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
236
237 .. _pprinting:
238
239 Pretty printing
240 ---------------
241
242 All objects have ``pps()`` method, that is a generator of
243 :py:class:`pyderasn.PP` namedtuple, holding various raw information
244 about the object. If ``pps`` is called on sequences, then all underlying
245 ``PP`` will be yielded.
246
247 You can use :py:func:`pyderasn.pp_console_row` function, converting
248 those ``PP`` to human readable string. Actually exactly it is used for
249 all object ``repr``. But it is easy to write custom formatters.
250
251     >>> from pyderasn import pprint
252     >>> encoded = Integer(-12345).encode()
253     >>> obj, tail = Integer().decode(encoded)
254     >>> print(pprint(obj))
255         0   [1,1,   2] INTEGER -12345
256
257 .. _pprint_example:
258
259 Example certificate::
260
261     >>> print(pprint(crt))
262         0   [1,3,1604] Certificate SEQUENCE
263         4   [1,3,1453]  . tbsCertificate: TBSCertificate SEQUENCE
264        10-2 [1,1,   1]  . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
265        13   [1,1,   3]  . . serialNumber: CertificateSerialNumber INTEGER 61595
266        18   [1,1,  13]  . . signature: AlgorithmIdentifier SEQUENCE
267        20   [1,1,   9]  . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
268        31   [0,0,   2]  . . . parameters: [UNIV 5] ANY OPTIONAL
269                         . . . . 05:00
270        33   [0,0, 278]  . . issuer: Name CHOICE rdnSequence
271        33   [1,3, 274]  . . . rdnSequence: RDNSequence SEQUENCE OF
272        37   [1,1,  11]  . . . . 0: RelativeDistinguishedName SET OF
273        39   [1,1,   9]  . . . . . 0: AttributeTypeAndValue SEQUENCE
274        41   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
275        46   [0,0,   4]  . . . . . . value: [UNIV 19] AttributeValue ANY
276                         . . . . . . . 13:02:45:53
277     [...]
278      1461   [1,1,  13]  . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
279      1463   [1,1,   9]  . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
280      1474   [0,0,   2]  . . parameters: [UNIV 5] ANY OPTIONAL
281                         . . . 05:00
282      1476   [1,2, 129]  . signatureValue: BIT STRING 1024 bits
283                         . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
284                         . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
285      [...]
286
287     Trailing data: 0a
288
289 Let's parse that output, human::
290
291        10-2 [1,1,   1]    . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
292        ^  ^  ^ ^    ^     ^   ^        ^            ^       ^       ^  ^
293        0  1  2 3    4     5   6        7            8       9       10 11
294
295 ::
296
297        20   [1,1,   9]    . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
298        ^     ^ ^    ^     ^     ^          ^                 ^
299        0     2 3    4     5     6          9                 10
300
301 ::
302
303        33   [0,0, 278]    . . issuer: Name CHOICE rdnSequence
304        ^     ^ ^    ^     ^   ^       ^    ^      ^
305        0     2 3    4     5   6       8    9      10
306
307 ::
308
309        52-2∞ B [1,1,1054]∞  . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
310              ^           ^                                 ^   ^            ^
311             12          13                                14   9            10
312
313 :0:
314  Offset of the object, where its DER/BER encoding begins.
315  Pay attention that it does **not** include explicit tag.
316 :1:
317  If explicit tag exists, then this is its length (tag + encoded length).
318 :2:
319  Length of object's tag. For example CHOICE does not have its own tag,
320  so it is zero.
321 :3:
322  Length of encoded length.
323 :4:
324  Length of encoded value.
325 :5:
326  Visual indentation to show the depth of object in the hierarchy.
327 :6:
328  Object's name inside SEQUENCE/CHOICE.
329 :7:
330  If either IMPLICIT or EXPLICIT tag is set, then it will be shown
331  here. "IMPLICIT" is omitted.
332 :8:
333  Object's class name, if set. Omitted if it is just an ordinary simple
334  value (like with ``algorithm`` in example above).
335 :9:
336  Object's ASN.1 type.
337 :10:
338  Object's value, if set. Can consist of multiple words (like OCTET/BIT
339  STRINGs above). We see ``v3`` value in Version, because it is named.
340  ``rdnSequence`` is the choice of CHOICE type.
341 :11:
342  Possible other flags like OPTIONAL and DEFAULT, if value equals to the
343  default one, specified in the schema.
344 :12:
345  Shows does object contains any kind of BER encoded data (possibly
346  Sequence holding BER-encoded underlying value).
347 :13:
348  Only applicable to BER encoded data. Indefinite length encoding mark.
349 :14:
350  Only applicable to BER encoded data. If object has BER-specific
351  encoding, then ``BER`` will be shown. It does not depend on indefinite
352  length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
353  (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
354  could be BERed.
355
356 .. _definedby:
357
358 DEFINED BY
359 ----------
360
361 ASN.1 structures often have ANY and OCTET STRING fields, that are
362 DEFINED BY some previously met ObjectIdentifier. This library provides
363 ability to specify mapping between some OID and field that must be
364 decoded with specific specification.
365
366 .. _defines:
367
368 defines kwarg
369 _____________
370
371 :py:class:`pyderasn.ObjectIdentifier` field inside
372 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
373 necessary for decoding structures. For example, CMS (:rfc:`5652`)
374 container::
375
376     class ContentInfo(Sequence):
377         schema = (
378             ("contentType", ContentType(defines=((("content",), {
379                 id_digestedData: DigestedData(),
380                 id_signedData: SignedData(),
381             }),))),
382             ("content", Any(expl=tag_ctxc(0))),
383         )
384
385 ``contentType`` field tells that it defines that ``content`` must be
386 decoded with ``SignedData`` specification, if ``contentType`` equals to
387 ``id-signedData``. The same applies to ``DigestedData``. If
388 ``contentType`` contains unknown OID, then no automatic decoding is
389 done.
390
391 You can specify multiple fields, that will be autodecoded -- that is why
392 ``defines`` kwarg is a sequence. You can specify defined field
393 relatively or absolutely to current decode path. For example ``defines``
394 for AlgorithmIdentifier of X.509's
395 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
396
397         (
398             (("parameters",), {
399                 id_ecPublicKey: ECParameters(),
400                 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
401             }),
402             (("..", "subjectPublicKey"), {
403                 id_rsaEncryption: RSAPublicKey(),
404                 id_GostR3410_2001: OctetString(),
405             }),
406         ),
407
408 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
409 autodecode its parameters inside SPKI's algorithm and its public key
410 itself.
411
412 Following types can be automatically decoded (DEFINED BY):
413
414 * :py:class:`pyderasn.Any`
415 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
416 * :py:class:`pyderasn.OctetString`
417 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
418   ``Any``/``BitString``/``OctetString``-s
419
420 When any of those fields is automatically decoded, then ``.defined``
421 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
422 was defined, ``value`` contains corresponding decoded value. For example
423 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
424
425 .. _defines_by_path_ctx:
426
427 defines_by_path context option
428 ______________________________
429
430 Sometimes you either can not or do not want to explicitly set *defines*
431 in the schema. You can dynamically apply those definitions when calling
432 :py:meth:`pyderasn.Obj.decode` method.
433
434 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
435 value must be sequence of following tuples::
436
437     (decode_path, defines)
438
439 where ``decode_path`` is a tuple holding so-called decode path to the
440 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
441 ``defines``, holding exactly the same value as accepted in its
442 :ref:`keyword argument <defines>`.
443
444 For example, again for CMS, you want to automatically decode
445 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
446 structures it may hold. Also, automatically decode ``controlSequence``
447 of ``PKIResponse``::
448
449     content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
450         (
451             ("contentType",),
452             ((("content",), {id_signedData: SignedData()}),),
453         ),
454         (
455             (
456                 "content",
457                 DecodePathDefBy(id_signedData),
458                 "encapContentInfo",
459                 "eContentType",
460             ),
461             ((("eContent",), {
462                 id_cct_PKIData: PKIData(),
463                 id_cct_PKIResponse: PKIResponse(),
464             })),
465         ),
466         (
467             (
468                 "content",
469                 DecodePathDefBy(id_signedData),
470                 "encapContentInfo",
471                 "eContent",
472                 DecodePathDefBy(id_cct_PKIResponse),
473                 "controlSequence",
474                 any,
475                 "attrType",
476             ),
477             ((("attrValues",), {
478                 id_cmc_recipientNonce: RecipientNonce(),
479                 id_cmc_senderNonce: SenderNonce(),
480                 id_cmc_statusInfoV2: CMCStatusInfoV2(),
481                 id_cmc_transactionId: TransactionId(),
482             })),
483         ),
484     )})
485
486 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
487 First function is useful for path construction when some automatic
488 decoding is already done. ``any`` means literally any value it meet --
489 useful for SEQUENCE/SET OF-s.
490
491 .. _bered_ctx:
492
493 BER encoding
494 ------------
495
496 By default PyDERASN accepts only DER encoded data. By default it encodes
497 to DER. But you can optionally enable BER decoding with setting
498 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
499 constructed primitive types should be parsed successfully.
500
501 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
502   attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
503   STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
504   ``UTCTime``, ``GeneralizedTime`` can contain it.
505 * If object has an indefinite length encoding, then its ``lenindef``
506   attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
507   ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
508   contain it.
509 * If object has an indefinite length encoded explicit tag, then
510   ``expl_lenindef`` is set to True.
511 * If object has either any of BER-related encoding (explicit tag
512   indefinite length, object's indefinite length, BER-encoding) or any
513   underlying component has that kind of encoding, then ``bered``
514   attribute is set to True. For example SignedData CMS can have
515   ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
516   ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
517
518 EOC (end-of-contents) token's length is taken in advance in object's
519 value length.
520
521 .. _allow_expl_oob_ctx:
522
523 Allow explicit tag out-of-bound
524 -------------------------------
525
526 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
527 one value, more than one object. If you set ``allow_expl_oob`` context
528 option to True, then no error will be raised and that invalid encoding
529 will be silently further processed. But pay attention that offsets and
530 lengths will be invalid in that case.
531
532 .. warning::
533
534    This option should be used only for skipping some decode errors, just
535    to see the decoded structure somehow.
536
537 .. _streaming:
538
539 Streaming and dealing with huge structures
540 ------------------------------------------
541
542 .. _evgen_mode:
543
544 evgen mode
545 __________
546
547 ASN.1 structures can be huge, they can hold millions of objects inside
548 (for example Certificate Revocation Lists (CRL), holding revocation
549 state for every previously issued X.509 certificate). CACert.org's 8 MiB
550 CRL file takes more than half a gigabyte of memory to hold the decoded
551 structure.
552
553 If you just simply want to check the signature over the ``tbsCertList``,
554 you can create specialized schema with that field represented as
555 OctetString for example::
556
557     class TBSCertListFast(Sequence):
558         schema = (
559             [...]
560             ("revokedCertificates", OctetString(
561                 impl=SequenceOf.tag_default,
562                 optional=True,
563             )),
564             [...]
565         )
566
567 This allows you to quickly decode a few fields and check the signature
568 over the ``tbsCertList`` bytes.
569
570 But how can you get all certificate's serial number from it, after you
571 trust that CRL after signature validation? You can use so called
572 ``evgen`` (event generation) mode, to catch the events/facts of some
573 successful object decoding. Let's use command line capabilities::
574
575     $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
576          10     [1,1,   1]   . . version: Version INTEGER v2 (01) OPTIONAL
577          15     [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
578          26     [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL
579          13     [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE
580          34     [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
581          39     [0,0,   9]   . . . . . . value: [UNIV 19] AttributeValue ANY
582          32     [1,1,  14]   . . . . . 0: AttributeTypeAndValue SEQUENCE
583          30     [1,1,  16]   . . . . 0: RelativeDistinguishedName SET OF
584     [...]
585         188     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
586         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
587         191     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
588         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589         186     [1,1,  18]   . . . 0: RevokedCertificate SEQUENCE
590         208     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
591         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
592         211     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
593         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594         206     [1,1,  18]   . . . 1: RevokedCertificate SEQUENCE
595     [...]
596     9144992     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
597     9144992     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
598     9144985     [1,1,  20]   . . . 415755: RevokedCertificate SEQUENCE
599       181     [1,4,9144821]   . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
600         5     [1,4,9144997]   . tbsCertList: TBSCertList SEQUENCE
601     9145009     [1,1,   9]   . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
602     9145020     [0,0,   2]   . . parameters: [UNIV 5] ANY OPTIONAL
603     9145007     [1,1,  13]   . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
604     9145022     [1,3, 513]   . signatureValue: BIT STRING 4096 bits
605         0     [1,4,9145534]  CertificateList SEQUENCE
606
607 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
608 decodes underlying values. It can not tell if SEQUENCE is decoded, so
609 the event of the upper level SEQUENCE is the last one we see.
610 ``version`` field is just a single INTEGER -- it is decoded and event is
611 fired immediately. Then we see that ``algorithm`` and ``parameters``
612 fields are decoded and only after them the ``signature`` SEQUENCE is
613 fired as a successfully decoded. There are 4 events for each revoked
614 certificate entry in that CRL: ``userCertificate`` serial number,
615 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
616 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
617
618 We can do that in our ordinary Python code and understand where we are
619 by looking at deterministically generated decode paths (do not forget
620 about useful ``--print-decode-path`` CLI option). We must use
621 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
622 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
623 obj, tail)`` tuples::
624
625     for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
626         if (
627             len(decode_path) == 4 and
628             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
629             decode_path[3] == "userCertificate"
630         ):
631             print("serial number:", int(obj))
632
633 Virtually it does not take any memory except at least needed for single
634 object storage. You can easily use that mode to determine required
635 object ``.offset`` and ``.*len`` to be able to decode it separately, or
636 maybe verify signature upon it just by taking bytes by ``.offset`` and
637 ``.tlvlen``.
638
639 .. _evgen_mode_upto_ctx:
640
641 evgen_mode_upto
642 _______________
643
644 There is full ability to get any kind of data from the CRL in the
645 example above. However it is not too convenient to get the whole
646 ``RevokedCertificate`` structure, that is pretty lightweight and one may
647 do not want to disassemble it. You can use ``evgen_mode_upto``
648 :ref:`ctx <ctx>` option that semantically equals to
649 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
650 mapped to any non-None value. If specified decode path is met, then any
651 subsequent objects won't be decoded in evgen mode. That allows us to
652 parse the CRL above with fully assembled ``RevokedCertificate``::
653
654     for decode_path, obj, _ in CertificateList().decode_evgen(
655         crl_raw,
656         ctx={"evgen_mode_upto": (
657             (("tbsCertList", "revokedCertificates", any), True),
658         )},
659     ):
660         if (
661             len(decode_path) == 3 and
662             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
663         ):
664             print("serial number:", int(obj["userCertificate"]))
665
666 .. _mmap:
667
668 mmap-ed file
669 ____________
670
671 POSIX compliant systems have ``mmap`` syscall, giving ability to work
672 the memory mapped file. You can deal with the file like it was an
673 ordinary binary string, allowing you not to load it to the memory first.
674 Also you can use them as an input for OCTET STRING, taking no Python
675 memory for their storage.
676
677 There is convenient :py:func:`pyderasn.file_mmaped` function that
678 creates read-only memoryview on the file contents::
679
680     with open("huge", "rb") as fd:
681         raw = file_mmaped(fd)
682         obj = Something.decode(raw)
683
684 .. warning::
685
686    mmap-ed files in Python2.7 does not implement buffer protocol, so
687    memoryview won't work on them.
688
689 .. warning::
690
691    mmap maps the **whole** file. So it plays no role if you seek-ed it
692    before. Take the slice of the resulting memoryview with required
693    offset instead.
694
695 .. note::
696
697    If you use ZFS as underlying storage, then pay attention that
698    currently most platforms does not deal good with ZFS ARC and ordinary
699    page cache used for mmaps. It can take twice the necessary size in
700    the memory: both in page cache and ZFS ARC.
701
702 CER encoding
703 ____________
704
705 We can parse any kind of data now, but how can we produce files
706 streamingly, without storing their encoded representation in memory?
707 SEQUENCE by default encodes in memory all its values, joins them in huge
708 binary string, just to know the exact size of SEQUENCE's value for
709 encoding it in TLV. DER requires you to know all exact sizes of the
710 objects.
711
712 You can use CER encoding mode, that slightly differs from the DER, but
713 does not require exact sizes knowledge, allowing streaming encoding
714 directly to some writer/buffer. Just use
715 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
716 encoded data will flow::
717
718     opener = io.open if PY2 else open
719     with opener("result", "wb") as fd:
720         obj.encode_cer(fd.write)
721
722 ::
723
724     buf = io.BytesIO()
725     obj.encode_cer(buf.write)
726
727 If you do not want to create in-memory buffer every time, then you can
728 use :py:func:`pyderasn.encode_cer` function::
729
730     data = encode_cer(obj)
731
732 Remember that CER is **not valid** DER in most cases, so you **have to**
733 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
734 decoding. Also currently there is **no** validation that provided CER is
735 valid one -- you are sure that it has only valid BER encoding.
736
737 .. warning::
738
739    SET OF values can not be streamingly encoded, because they are
740    required to be sorted byte-by-byte. Big SET OF values still will take
741    much memory. Use neither SET nor SET OF values, as modern ASN.1
742    also recommends too.
743
744 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
745 OCTET STRINGs! They will be streamingly copied from underlying file to
746 the buffer using 1 KB chunks.
747
748 Some structures require that some of the elements have to be forcefully
749 DER encoded. For example ``SignedData`` CMS requires you to encode
750 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
751 encode everything else in BER. You can tell any of the structures to be
752 forcefully encoded in DER during CER encoding, by specifying
753 ``der_forced=True`` attribute::
754
755     class Certificate(Sequence):
756         schema = (...)
757         der_forced = True
758
759     class SignedAttributes(SetOf):
760         schema = Attribute()
761         bounds = (1, 32)
762         der_forced = True
763
764 agg_octet_string
765 ________________
766
767 In most cases, huge quantity of binary data is stored as OCTET STRING.
768 CER encoding splits it on 1 KB chunks. BER allows splitting on various
769 levels of chunks inclusion::
770
771     SOME STRING[CONSTRUCTED]
772         OCTET STRING[CONSTRUCTED]
773             OCTET STRING[PRIMITIVE]
774                 DATA CHUNK
775             OCTET STRING[PRIMITIVE]
776                 DATA CHUNK
777             OCTET STRING[PRIMITIVE]
778                 DATA CHUNK
779         OCTET STRING[PRIMITIVE]
780             DATA CHUNK
781         OCTET STRING[CONSTRUCTED]
782             OCTET STRING[PRIMITIVE]
783                 DATA CHUNK
784             OCTET STRING[PRIMITIVE]
785                 DATA CHUNK
786         OCTET STRING[CONSTRUCTED]
787             OCTET STRING[CONSTRUCTED]
788                 OCTET STRING[PRIMITIVE]
789                     DATA CHUNK
790
791 You can not just take the offset and some ``.vlen`` of the STRING and
792 treat it as the payload. If you decode it without
793 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
794 and ``bytes()`` will give the whole payload contents.
795
796 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
797 small memory footprint. There is convenient
798 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
799 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
800 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
801 SHA512 digest of its ``eContent``::
802
803     fd = open("data.p7m", "rb")
804     raw = file_mmaped(fd)
805     ctx = {"bered": True}
806     for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
807         if decode_path == ("content",):
808             content = obj
809             break
810     else:
811         raise ValueError("no content found")
812     hasher_state = sha512()
813     def hasher(data):
814         hasher_state.update(data)
815         return len(data)
816     evgens = SignedData().decode_evgen(
817         raw[content.offset:],
818         offset=content.offset,
819         ctx=ctx,
820     )
821     agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
822     fd.close()
823     digest = hasher_state.digest()
824
825 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
826 copy the payload (without BER/CER encoding interleaved overhead) in it.
827 Virtually it won't take memory more than for keeping small structures
828 and 1 KB binary chunks.
829
830 SEQUENCE OF iterators
831 _____________________
832
833 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
834 classes. The only difference with providing the full list of objects, is
835 that type and bounds checking is done during encoding process. Also
836 sequence's value will be emptied after encoding, forcing you to set its
837 value again.
838
839 This is very useful when you have to create some huge objects, like
840 CRLs, with thousands and millions of entities inside. You can write the
841 generator taking necessary data from the database and giving the
842 ``RevokedCertificate`` objects. Only binary representation of that
843 objects will take memory during DER encoding.
844
845 Base Obj
846 --------
847 .. autoclass:: pyderasn.Obj
848    :members:
849
850 Primitive types
851 ---------------
852
853 Boolean
854 _______
855 .. autoclass:: pyderasn.Boolean
856    :members: __init__
857
858 Integer
859 _______
860 .. autoclass:: pyderasn.Integer
861    :members: __init__, named
862
863 BitString
864 _________
865 .. autoclass:: pyderasn.BitString
866    :members: __init__, bit_len, named
867
868 OctetString
869 ___________
870 .. autoclass:: pyderasn.OctetString
871    :members: __init__
872
873 Null
874 ____
875 .. autoclass:: pyderasn.Null
876    :members: __init__
877
878 ObjectIdentifier
879 ________________
880 .. autoclass:: pyderasn.ObjectIdentifier
881    :members: __init__
882
883 Enumerated
884 __________
885 .. autoclass:: pyderasn.Enumerated
886
887 CommonString
888 ____________
889 .. autoclass:: pyderasn.CommonString
890
891 NumericString
892 _____________
893 .. autoclass:: pyderasn.NumericString
894
895 PrintableString
896 _______________
897 .. autoclass:: pyderasn.PrintableString
898    :members: __init__, allow_asterisk, allow_ampersand
899
900 UTCTime
901 _______
902 .. autoclass:: pyderasn.UTCTime
903    :members: __init__, todatetime
904
905 GeneralizedTime
906 _______________
907 .. autoclass:: pyderasn.GeneralizedTime
908    :members: __init__, todatetime
909
910 Special types
911 -------------
912
913 Choice
914 ______
915 .. autoclass:: pyderasn.Choice
916    :members: __init__, choice, value
917
918 PrimitiveTypes
919 ______________
920 .. autoclass:: PrimitiveTypes
921
922 Any
923 ___
924 .. autoclass:: pyderasn.Any
925    :members: __init__
926
927 Constructed types
928 -----------------
929
930 Sequence
931 ________
932 .. autoclass:: pyderasn.Sequence
933    :members: __init__
934
935 Set
936 ___
937 .. autoclass:: pyderasn.Set
938    :members: __init__
939
940 SequenceOf
941 __________
942 .. autoclass:: pyderasn.SequenceOf
943    :members: __init__
944
945 SetOf
946 _____
947 .. autoclass:: pyderasn.SetOf
948    :members: __init__
949
950 Various
951 -------
952
953 .. autofunction:: pyderasn.abs_decode_path
954 .. autofunction:: pyderasn.agg_octet_string
955 .. autofunction:: pyderasn.colonize_hex
956 .. autofunction:: pyderasn.encode_cer
957 .. autofunction:: pyderasn.file_mmaped
958 .. autofunction:: pyderasn.hexenc
959 .. autofunction:: pyderasn.hexdec
960 .. autofunction:: pyderasn.tag_encode
961 .. autofunction:: pyderasn.tag_decode
962 .. autofunction:: pyderasn.tag_ctxp
963 .. autofunction:: pyderasn.tag_ctxc
964 .. autoclass:: pyderasn.DecodeError
965    :members: __init__
966 .. autoclass:: pyderasn.NotEnoughData
967 .. autoclass:: pyderasn.ExceedingData
968 .. autoclass:: pyderasn.LenIndefForm
969 .. autoclass:: pyderasn.TagMismatch
970 .. autoclass:: pyderasn.InvalidLength
971 .. autoclass:: pyderasn.InvalidOID
972 .. autoclass:: pyderasn.ObjUnknown
973 .. autoclass:: pyderasn.ObjNotReady
974 .. autoclass:: pyderasn.InvalidValueType
975 .. autoclass:: pyderasn.BoundsError
976
977 .. _cmdline:
978
979 Command-line usage
980 ------------------
981
982 You can decode DER/BER files using command line abilities::
983
984     $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
985
986 If there is no schema for your file, then you can try parsing it without,
987 but of course IMPLICIT tags will often make it impossible. But result is
988 good enough for the certificate above::
989
990     $ python -m pyderasn path/to/file
991         0   [1,3,1604]  . >: SEQUENCE OF
992         4   [1,3,1453]  . . >: SEQUENCE OF
993         8   [0,0,   5]  . . . . >: [0] ANY
994                         . . . . . A0:03:02:01:02
995        13   [1,1,   3]  . . . . >: INTEGER 61595
996        18   [1,1,  13]  . . . . >: SEQUENCE OF
997        20   [1,1,   9]  . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
998        31   [1,1,   0]  . . . . . . >: NULL
999        33   [1,3, 274]  . . . . >: SEQUENCE OF
1000        37   [1,1,  11]  . . . . . . >: SET OF
1001        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1002        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1003        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1004     [...]
1005      1409   [1,1,  50]  . . . . . . >: SEQUENCE OF
1006      1411   [1,1,   8]  . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1007      1421   [1,1,  38]  . . . . . . . . >: OCTET STRING 38 bytes
1008                         . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1009                         . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1010                         . . . . . . . . . 61:2E:63:6F:6D:2F
1011      1461   [1,1,  13]  . . >: SEQUENCE OF
1012      1463   [1,1,   9]  . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1013      1474   [1,1,   0]  . . . . >: NULL
1014      1476   [1,2, 129]  . . >: BIT STRING 1024 bits
1015                         . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1016                         . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1017     [...]
1018
1019 Human readable OIDs
1020 ___________________
1021
1022 If you have got dictionaries with ObjectIdentifiers, like example one
1023 from ``tests/test_crts.py``::
1024
1025     stroid2name = {
1026         "1.2.840.113549.1.1.1": "id-rsaEncryption",
1027         "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1028         [...]
1029         "2.5.4.10": "id-at-organizationName",
1030         "2.5.4.11": "id-at-organizationalUnitName",
1031     }
1032
1033 then you can pass it to pretty printer to see human readable OIDs::
1034
1035     $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1036     [...]
1037        37   [1,1,  11]  . . . . . . >: SET OF
1038        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1039        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1040        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1041        50   [1,1,  18]  . . . . . . >: SET OF
1042        52   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1043        54   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1044        59   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1045        70   [1,1,  18]  . . . . . . >: SET OF
1046        72   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1047        74   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1048        79   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1049     [...]
1050
1051 Decode paths
1052 ____________
1053
1054 Each decoded element has so-called decode path: sequence of structure
1055 names it is passing during the decode process. Each element has its own
1056 unique path inside the whole ASN.1 tree. You can print it out with
1057 ``--print-decode-path`` option::
1058
1059     $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1060        0    [1,3,1604]  Certificate SEQUENCE []
1061        4    [1,3,1453]   . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1062       10-2  [1,1,   1]   . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1063       13    [1,1,   3]   . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1064       18    [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1065       20    [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1066       31    [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1067                          . . . . 05:00
1068       33    [0,0, 278]   . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1069       33    [1,3, 274]   . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1070       37    [1,1,  11]   . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1071       39    [1,1,   9]   . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1072       41    [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1073       46    [0,0,   4]   . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1074                          . . . . . . . 13:02:45:53
1075       46    [1,1,   2]   . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1076     [...]
1077
1078 Now you can print only the specified tree, for example signature algorithm::
1079
1080     $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1081       18    [1,1,  13]  AlgorithmIdentifier SEQUENCE
1082       20    [1,1,   9]   . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1083       31    [0,0,   2]   . parameters: [UNIV 5] ANY OPTIONAL
1084                          . . 05:00
1085 """
1086
1087 from array import array
1088 from codecs import getdecoder
1089 from codecs import getencoder
1090 from collections import namedtuple
1091 from collections import OrderedDict
1092 from copy import copy
1093 from datetime import datetime
1094 from datetime import timedelta
1095 from io import BytesIO
1096 from math import ceil
1097 from mmap import mmap
1098 from mmap import PROT_READ
1099 from operator import attrgetter
1100 from string import ascii_letters
1101 from string import digits
1102 from sys import version_info
1103 from unicodedata import category as unicat
1104
1105 from six import add_metaclass
1106 from six import binary_type
1107 from six import byte2int
1108 from six import indexbytes
1109 from six import int2byte
1110 from six import integer_types
1111 from six import iterbytes
1112 from six import iteritems
1113 from six import itervalues
1114 from six import PY2
1115 from six import string_types
1116 from six import text_type
1117 from six import unichr as six_unichr
1118 from six.moves import xrange as six_xrange
1119
1120
1121 try:
1122     from termcolor import colored
1123 except ImportError:  # pragma: no cover
1124     def colored(what, *args, **kwargs):
1125         return what
1126
1127 __version__ = "7.1"
1128
1129 __all__ = (
1130     "agg_octet_string",
1131     "Any",
1132     "BitString",
1133     "BMPString",
1134     "Boolean",
1135     "BoundsError",
1136     "Choice",
1137     "DecodeError",
1138     "DecodePathDefBy",
1139     "encode_cer",
1140     "Enumerated",
1141     "ExceedingData",
1142     "file_mmaped",
1143     "GeneralizedTime",
1144     "GeneralString",
1145     "GraphicString",
1146     "hexdec",
1147     "hexenc",
1148     "IA5String",
1149     "Integer",
1150     "InvalidLength",
1151     "InvalidOID",
1152     "InvalidValueType",
1153     "ISO646String",
1154     "LenIndefForm",
1155     "NotEnoughData",
1156     "Null",
1157     "NumericString",
1158     "obj_by_path",
1159     "ObjectIdentifier",
1160     "ObjNotReady",
1161     "ObjUnknown",
1162     "OctetString",
1163     "PrimitiveTypes",
1164     "PrintableString",
1165     "Sequence",
1166     "SequenceOf",
1167     "Set",
1168     "SetOf",
1169     "T61String",
1170     "tag_ctxc",
1171     "tag_ctxp",
1172     "tag_decode",
1173     "TagClassApplication",
1174     "TagClassContext",
1175     "TagClassPrivate",
1176     "TagClassUniversal",
1177     "TagFormConstructed",
1178     "TagFormPrimitive",
1179     "TagMismatch",
1180     "TeletexString",
1181     "UniversalString",
1182     "UTCTime",
1183     "UTF8String",
1184     "VideotexString",
1185     "VisibleString",
1186 )
1187
1188 TagClassUniversal = 0
1189 TagClassApplication = 1 << 6
1190 TagClassContext = 1 << 7
1191 TagClassPrivate = 1 << 6 | 1 << 7
1192 TagFormPrimitive = 0
1193 TagFormConstructed = 1 << 5
1194 TagClassReprs = {
1195     TagClassContext: "",
1196     TagClassApplication: "APPLICATION ",
1197     TagClassPrivate: "PRIVATE ",
1198     TagClassUniversal: "UNIV ",
1199 }
1200 EOC = b"\x00\x00"
1201 EOC_LEN = len(EOC)
1202 LENINDEF = b"\x80"  # length indefinite mark
1203 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1204 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1205 SET01 = frozenset("01")
1206 DECIMALS = frozenset(digits)
1207 DECIMAL_SIGNS = ".,"
1208 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1209
1210
1211 def file_mmaped(fd):
1212     """Make mmap-ed memoryview for reading from file
1213
1214     :param fd: file object
1215     :returns: memoryview over read-only mmap-ing of the whole file
1216     """
1217     return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
1218
1219 def pureint(value):
1220     if not set(value) <= DECIMALS:
1221         raise ValueError("non-pure integer")
1222     return int(value)
1223
1224 def fractions2float(fractions_raw):
1225     pureint(fractions_raw)
1226     return float("0." + fractions_raw)
1227
1228
1229 def get_def_by_path(defines_by_path, sub_decode_path):
1230     """Get define by decode path
1231     """
1232     for path, define in defines_by_path:
1233         if len(path) != len(sub_decode_path):
1234             continue
1235         for p1, p2 in zip(path, sub_decode_path):
1236             if (not p1 is any) and (p1 != p2):
1237                 break
1238         else:
1239             return define
1240
1241
1242 ########################################################################
1243 # Errors
1244 ########################################################################
1245
1246 class ASN1Error(ValueError):
1247     pass
1248
1249
1250 class DecodeError(ASN1Error):
1251     def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1252         """
1253         :param str msg: reason of decode failing
1254         :param klass: optional exact DecodeError inherited class (like
1255                       :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1256                       :py:exc:`InvalidLength`)
1257         :param decode_path: tuple of strings. It contains human
1258                             readable names of the fields through which
1259                             decoding process has passed
1260         :param int offset: binary offset where failure happened
1261         """
1262         super(DecodeError, self).__init__()
1263         self.msg = msg
1264         self.klass = klass
1265         self.decode_path = decode_path
1266         self.offset = offset
1267
1268     def __str__(self):
1269         return " ".join(
1270             c for c in (
1271                 "" if self.klass is None else self.klass.__name__,
1272                 (
1273                     ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1274                     if len(self.decode_path) > 0 else ""
1275                 ),
1276                 ("(at %d)" % self.offset) if self.offset > 0 else "",
1277                 self.msg,
1278             ) if c != ""
1279         )
1280
1281     def __repr__(self):
1282         return "%s(%s)" % (self.__class__.__name__, self)
1283
1284
1285 class NotEnoughData(DecodeError):
1286     pass
1287
1288
1289 class ExceedingData(ASN1Error):
1290     def __init__(self, nbytes):
1291         super(ExceedingData, self).__init__()
1292         self.nbytes = nbytes
1293
1294     def __str__(self):
1295         return "%d trailing bytes" % self.nbytes
1296
1297     def __repr__(self):
1298         return "%s(%s)" % (self.__class__.__name__, self)
1299
1300
1301 class LenIndefForm(DecodeError):
1302     pass
1303
1304
1305 class TagMismatch(DecodeError):
1306     pass
1307
1308
1309 class InvalidLength(DecodeError):
1310     pass
1311
1312
1313 class InvalidOID(DecodeError):
1314     pass
1315
1316
1317 class ObjUnknown(ASN1Error):
1318     def __init__(self, name):
1319         super(ObjUnknown, self).__init__()
1320         self.name = name
1321
1322     def __str__(self):
1323         return "object is unknown: %s" % self.name
1324
1325     def __repr__(self):
1326         return "%s(%s)" % (self.__class__.__name__, self)
1327
1328
1329 class ObjNotReady(ASN1Error):
1330     def __init__(self, name):
1331         super(ObjNotReady, self).__init__()
1332         self.name = name
1333
1334     def __str__(self):
1335         return "object is not ready: %s" % self.name
1336
1337     def __repr__(self):
1338         return "%s(%s)" % (self.__class__.__name__, self)
1339
1340
1341 class InvalidValueType(ASN1Error):
1342     def __init__(self, expected_types):
1343         super(InvalidValueType, self).__init__()
1344         self.expected_types = expected_types
1345
1346     def __str__(self):
1347         return "invalid value type, expected: %s" % ", ".join(
1348             [repr(t) for t in self.expected_types]
1349         )
1350
1351     def __repr__(self):
1352         return "%s(%s)" % (self.__class__.__name__, self)
1353
1354
1355 class BoundsError(ASN1Error):
1356     def __init__(self, bound_min, value, bound_max):
1357         super(BoundsError, self).__init__()
1358         self.bound_min = bound_min
1359         self.value = value
1360         self.bound_max = bound_max
1361
1362     def __str__(self):
1363         return "unsatisfied bounds: %s <= %s <= %s" % (
1364             self.bound_min,
1365             self.value,
1366             self.bound_max,
1367         )
1368
1369     def __repr__(self):
1370         return "%s(%s)" % (self.__class__.__name__, self)
1371
1372
1373 ########################################################################
1374 # Basic coders
1375 ########################################################################
1376
1377 _hexdecoder = getdecoder("hex")
1378 _hexencoder = getencoder("hex")
1379
1380
1381 def hexdec(data):
1382     """Binary data to hexadecimal string convert
1383     """
1384     return _hexdecoder(data)[0]
1385
1386
1387 def hexenc(data):
1388     """Hexadecimal string to binary data convert
1389     """
1390     return _hexencoder(data)[0].decode("ascii")
1391
1392
1393 def int_bytes_len(num, byte_len=8):
1394     if num == 0:
1395         return 1
1396     return int(ceil(float(num.bit_length()) / byte_len))
1397
1398
1399 def zero_ended_encode(num):
1400     octets = bytearray(int_bytes_len(num, 7))
1401     i = len(octets) - 1
1402     octets[i] = num & 0x7F
1403     num >>= 7
1404     i -= 1
1405     while num > 0:
1406         octets[i] = 0x80 | (num & 0x7F)
1407         num >>= 7
1408         i -= 1
1409     return bytes(octets)
1410
1411
1412 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1413     """Encode tag to binary form
1414
1415     :param int num: tag's number
1416     :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1417                       :py:data:`pyderasn.TagClassContext`,
1418                       :py:data:`pyderasn.TagClassApplication`,
1419                       :py:data:`pyderasn.TagClassPrivate`)
1420     :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1421                      :py:data:`pyderasn.TagFormConstructed`)
1422     """
1423     if num < 31:
1424         # [XX|X|.....]
1425         return int2byte(klass | form | num)
1426     # [XX|X|11111][1.......][1.......] ... [0.......]
1427     return int2byte(klass | form | 31) + zero_ended_encode(num)
1428
1429
1430 def tag_decode(tag):
1431     """Decode tag from binary form
1432
1433     .. warning::
1434
1435        No validation is performed, assuming that it has already passed.
1436
1437     It returns tuple with three integers, as
1438     :py:func:`pyderasn.tag_encode` accepts.
1439     """
1440     first_octet = byte2int(tag)
1441     klass = first_octet & 0xC0
1442     form = first_octet & 0x20
1443     if first_octet & 0x1F < 0x1F:
1444         return (klass, form, first_octet & 0x1F)
1445     num = 0
1446     for octet in iterbytes(tag[1:]):
1447         num <<= 7
1448         num |= octet & 0x7F
1449     return (klass, form, num)
1450
1451
1452 def tag_ctxp(num):
1453     """Create CONTEXT PRIMITIVE tag
1454     """
1455     return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1456
1457
1458 def tag_ctxc(num):
1459     """Create CONTEXT CONSTRUCTED tag
1460     """
1461     return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1462
1463
1464 def tag_strip(data):
1465     """Take off tag from the data
1466
1467     :returns: (encoded tag, tag length, remaining data)
1468     """
1469     if len(data) == 0:
1470         raise NotEnoughData("no data at all")
1471     if byte2int(data) & 0x1F < 31:
1472         return data[:1], 1, data[1:]
1473     i = 0
1474     while True:
1475         i += 1
1476         if i == len(data):
1477             raise DecodeError("unfinished tag")
1478         if indexbytes(data, i) & 0x80 == 0:
1479             break
1480     i += 1
1481     return data[:i], i, data[i:]
1482
1483
1484 def len_encode(l):
1485     if l < 0x80:
1486         return int2byte(l)
1487     octets = bytearray(int_bytes_len(l) + 1)
1488     octets[0] = 0x80 | (len(octets) - 1)
1489     for i in six_xrange(len(octets) - 1, 0, -1):
1490         octets[i] = l & 0xFF
1491         l >>= 8
1492     return bytes(octets)
1493
1494
1495 def len_decode(data):
1496     """Decode length
1497
1498     :returns: (decoded length, length's length, remaining data)
1499     :raises LenIndefForm: if indefinite form encoding is met
1500     """
1501     if len(data) == 0:
1502         raise NotEnoughData("no data at all")
1503     first_octet = byte2int(data)
1504     if first_octet & 0x80 == 0:
1505         return first_octet, 1, data[1:]
1506     octets_num = first_octet & 0x7F
1507     if octets_num + 1 > len(data):
1508         raise NotEnoughData("encoded length is longer than data")
1509     if octets_num == 0:
1510         raise LenIndefForm()
1511     if byte2int(data[1:]) == 0:
1512         raise DecodeError("leading zeros")
1513     l = 0
1514     for v in iterbytes(data[1:1 + octets_num]):
1515         l = (l << 8) | v
1516     if l <= 127:
1517         raise DecodeError("long form instead of short one")
1518     return l, 1 + octets_num, data[1 + octets_num:]
1519
1520
1521 LEN1K = len_encode(1000)
1522
1523
1524 def write_full(writer, data):
1525     """Fully write provided data
1526
1527     :param writer: must comply with ``io.RawIOBase.write`` behaviour
1528
1529     BytesIO does not guarantee that the whole data will be written at
1530     once. That function write everything provided, raising an error if
1531     ``writer`` returns None.
1532     """
1533     data = memoryview(data)
1534     written = 0
1535     while written != len(data):
1536         n = writer(data[written:])
1537         if n is None:
1538             raise ValueError("can not write to buf")
1539         written += n
1540
1541
1542 ########################################################################
1543 # Base class
1544 ########################################################################
1545
1546 class AutoAddSlots(type):
1547     def __new__(cls, name, bases, _dict):
1548         _dict["__slots__"] = _dict.get("__slots__", ())
1549         return type.__new__(cls, name, bases, _dict)
1550
1551
1552 BasicState = namedtuple("BasicState", (
1553     "version",
1554     "tag",
1555     "tag_order",
1556     "expl",
1557     "default",
1558     "optional",
1559     "offset",
1560     "llen",
1561     "vlen",
1562     "expl_lenindef",
1563     "lenindef",
1564     "ber_encoded",
1565 ), **NAMEDTUPLE_KWARGS)
1566
1567
1568 @add_metaclass(AutoAddSlots)
1569 class Obj(object):
1570     """Common ASN.1 object class
1571
1572     All ASN.1 types are inherited from it. It has metaclass that
1573     automatically adds ``__slots__`` to all inherited classes.
1574     """
1575     __slots__ = (
1576         "tag",
1577         "_tag_order",
1578         "_value",
1579         "_expl",
1580         "default",
1581         "optional",
1582         "offset",
1583         "llen",
1584         "vlen",
1585         "expl_lenindef",
1586         "lenindef",
1587         "ber_encoded",
1588     )
1589
1590     def __init__(
1591             self,
1592             impl=None,
1593             expl=None,
1594             default=None,
1595             optional=False,
1596             _decoded=(0, 0, 0),
1597     ):
1598         self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1599         self._expl = getattr(self, "expl", None) if expl is None else expl
1600         if self.tag != self.tag_default and self._expl is not None:
1601             raise ValueError("implicit and explicit tags can not be set simultaneously")
1602         if self.tag is None:
1603             self._tag_order = None
1604         else:
1605             tag_class, _, tag_num = tag_decode(
1606                 self.tag if self._expl is None else self._expl
1607             )
1608             self._tag_order = (tag_class, tag_num)
1609         if default is not None:
1610             optional = True
1611         self.optional = optional
1612         self.offset, self.llen, self.vlen = _decoded
1613         self.default = None
1614         self.expl_lenindef = False
1615         self.lenindef = False
1616         self.ber_encoded = False
1617
1618     @property
1619     def ready(self):  # pragma: no cover
1620         """Is object ready to be encoded?
1621         """
1622         raise NotImplementedError()
1623
1624     def _assert_ready(self):
1625         if not self.ready:
1626             raise ObjNotReady(self.__class__.__name__)
1627
1628     @property
1629     def bered(self):
1630         """Is either object or any elements inside is BER encoded?
1631         """
1632         return self.expl_lenindef or self.lenindef or self.ber_encoded
1633
1634     @property
1635     def decoded(self):
1636         """Is object decoded?
1637         """
1638         return (self.llen + self.vlen) > 0
1639
1640     def __getstate__(self):  # pragma: no cover
1641         """Used for making safe to be mutable pickleable copies
1642         """
1643         raise NotImplementedError()
1644
1645     def __setstate__(self, state):
1646         if state.version != __version__:
1647             raise ValueError("data is pickled by different PyDERASN version")
1648         self.tag = state.tag
1649         self._tag_order = state.tag_order
1650         self._expl = state.expl
1651         self.default = state.default
1652         self.optional = state.optional
1653         self.offset = state.offset
1654         self.llen = state.llen
1655         self.vlen = state.vlen
1656         self.expl_lenindef = state.expl_lenindef
1657         self.lenindef = state.lenindef
1658         self.ber_encoded = state.ber_encoded
1659
1660     @property
1661     def tag_order(self):
1662         """Tag's (class, number) used for DER/CER sorting
1663         """
1664         return self._tag_order
1665
1666     @property
1667     def tag_order_cer(self):
1668         return self.tag_order
1669
1670     @property
1671     def tlen(self):
1672         """See :ref:`decoding`
1673         """
1674         return len(self.tag)
1675
1676     @property
1677     def tlvlen(self):
1678         """See :ref:`decoding`
1679         """
1680         return self.tlen + self.llen + self.vlen
1681
1682     def __str__(self):  # pragma: no cover
1683         return self.__bytes__() if PY2 else self.__unicode__()
1684
1685     def __ne__(self, their):
1686         return not(self == their)
1687
1688     def __gt__(self, their):  # pragma: no cover
1689         return not(self < their)
1690
1691     def __le__(self, their):  # pragma: no cover
1692         return (self == their) or (self < their)
1693
1694     def __ge__(self, their):  # pragma: no cover
1695         return (self == their) or (self > their)
1696
1697     def _encode(self):  # pragma: no cover
1698         raise NotImplementedError()
1699
1700     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):  # pragma: no cover
1701         yield NotImplemented
1702
1703     def encode(self):
1704         """DER encode the structure
1705
1706         :returns: DER representation
1707         """
1708         raw = self._encode()
1709         if self._expl is None:
1710             return raw
1711         return b"".join((self._expl, len_encode(len(raw)), raw))
1712
1713     def encode_cer(self, writer):
1714         """CER encode the structure to specified writer
1715
1716         :param writer: must comply with ``io.RawIOBase.write``
1717                        behaviour. It takes slice to be written and
1718                        returns number of bytes processed. If it returns
1719                        None, then exception will be raised
1720         """
1721         if self._expl is not None:
1722             write_full(writer, self._expl + LENINDEF)
1723         if getattr(self, "der_forced", False):
1724             write_full(writer, self._encode())
1725         else:
1726             self._encode_cer(writer)
1727         if self._expl is not None:
1728             write_full(writer, EOC)
1729
1730     def _encode_cer(self, writer):
1731         write_full(writer, self._encode())
1732
1733     def hexencode(self):
1734         """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1735         """
1736         return hexenc(self.encode())
1737
1738     def decode(
1739             self,
1740             data,
1741             offset=0,
1742             leavemm=False,
1743             decode_path=(),
1744             ctx=None,
1745             tag_only=False,
1746             _ctx_immutable=True,
1747     ):
1748         """Decode the data
1749
1750         :param data: either binary or memoryview
1751         :param int offset: initial data's offset
1752         :param bool leavemm: do we need to leave memoryview of remaining
1753                     data as is, or convert it to bytes otherwise
1754         :param decode_path: current decode path (tuples of strings,
1755                             possibly with DecodePathDefBy) with will be
1756                             the root for all underlying objects
1757         :param ctx: optional :ref:`context <ctx>` governing decoding process
1758         :param bool tag_only: decode only the tag, without length and
1759                               contents (used only in Choice and Set
1760                               structures, trying to determine if tag satisfies
1761                               the schema)
1762         :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1763                                     before using it?
1764         :returns: (Obj, remaining data)
1765
1766         .. seealso:: :ref:`decoding`
1767         """
1768         result = next(self.decode_evgen(
1769             data,
1770             offset,
1771             leavemm,
1772             decode_path,
1773             ctx,
1774             tag_only,
1775             _ctx_immutable,
1776             _evgen_mode=False,
1777         ))
1778         if result is None:
1779             return None
1780         _, obj, tail = result
1781         return obj, tail
1782
1783     def decode_evgen(
1784             self,
1785             data,
1786             offset=0,
1787             leavemm=False,
1788             decode_path=(),
1789             ctx=None,
1790             tag_only=False,
1791             _ctx_immutable=True,
1792             _evgen_mode=True,
1793     ):
1794         """Decode with evgen mode on
1795
1796         That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1797         it returns the generator producing ``(decode_path, obj, tail)``
1798         values. See :ref:`evgen mode <evgen_mode>`.
1799         """
1800         if ctx is None:
1801             ctx = {}
1802         elif _ctx_immutable:
1803             ctx = copy(ctx)
1804         tlv = memoryview(data)
1805         if (
1806                 _evgen_mode and
1807                 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1808         ):
1809             _evgen_mode = False
1810         if self._expl is None:
1811             for result in self._decode(
1812                     tlv,
1813                     offset=offset,
1814                     decode_path=decode_path,
1815                     ctx=ctx,
1816                     tag_only=tag_only,
1817                     evgen_mode=_evgen_mode,
1818             ):
1819                 if tag_only:
1820                     yield None
1821                     return
1822                 _decode_path, obj, tail = result
1823                 if not _decode_path is decode_path:
1824                     yield result
1825         else:
1826             try:
1827                 t, tlen, lv = tag_strip(tlv)
1828             except DecodeError as err:
1829                 raise err.__class__(
1830                     msg=err.msg,
1831                     klass=self.__class__,
1832                     decode_path=decode_path,
1833                     offset=offset,
1834                 )
1835             if t != self._expl:
1836                 raise TagMismatch(
1837                     klass=self.__class__,
1838                     decode_path=decode_path,
1839                     offset=offset,
1840                 )
1841             try:
1842                 l, llen, v = len_decode(lv)
1843             except LenIndefForm as err:
1844                 if not ctx.get("bered", False):
1845                     raise err.__class__(
1846                         msg=err.msg,
1847                         klass=self.__class__,
1848                         decode_path=decode_path,
1849                         offset=offset,
1850                     )
1851                 llen, v = 1, lv[1:]
1852                 offset += tlen + llen
1853                 for result in self._decode(
1854                         v,
1855                         offset=offset,
1856                         decode_path=decode_path,
1857                         ctx=ctx,
1858                         tag_only=tag_only,
1859                         evgen_mode=_evgen_mode,
1860                 ):
1861                     if tag_only:  # pragma: no cover
1862                         yield None
1863                         return
1864                     _decode_path, obj, tail = result
1865                     if not _decode_path is decode_path:
1866                         yield result
1867                 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
1868                 if eoc_expected.tobytes() != EOC:
1869                     raise DecodeError(
1870                         "no EOC",
1871                         klass=self.__class__,
1872                         decode_path=decode_path,
1873                         offset=offset,
1874                     )
1875                 obj.vlen += EOC_LEN
1876                 obj.expl_lenindef = True
1877             except DecodeError as err:
1878                 raise err.__class__(
1879                     msg=err.msg,
1880                     klass=self.__class__,
1881                     decode_path=decode_path,
1882                     offset=offset,
1883                 )
1884             else:
1885                 if l > len(v):
1886                     raise NotEnoughData(
1887                         "encoded length is longer than data",
1888                         klass=self.__class__,
1889                         decode_path=decode_path,
1890                         offset=offset,
1891                     )
1892                 for result in self._decode(
1893                         v,
1894                         offset=offset + tlen + llen,
1895                         decode_path=decode_path,
1896                         ctx=ctx,
1897                         tag_only=tag_only,
1898                         evgen_mode=_evgen_mode,
1899                 ):
1900                     if tag_only:  # pragma: no cover
1901                         yield None
1902                         return
1903                     _decode_path, obj, tail = result
1904                     if not _decode_path is decode_path:
1905                         yield result
1906                 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
1907                     raise DecodeError(
1908                         "explicit tag out-of-bound, longer than data",
1909                         klass=self.__class__,
1910                         decode_path=decode_path,
1911                         offset=offset,
1912                     )
1913         yield decode_path, obj, (tail if leavemm else tail.tobytes())
1914
1915     def decod(self, data, offset=0, decode_path=(), ctx=None):
1916         """Decode the data, check that tail is empty
1917
1918         :raises ExceedingData: if tail is not empty
1919
1920         This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
1921         (decode without tail) that also checks that there is no
1922         trailing data left.
1923         """
1924         obj, tail = self.decode(
1925             data,
1926             offset=offset,
1927             decode_path=decode_path,
1928             ctx=ctx,
1929             leavemm=True,
1930         )
1931         if len(tail) > 0:
1932             raise ExceedingData(len(tail))
1933         return obj
1934
1935     def hexdecode(self, data, *args, **kwargs):
1936         """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
1937         """
1938         return self.decode(hexdec(data), *args, **kwargs)
1939
1940     def hexdecod(self, data, *args, **kwargs):
1941         """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
1942         """
1943         return self.decod(hexdec(data), *args, **kwargs)
1944
1945     @property
1946     def expled(self):
1947         """See :ref:`decoding`
1948         """
1949         return self._expl is not None
1950
1951     @property
1952     def expl_tag(self):
1953         """See :ref:`decoding`
1954         """
1955         return self._expl
1956
1957     @property
1958     def expl_tlen(self):
1959         """See :ref:`decoding`
1960         """
1961         return len(self._expl)
1962
1963     @property
1964     def expl_llen(self):
1965         """See :ref:`decoding`
1966         """
1967         if self.expl_lenindef:
1968             return 1
1969         return len(len_encode(self.tlvlen))
1970
1971     @property
1972     def expl_offset(self):
1973         """See :ref:`decoding`
1974         """
1975         return self.offset - self.expl_tlen - self.expl_llen
1976
1977     @property
1978     def expl_vlen(self):
1979         """See :ref:`decoding`
1980         """
1981         return self.tlvlen
1982
1983     @property
1984     def expl_tlvlen(self):
1985         """See :ref:`decoding`
1986         """
1987         return self.expl_tlen + self.expl_llen + self.expl_vlen
1988
1989     @property
1990     def fulloffset(self):
1991         """See :ref:`decoding`
1992         """
1993         return self.expl_offset if self.expled else self.offset
1994
1995     @property
1996     def fulllen(self):
1997         """See :ref:`decoding`
1998         """
1999         return self.expl_tlvlen if self.expled else self.tlvlen
2000
2001     def pps_lenindef(self, decode_path):
2002         if self.lenindef and not (
2003                 getattr(self, "defined", None) is not None and
2004                 self.defined[1].lenindef
2005         ):
2006             yield _pp(
2007                 asn1_type_name="EOC",
2008                 obj_name="",
2009                 decode_path=decode_path,
2010                 offset=(
2011                     self.offset + self.tlvlen -
2012                     (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2013                 ),
2014                 tlen=1,
2015                 llen=1,
2016                 vlen=0,
2017                 ber_encoded=True,
2018                 bered=True,
2019             )
2020         if self.expl_lenindef:
2021             yield _pp(
2022                 asn1_type_name="EOC",
2023                 obj_name="EXPLICIT",
2024                 decode_path=decode_path,
2025                 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2026                 tlen=1,
2027                 llen=1,
2028                 vlen=0,
2029                 ber_encoded=True,
2030                 bered=True,
2031             )
2032
2033
2034 def encode_cer(obj):
2035     """Encode to CER in memory buffer
2036
2037     :returns bytes: memory buffer contents
2038     """
2039     buf = BytesIO()
2040     obj.encode_cer(buf.write)
2041     return buf.getvalue()
2042
2043
2044 class DecodePathDefBy(object):
2045     """DEFINED BY representation inside decode path
2046     """
2047     __slots__ = ("defined_by",)
2048
2049     def __init__(self, defined_by):
2050         self.defined_by = defined_by
2051
2052     def __ne__(self, their):
2053         return not(self == their)
2054
2055     def __eq__(self, their):
2056         if not isinstance(their, self.__class__):
2057             return False
2058         return self.defined_by == their.defined_by
2059
2060     def __str__(self):
2061         return "DEFINED BY " + str(self.defined_by)
2062
2063     def __repr__(self):
2064         return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2065
2066
2067 ########################################################################
2068 # Pretty printing
2069 ########################################################################
2070
2071 PP = namedtuple("PP", (
2072     "obj",
2073     "asn1_type_name",
2074     "obj_name",
2075     "decode_path",
2076     "value",
2077     "blob",
2078     "optional",
2079     "default",
2080     "impl",
2081     "expl",
2082     "offset",
2083     "tlen",
2084     "llen",
2085     "vlen",
2086     "expl_offset",
2087     "expl_tlen",
2088     "expl_llen",
2089     "expl_vlen",
2090     "expl_lenindef",
2091     "lenindef",
2092     "ber_encoded",
2093     "bered",
2094 ), **NAMEDTUPLE_KWARGS)
2095
2096
2097 def _pp(
2098         obj=None,
2099         asn1_type_name="unknown",
2100         obj_name="unknown",
2101         decode_path=(),
2102         value=None,
2103         blob=None,
2104         optional=False,
2105         default=False,
2106         impl=None,
2107         expl=None,
2108         offset=0,
2109         tlen=0,
2110         llen=0,
2111         vlen=0,
2112         expl_offset=None,
2113         expl_tlen=None,
2114         expl_llen=None,
2115         expl_vlen=None,
2116         expl_lenindef=False,
2117         lenindef=False,
2118         ber_encoded=False,
2119         bered=False,
2120 ):
2121     return PP(
2122         obj,
2123         asn1_type_name,
2124         obj_name,
2125         decode_path,
2126         value,
2127         blob,
2128         optional,
2129         default,
2130         impl,
2131         expl,
2132         offset,
2133         tlen,
2134         llen,
2135         vlen,
2136         expl_offset,
2137         expl_tlen,
2138         expl_llen,
2139         expl_vlen,
2140         expl_lenindef,
2141         lenindef,
2142         ber_encoded,
2143         bered,
2144     )
2145
2146
2147 def _colourize(what, colour, with_colours, attrs=("bold",)):
2148     return colored(what, colour, attrs=attrs) if with_colours else what
2149
2150
2151 def colonize_hex(hexed):
2152     """Separate hexadecimal string with colons
2153     """
2154     return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2155
2156
2157 def pp_console_row(
2158         pp,
2159         oid_maps=(),
2160         with_offsets=False,
2161         with_blob=True,
2162         with_colours=False,
2163         with_decode_path=False,
2164         decode_path_len_decrease=0,
2165 ):
2166     cols = []
2167     if with_offsets:
2168         col = "%5d%s%s" % (
2169             pp.offset,
2170             (
2171                 "  " if pp.expl_offset is None else
2172                 ("-%d" % (pp.offset - pp.expl_offset))
2173             ),
2174             LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2175         )
2176         col = _colourize(col, "red", with_colours, ())
2177         col += _colourize("B", "red", with_colours) if pp.bered else " "
2178         cols.append(col)
2179         col = "[%d,%d,%4d]%s" % (
2180             pp.tlen,
2181             pp.llen,
2182             pp.vlen,
2183             LENINDEF_PP_CHAR if pp.lenindef else " "
2184         )
2185         col = _colourize(col, "green", with_colours, ())
2186         cols.append(col)
2187     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2188     if decode_path_len > 0:
2189         cols.append(" ." * decode_path_len)
2190         ent = pp.decode_path[-1]
2191         if isinstance(ent, DecodePathDefBy):
2192             cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2193             value = str(ent.defined_by)
2194             oid_name = None
2195             if (
2196                     len(oid_maps) > 0 and
2197                     ent.defined_by.asn1_type_name ==
2198                     ObjectIdentifier.asn1_type_name
2199             ):
2200                 for oid_map in oid_maps:
2201                     oid_name = oid_map.get(value)
2202                     if oid_name is not None:
2203                         cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2204                         break
2205             if oid_name is None:
2206                 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2207         else:
2208             cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2209     if pp.expl is not None:
2210         klass, _, num = pp.expl
2211         col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2212         cols.append(_colourize(col, "blue", with_colours))
2213     if pp.impl is not None:
2214         klass, _, num = pp.impl
2215         col = "[%s%d]" % (TagClassReprs[klass], num)
2216         cols.append(_colourize(col, "blue", with_colours))
2217     if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2218         cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2219     if pp.ber_encoded:
2220         cols.append(_colourize("BER", "red", with_colours))
2221     cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2222     if pp.value is not None:
2223         value = pp.value
2224         cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2225         if (
2226                 len(oid_maps) > 0 and
2227                 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
2228         ):
2229             for oid_map in oid_maps:
2230                 oid_name = oid_map.get(value)
2231                 if oid_name is not None:
2232                     cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2233                     break
2234         if pp.asn1_type_name == Integer.asn1_type_name:
2235             hex_repr = hex(int(pp.obj._value))[2:].upper()
2236             if len(hex_repr) % 2 != 0:
2237                 hex_repr = "0" + hex_repr
2238             cols.append(_colourize(
2239                 "(%s)" % colonize_hex(hex_repr),
2240                 "green",
2241                 with_colours,
2242             ))
2243     if with_blob:
2244         if pp.blob.__class__ == binary_type:
2245             cols.append(hexenc(pp.blob))
2246         elif pp.blob.__class__ == tuple:
2247             cols.append(", ".join(pp.blob))
2248     if pp.optional:
2249         cols.append(_colourize("OPTIONAL", "red", with_colours))
2250     if pp.default:
2251         cols.append(_colourize("DEFAULT", "red", with_colours))
2252     if with_decode_path:
2253         cols.append(_colourize(
2254             "[%s]" % ":".join(str(p) for p in pp.decode_path),
2255             "grey",
2256             with_colours,
2257         ))
2258     return " ".join(cols)
2259
2260
2261 def pp_console_blob(pp, decode_path_len_decrease=0):
2262     cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2263     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2264     if decode_path_len > 0:
2265         cols.append(" ." * (decode_path_len + 1))
2266     if pp.blob.__class__ == binary_type:
2267         blob = hexenc(pp.blob).upper()
2268         for i in six_xrange(0, len(blob), 32):
2269             chunk = blob[i:i + 32]
2270             yield " ".join(cols + [colonize_hex(chunk)])
2271     elif pp.blob.__class__ == tuple:
2272         yield " ".join(cols + [", ".join(pp.blob)])
2273
2274
2275 def pprint(
2276         obj,
2277         oid_maps=(),
2278         big_blobs=False,
2279         with_colours=False,
2280         with_decode_path=False,
2281         decode_path_only=(),
2282         decode_path=(),
2283 ):
2284     """Pretty print object
2285
2286     :param Obj obj: object you want to pretty print
2287     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionary.
2288                      Its human readable form is printed when OID is met
2289     :param big_blobs: if large binary objects are met (like OctetString
2290                       values), do we need to print them too, on separate
2291                       lines
2292     :param with_colours: colourize output, if ``termcolor`` library
2293                          is available
2294     :param with_decode_path: print decode path
2295     :param decode_path_only: print only that specified decode path
2296     """
2297     def _pprint_pps(pps):
2298         for pp in pps:
2299             if hasattr(pp, "_fields"):
2300                 if (
2301                         decode_path_only != () and
2302                         tuple(
2303                             str(p) for p in pp.decode_path[:len(decode_path_only)]
2304                         ) != decode_path_only
2305                 ):
2306                     continue
2307                 if big_blobs:
2308                     yield pp_console_row(
2309                         pp,
2310                         oid_maps=oid_maps,
2311                         with_offsets=True,
2312                         with_blob=False,
2313                         with_colours=with_colours,
2314                         with_decode_path=with_decode_path,
2315                         decode_path_len_decrease=len(decode_path_only),
2316                     )
2317                     for row in pp_console_blob(
2318                             pp,
2319                             decode_path_len_decrease=len(decode_path_only),
2320                     ):
2321                         yield row
2322                 else:
2323                     yield pp_console_row(
2324                         pp,
2325                         oid_maps=oid_maps,
2326                         with_offsets=True,
2327                         with_blob=True,
2328                         with_colours=with_colours,
2329                         with_decode_path=with_decode_path,
2330                         decode_path_len_decrease=len(decode_path_only),
2331                     )
2332             else:
2333                 for row in _pprint_pps(pp):
2334                     yield row
2335     return "\n".join(_pprint_pps(obj.pps(decode_path)))
2336
2337
2338 ########################################################################
2339 # ASN.1 primitive types
2340 ########################################################################
2341
2342 BooleanState = namedtuple(
2343     "BooleanState",
2344     BasicState._fields + ("value",),
2345     **NAMEDTUPLE_KWARGS
2346 )
2347
2348
2349 class Boolean(Obj):
2350     """``BOOLEAN`` boolean type
2351
2352     >>> b = Boolean(True)
2353     BOOLEAN True
2354     >>> b == Boolean(True)
2355     True
2356     >>> bool(b)
2357     True
2358     """
2359     __slots__ = ()
2360     tag_default = tag_encode(1)
2361     asn1_type_name = "BOOLEAN"
2362
2363     def __init__(
2364             self,
2365             value=None,
2366             impl=None,
2367             expl=None,
2368             default=None,
2369             optional=False,
2370             _decoded=(0, 0, 0),
2371     ):
2372         """
2373         :param value: set the value. Either boolean type, or
2374                       :py:class:`pyderasn.Boolean` object
2375         :param bytes impl: override default tag with ``IMPLICIT`` one
2376         :param bytes expl: override default tag with ``EXPLICIT`` one
2377         :param default: set default value. Type same as in ``value``
2378         :param bool optional: is object ``OPTIONAL`` in sequence
2379         """
2380         super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2381         self._value = None if value is None else self._value_sanitize(value)
2382         if default is not None:
2383             default = self._value_sanitize(default)
2384             self.default = self.__class__(
2385                 value=default,
2386                 impl=self.tag,
2387                 expl=self._expl,
2388             )
2389             if value is None:
2390                 self._value = default
2391
2392     def _value_sanitize(self, value):
2393         if value.__class__ == bool:
2394             return value
2395         if issubclass(value.__class__, Boolean):
2396             return value._value
2397         raise InvalidValueType((self.__class__, bool))
2398
2399     @property
2400     def ready(self):
2401         return self._value is not None
2402
2403     def __getstate__(self):
2404         return BooleanState(
2405             __version__,
2406             self.tag,
2407             self._tag_order,
2408             self._expl,
2409             self.default,
2410             self.optional,
2411             self.offset,
2412             self.llen,
2413             self.vlen,
2414             self.expl_lenindef,
2415             self.lenindef,
2416             self.ber_encoded,
2417             self._value,
2418         )
2419
2420     def __setstate__(self, state):
2421         super(Boolean, self).__setstate__(state)
2422         self._value = state.value
2423
2424     def __nonzero__(self):
2425         self._assert_ready()
2426         return self._value
2427
2428     def __bool__(self):
2429         self._assert_ready()
2430         return self._value
2431
2432     def __eq__(self, their):
2433         if their.__class__ == bool:
2434             return self._value == their
2435         if not issubclass(their.__class__, Boolean):
2436             return False
2437         return (
2438             self._value == their._value and
2439             self.tag == their.tag and
2440             self._expl == their._expl
2441         )
2442
2443     def __call__(
2444             self,
2445             value=None,
2446             impl=None,
2447             expl=None,
2448             default=None,
2449             optional=None,
2450     ):
2451         return self.__class__(
2452             value=value,
2453             impl=self.tag if impl is None else impl,
2454             expl=self._expl if expl is None else expl,
2455             default=self.default if default is None else default,
2456             optional=self.optional if optional is None else optional,
2457         )
2458
2459     def _encode(self):
2460         self._assert_ready()
2461         return b"".join((
2462             self.tag,
2463             len_encode(1),
2464             (b"\xFF" if self._value else b"\x00"),
2465         ))
2466
2467     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2468         try:
2469             t, _, lv = tag_strip(tlv)
2470         except DecodeError as err:
2471             raise err.__class__(
2472                 msg=err.msg,
2473                 klass=self.__class__,
2474                 decode_path=decode_path,
2475                 offset=offset,
2476             )
2477         if t != self.tag:
2478             raise TagMismatch(
2479                 klass=self.__class__,
2480                 decode_path=decode_path,
2481                 offset=offset,
2482             )
2483         if tag_only:
2484             yield None
2485             return
2486         try:
2487             l, _, v = len_decode(lv)
2488         except DecodeError as err:
2489             raise err.__class__(
2490                 msg=err.msg,
2491                 klass=self.__class__,
2492                 decode_path=decode_path,
2493                 offset=offset,
2494             )
2495         if l != 1:
2496             raise InvalidLength(
2497                 "Boolean's length must be equal to 1",
2498                 klass=self.__class__,
2499                 decode_path=decode_path,
2500                 offset=offset,
2501             )
2502         if l > len(v):
2503             raise NotEnoughData(
2504                 "encoded length is longer than data",
2505                 klass=self.__class__,
2506                 decode_path=decode_path,
2507                 offset=offset,
2508             )
2509         first_octet = byte2int(v)
2510         ber_encoded = False
2511         if first_octet == 0:
2512             value = False
2513         elif first_octet == 0xFF:
2514             value = True
2515         elif ctx.get("bered", False):
2516             value = True
2517             ber_encoded = True
2518         else:
2519             raise DecodeError(
2520                 "unacceptable Boolean value",
2521                 klass=self.__class__,
2522                 decode_path=decode_path,
2523                 offset=offset,
2524             )
2525         obj = self.__class__(
2526             value=value,
2527             impl=self.tag,
2528             expl=self._expl,
2529             default=self.default,
2530             optional=self.optional,
2531             _decoded=(offset, 1, 1),
2532         )
2533         obj.ber_encoded = ber_encoded
2534         yield decode_path, obj, v[1:]
2535
2536     def __repr__(self):
2537         return pp_console_row(next(self.pps()))
2538
2539     def pps(self, decode_path=()):
2540         yield _pp(
2541             obj=self,
2542             asn1_type_name=self.asn1_type_name,
2543             obj_name=self.__class__.__name__,
2544             decode_path=decode_path,
2545             value=str(self._value) if self.ready else None,
2546             optional=self.optional,
2547             default=self == self.default,
2548             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2549             expl=None if self._expl is None else tag_decode(self._expl),
2550             offset=self.offset,
2551             tlen=self.tlen,
2552             llen=self.llen,
2553             vlen=self.vlen,
2554             expl_offset=self.expl_offset if self.expled else None,
2555             expl_tlen=self.expl_tlen if self.expled else None,
2556             expl_llen=self.expl_llen if self.expled else None,
2557             expl_vlen=self.expl_vlen if self.expled else None,
2558             expl_lenindef=self.expl_lenindef,
2559             ber_encoded=self.ber_encoded,
2560             bered=self.bered,
2561         )
2562         for pp in self.pps_lenindef(decode_path):
2563             yield pp
2564
2565
2566 IntegerState = namedtuple(
2567     "IntegerState",
2568     BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2569     **NAMEDTUPLE_KWARGS
2570 )
2571
2572
2573 class Integer(Obj):
2574     """``INTEGER`` integer type
2575
2576     >>> b = Integer(-123)
2577     INTEGER -123
2578     >>> b == Integer(-123)
2579     True
2580     >>> int(b)
2581     -123
2582
2583     >>> Integer(2, bounds=(1, 3))
2584     INTEGER 2
2585     >>> Integer(5, bounds=(1, 3))
2586     Traceback (most recent call last):
2587     pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2588
2589     ::
2590
2591         class Version(Integer):
2592             schema = (
2593                 ("v1", 0),
2594                 ("v2", 1),
2595                 ("v3", 2),
2596             )
2597
2598     >>> v = Version("v1")
2599     Version INTEGER v1
2600     >>> int(v)
2601     0
2602     >>> v.named
2603     'v1'
2604     >>> v.specs
2605     {'v3': 2, 'v1': 0, 'v2': 1}
2606     """
2607     __slots__ = ("specs", "_bound_min", "_bound_max")
2608     tag_default = tag_encode(2)
2609     asn1_type_name = "INTEGER"
2610
2611     def __init__(
2612             self,
2613             value=None,
2614             bounds=None,
2615             impl=None,
2616             expl=None,
2617             default=None,
2618             optional=False,
2619             _specs=None,
2620             _decoded=(0, 0, 0),
2621     ):
2622         """
2623         :param value: set the value. Either integer type, named value
2624                       (if ``schema`` is specified in the class), or
2625                       :py:class:`pyderasn.Integer` object
2626         :param bounds: set ``(MIN, MAX)`` value constraint.
2627                        (-inf, +inf) by default
2628         :param bytes impl: override default tag with ``IMPLICIT`` one
2629         :param bytes expl: override default tag with ``EXPLICIT`` one
2630         :param default: set default value. Type same as in ``value``
2631         :param bool optional: is object ``OPTIONAL`` in sequence
2632         """
2633         super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2634         self._value = value
2635         specs = getattr(self, "schema", {}) if _specs is None else _specs
2636         self.specs = specs if specs.__class__ == dict else dict(specs)
2637         self._bound_min, self._bound_max = getattr(
2638             self,
2639             "bounds",
2640             (float("-inf"), float("+inf")),
2641         ) if bounds is None else bounds
2642         if value is not None:
2643             self._value = self._value_sanitize(value)
2644         if default is not None:
2645             default = self._value_sanitize(default)
2646             self.default = self.__class__(
2647                 value=default,
2648                 impl=self.tag,
2649                 expl=self._expl,
2650                 _specs=self.specs,
2651             )
2652             if self._value is None:
2653                 self._value = default
2654
2655     def _value_sanitize(self, value):
2656         if isinstance(value, integer_types):
2657             pass
2658         elif issubclass(value.__class__, Integer):
2659             value = value._value
2660         elif value.__class__ == str:
2661             value = self.specs.get(value)
2662             if value is None:
2663                 raise ObjUnknown("integer value: %s" % value)
2664         else:
2665             raise InvalidValueType((self.__class__, int, str))
2666         if not self._bound_min <= value <= self._bound_max:
2667             raise BoundsError(self._bound_min, value, self._bound_max)
2668         return value
2669
2670     @property
2671     def ready(self):
2672         return self._value is not None
2673
2674     def __getstate__(self):
2675         return IntegerState(
2676             __version__,
2677             self.tag,
2678             self._tag_order,
2679             self._expl,
2680             self.default,
2681             self.optional,
2682             self.offset,
2683             self.llen,
2684             self.vlen,
2685             self.expl_lenindef,
2686             self.lenindef,
2687             self.ber_encoded,
2688             self.specs,
2689             self._value,
2690             self._bound_min,
2691             self._bound_max,
2692         )
2693
2694     def __setstate__(self, state):
2695         super(Integer, self).__setstate__(state)
2696         self.specs = state.specs
2697         self._value = state.value
2698         self._bound_min = state.bound_min
2699         self._bound_max = state.bound_max
2700
2701     def __int__(self):
2702         self._assert_ready()
2703         return int(self._value)
2704
2705     def __hash__(self):
2706         self._assert_ready()
2707         return hash(
2708             self.tag +
2709             bytes(self._expl or b"") +
2710             str(self._value).encode("ascii"),
2711         )
2712
2713     def __eq__(self, their):
2714         if isinstance(their, integer_types):
2715             return self._value == their
2716         if not issubclass(their.__class__, Integer):
2717             return False
2718         return (
2719             self._value == their._value and
2720             self.tag == their.tag and
2721             self._expl == their._expl
2722         )
2723
2724     def __lt__(self, their):
2725         return self._value < their._value
2726
2727     @property
2728     def named(self):
2729         """Return named representation (if exists) of the value
2730         """
2731         for name, value in iteritems(self.specs):
2732             if value == self._value:
2733                 return name
2734         return None
2735
2736     def __call__(
2737             self,
2738             value=None,
2739             bounds=None,
2740             impl=None,
2741             expl=None,
2742             default=None,
2743             optional=None,
2744     ):
2745         return self.__class__(
2746             value=value,
2747             bounds=(
2748                 (self._bound_min, self._bound_max)
2749                 if bounds is None else bounds
2750             ),
2751             impl=self.tag if impl is None else impl,
2752             expl=self._expl if expl is None else expl,
2753             default=self.default if default is None else default,
2754             optional=self.optional if optional is None else optional,
2755             _specs=self.specs,
2756         )
2757
2758     def _encode(self):
2759         self._assert_ready()
2760         value = self._value
2761         if PY2:
2762             if value == 0:
2763                 octets = bytearray([0])
2764             elif value < 0:
2765                 value = -value
2766                 value -= 1
2767                 octets = bytearray()
2768                 while value > 0:
2769                     octets.append((value & 0xFF) ^ 0xFF)
2770                     value >>= 8
2771                 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2772                     octets.append(0xFF)
2773             else:
2774                 octets = bytearray()
2775                 while value > 0:
2776                     octets.append(value & 0xFF)
2777                     value >>= 8
2778                 if octets[-1] & 0x80 > 0:
2779                     octets.append(0x00)
2780             octets.reverse()
2781             octets = bytes(octets)
2782         else:
2783             bytes_len = ceil(value.bit_length() / 8) or 1
2784             while True:
2785                 try:
2786                     octets = value.to_bytes(
2787                         bytes_len,
2788                         byteorder="big",
2789                         signed=True,
2790                     )
2791                 except OverflowError:
2792                     bytes_len += 1
2793                 else:
2794                     break
2795         return b"".join((self.tag, len_encode(len(octets)), octets))
2796
2797     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2798         try:
2799             t, _, lv = tag_strip(tlv)
2800         except DecodeError as err:
2801             raise err.__class__(
2802                 msg=err.msg,
2803                 klass=self.__class__,
2804                 decode_path=decode_path,
2805                 offset=offset,
2806             )
2807         if t != self.tag:
2808             raise TagMismatch(
2809                 klass=self.__class__,
2810                 decode_path=decode_path,
2811                 offset=offset,
2812             )
2813         if tag_only:
2814             yield None
2815             return
2816         try:
2817             l, llen, v = len_decode(lv)
2818         except DecodeError as err:
2819             raise err.__class__(
2820                 msg=err.msg,
2821                 klass=self.__class__,
2822                 decode_path=decode_path,
2823                 offset=offset,
2824             )
2825         if l > len(v):
2826             raise NotEnoughData(
2827                 "encoded length is longer than data",
2828                 klass=self.__class__,
2829                 decode_path=decode_path,
2830                 offset=offset,
2831             )
2832         if l == 0:
2833             raise NotEnoughData(
2834                 "zero length",
2835                 klass=self.__class__,
2836                 decode_path=decode_path,
2837                 offset=offset,
2838             )
2839         v, tail = v[:l], v[l:]
2840         first_octet = byte2int(v)
2841         if l > 1:
2842             second_octet = byte2int(v[1:])
2843             if (
2844                     ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
2845                     ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
2846             ):
2847                 raise DecodeError(
2848                     "non normalized integer",
2849                     klass=self.__class__,
2850                     decode_path=decode_path,
2851                     offset=offset,
2852                 )
2853         if PY2:
2854             value = 0
2855             if first_octet & 0x80 > 0:
2856                 octets = bytearray()
2857                 for octet in bytearray(v):
2858                     octets.append(octet ^ 0xFF)
2859                 for octet in octets:
2860                     value = (value << 8) | octet
2861                 value += 1
2862                 value = -value
2863             else:
2864                 for octet in bytearray(v):
2865                     value = (value << 8) | octet
2866         else:
2867             value = int.from_bytes(v, byteorder="big", signed=True)
2868         try:
2869             obj = self.__class__(
2870                 value=value,
2871                 bounds=(self._bound_min, self._bound_max),
2872                 impl=self.tag,
2873                 expl=self._expl,
2874                 default=self.default,
2875                 optional=self.optional,
2876                 _specs=self.specs,
2877                 _decoded=(offset, llen, l),
2878             )
2879         except BoundsError as err:
2880             raise DecodeError(
2881                 msg=str(err),
2882                 klass=self.__class__,
2883                 decode_path=decode_path,
2884                 offset=offset,
2885             )
2886         yield decode_path, obj, tail
2887
2888     def __repr__(self):
2889         return pp_console_row(next(self.pps()))
2890
2891     def pps(self, decode_path=()):
2892         yield _pp(
2893             obj=self,
2894             asn1_type_name=self.asn1_type_name,
2895             obj_name=self.__class__.__name__,
2896             decode_path=decode_path,
2897             value=(self.named or str(self._value)) if self.ready else None,
2898             optional=self.optional,
2899             default=self == self.default,
2900             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2901             expl=None if self._expl is None else tag_decode(self._expl),
2902             offset=self.offset,
2903             tlen=self.tlen,
2904             llen=self.llen,
2905             vlen=self.vlen,
2906             expl_offset=self.expl_offset if self.expled else None,
2907             expl_tlen=self.expl_tlen if self.expled else None,
2908             expl_llen=self.expl_llen if self.expled else None,
2909             expl_vlen=self.expl_vlen if self.expled else None,
2910             expl_lenindef=self.expl_lenindef,
2911             bered=self.bered,
2912         )
2913         for pp in self.pps_lenindef(decode_path):
2914             yield pp
2915
2916
2917 BitStringState = namedtuple(
2918     "BitStringState",
2919     BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
2920     **NAMEDTUPLE_KWARGS
2921 )
2922
2923
2924 class BitString(Obj):
2925     """``BIT STRING`` bit string type
2926
2927     >>> BitString(b"hello world")
2928     BIT STRING 88 bits 68656c6c6f20776f726c64
2929     >>> bytes(b)
2930     b'hello world'
2931     >>> b == b"hello world"
2932     True
2933     >>> b.bit_len
2934     88
2935
2936     >>> BitString("'0A3B5F291CD'H")
2937     BIT STRING 44 bits 0a3b5f291cd0
2938     >>> b = BitString("'010110000000'B")
2939     BIT STRING 12 bits 5800
2940     >>> b.bit_len
2941     12
2942     >>> b[0], b[1], b[2], b[3]
2943     (False, True, False, True)
2944     >>> b[1000]
2945     False
2946     >>> [v for v in b]
2947     [False, True, False, True, True, False, False, False, False, False, False, False]
2948
2949     ::
2950
2951         class KeyUsage(BitString):
2952             schema = (
2953                 ("digitalSignature", 0),
2954                 ("nonRepudiation", 1),
2955                 ("keyEncipherment", 2),
2956             )
2957
2958     >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
2959     KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
2960     >>> b.named
2961     ['nonRepudiation', 'keyEncipherment']
2962     >>> b.specs
2963     {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
2964
2965     .. note::
2966
2967        Pay attention that BIT STRING can be encoded both in primitive
2968        and constructed forms. Decoder always checks constructed form tag
2969        additionally to specified primitive one. If BER decoding is
2970        :ref:`not enabled <bered_ctx>`, then decoder will fail, because
2971        of DER restrictions.
2972     """
2973     __slots__ = ("tag_constructed", "specs", "defined")
2974     tag_default = tag_encode(3)
2975     asn1_type_name = "BIT STRING"
2976
2977     def __init__(
2978             self,
2979             value=None,
2980             impl=None,
2981             expl=None,
2982             default=None,
2983             optional=False,
2984             _specs=None,
2985             _decoded=(0, 0, 0),
2986     ):
2987         """
2988         :param value: set the value. Either binary type, tuple of named
2989                       values (if ``schema`` is specified in the class),
2990                       string in ``'XXX...'B`` form, or
2991                       :py:class:`pyderasn.BitString` object
2992         :param bytes impl: override default tag with ``IMPLICIT`` one
2993         :param bytes expl: override default tag with ``EXPLICIT`` one
2994         :param default: set default value. Type same as in ``value``
2995         :param bool optional: is object ``OPTIONAL`` in sequence
2996         """
2997         super(BitString, self).__init__(impl, expl, default, optional, _decoded)
2998         specs = getattr(self, "schema", {}) if _specs is None else _specs
2999         self.specs = specs if specs.__class__ == dict else dict(specs)
3000         self._value = None if value is None else self._value_sanitize(value)
3001         if default is not None:
3002             default = self._value_sanitize(default)
3003             self.default = self.__class__(
3004                 value=default,
3005                 impl=self.tag,
3006                 expl=self._expl,
3007             )
3008             if value is None:
3009                 self._value = default
3010         self.defined = None
3011         tag_klass, _, tag_num = tag_decode(self.tag)
3012         self.tag_constructed = tag_encode(
3013             klass=tag_klass,
3014             form=TagFormConstructed,
3015             num=tag_num,
3016         )
3017
3018     def _bits2octets(self, bits):
3019         if len(self.specs) > 0:
3020             bits = bits.rstrip("0")
3021         bit_len = len(bits)
3022         bits += "0" * ((8 - (bit_len % 8)) % 8)
3023         octets = bytearray(len(bits) // 8)
3024         for i in six_xrange(len(octets)):
3025             octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3026         return bit_len, bytes(octets)
3027
3028     def _value_sanitize(self, value):
3029         if isinstance(value, (string_types, binary_type)):
3030             if (
3031                     isinstance(value, string_types) and
3032                     value.startswith("'")
3033             ):
3034                 if value.endswith("'B"):
3035                     value = value[1:-2]
3036                     if not frozenset(value) <= SET01:
3037                         raise ValueError("B's coding contains unacceptable chars")
3038                     return self._bits2octets(value)
3039                 if value.endswith("'H"):
3040                     value = value[1:-2]
3041                     return (
3042                         len(value) * 4,
3043                         hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3044                     )
3045             if value.__class__ == binary_type:
3046                 return (len(value) * 8, value)
3047             raise InvalidValueType((self.__class__, string_types, binary_type))
3048         if value.__class__ == tuple:
3049             if (
3050                     len(value) == 2 and
3051                     isinstance(value[0], integer_types) and
3052                     value[1].__class__ == binary_type
3053             ):
3054                 return value
3055             bits = []
3056             for name in value:
3057                 bit = self.specs.get(name)
3058                 if bit is None:
3059                     raise ObjUnknown("BitString value: %s" % name)
3060                 bits.append(bit)
3061             if len(bits) == 0:
3062                 return self._bits2octets("")
3063             bits = frozenset(bits)
3064             return self._bits2octets("".join(
3065                 ("1" if bit in bits else "0")
3066                 for bit in six_xrange(max(bits) + 1)
3067             ))
3068         if issubclass(value.__class__, BitString):
3069             return value._value
3070         raise InvalidValueType((self.__class__, binary_type, string_types))
3071
3072     @property
3073     def ready(self):
3074         return self._value is not None
3075
3076     def __getstate__(self):
3077         return BitStringState(
3078             __version__,
3079             self.tag,
3080             self._tag_order,
3081             self._expl,
3082             self.default,
3083             self.optional,
3084             self.offset,
3085             self.llen,
3086             self.vlen,
3087             self.expl_lenindef,
3088             self.lenindef,
3089             self.ber_encoded,
3090             self.specs,
3091             self._value,
3092             self.tag_constructed,
3093             self.defined,
3094         )
3095
3096     def __setstate__(self, state):
3097         super(BitString, self).__setstate__(state)
3098         self.specs = state.specs
3099         self._value = state.value
3100         self.tag_constructed = state.tag_constructed
3101         self.defined = state.defined
3102
3103     def __iter__(self):
3104         self._assert_ready()
3105         for i in six_xrange(self._value[0]):
3106             yield self[i]
3107
3108     @property
3109     def bit_len(self):
3110         """Returns number of bits in the string
3111         """
3112         self._assert_ready()
3113         return self._value[0]
3114
3115     def __bytes__(self):
3116         self._assert_ready()
3117         return self._value[1]
3118
3119     def __eq__(self, their):
3120         if their.__class__ == bytes:
3121             return self._value[1] == their
3122         if not issubclass(their.__class__, BitString):
3123             return False
3124         return (
3125             self._value == their._value and
3126             self.tag == their.tag and
3127             self._expl == their._expl
3128         )
3129
3130     @property
3131     def named(self):
3132         """Named representation (if exists) of the bits
3133
3134         :returns: [str(name), ...]
3135         """
3136         return [name for name, bit in iteritems(self.specs) if self[bit]]
3137
3138     def __call__(
3139             self,
3140             value=None,
3141             impl=None,
3142             expl=None,
3143             default=None,
3144             optional=None,
3145     ):
3146         return self.__class__(
3147             value=value,
3148             impl=self.tag if impl is None else impl,
3149             expl=self._expl if expl is None else expl,
3150             default=self.default if default is None else default,
3151             optional=self.optional if optional is None else optional,
3152             _specs=self.specs,
3153         )
3154
3155     def __getitem__(self, key):
3156         if key.__class__ == int:
3157             bit_len, octets = self._value
3158             if key >= bit_len:
3159                 return False
3160             return (
3161                 byte2int(memoryview(octets)[key // 8:]) >>
3162                 (7 - (key % 8))
3163             ) & 1 == 1
3164         if isinstance(key, string_types):
3165             value = self.specs.get(key)
3166             if value is None:
3167                 raise ObjUnknown("BitString value: %s" % key)
3168             return self[value]
3169         raise InvalidValueType((int, str))
3170
3171     def _encode(self):
3172         self._assert_ready()
3173         bit_len, octets = self._value
3174         return b"".join((
3175             self.tag,
3176             len_encode(len(octets) + 1),
3177             int2byte((8 - bit_len % 8) % 8),
3178             octets,
3179         ))
3180
3181     def _encode_cer(self, writer):
3182         bit_len, octets = self._value
3183         if len(octets) + 1 <= 1000:
3184             write_full(writer, self._encode())
3185             return
3186         write_full(writer, self.tag_constructed)
3187         write_full(writer, LENINDEF)
3188         for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3189             write_full(writer, b"".join((
3190                 BitString.tag_default,
3191                 LEN1K,
3192                 int2byte(0),
3193                 octets[offset:offset + 999],
3194             )))
3195         tail = octets[offset+999:]
3196         if len(tail) > 0:
3197             tail = int2byte((8 - bit_len % 8) % 8) + tail
3198             write_full(writer, b"".join((
3199                 BitString.tag_default,
3200                 len_encode(len(tail)),
3201                 tail,
3202             )))
3203         write_full(writer, EOC)
3204
3205     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3206         try:
3207             t, tlen, lv = tag_strip(tlv)
3208         except DecodeError as err:
3209             raise err.__class__(
3210                 msg=err.msg,
3211                 klass=self.__class__,
3212                 decode_path=decode_path,
3213                 offset=offset,
3214             )
3215         if t == self.tag:
3216             if tag_only:  # pragma: no cover
3217                 yield None
3218                 return
3219             try:
3220                 l, llen, v = len_decode(lv)
3221             except DecodeError as err:
3222                 raise err.__class__(
3223                     msg=err.msg,
3224                     klass=self.__class__,
3225                     decode_path=decode_path,
3226                     offset=offset,
3227                 )
3228             if l > len(v):
3229                 raise NotEnoughData(
3230                     "encoded length is longer than data",
3231                     klass=self.__class__,
3232                     decode_path=decode_path,
3233                     offset=offset,
3234                 )
3235             if l == 0:
3236                 raise NotEnoughData(
3237                     "zero length",
3238                     klass=self.__class__,
3239                     decode_path=decode_path,
3240                     offset=offset,
3241                 )
3242             pad_size = byte2int(v)
3243             if l == 1 and pad_size != 0:
3244                 raise DecodeError(
3245                     "invalid empty value",
3246                     klass=self.__class__,
3247                     decode_path=decode_path,
3248                     offset=offset,
3249                 )
3250             if pad_size > 7:
3251                 raise DecodeError(
3252                     "too big pad",
3253                     klass=self.__class__,
3254                     decode_path=decode_path,
3255                     offset=offset,
3256                 )
3257             if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3258                 raise DecodeError(
3259                     "invalid pad",
3260                     klass=self.__class__,
3261                     decode_path=decode_path,
3262                     offset=offset,
3263                 )
3264             v, tail = v[:l], v[l:]
3265             bit_len = (len(v) - 1) * 8 - pad_size
3266             obj = self.__class__(
3267                 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3268                 impl=self.tag,
3269                 expl=self._expl,
3270                 default=self.default,
3271                 optional=self.optional,
3272                 _specs=self.specs,
3273                 _decoded=(offset, llen, l),
3274             )
3275             if evgen_mode:
3276                 obj._value = (bit_len, None)
3277             yield decode_path, obj, tail
3278             return
3279         if t != self.tag_constructed:
3280             raise TagMismatch(
3281                 klass=self.__class__,
3282                 decode_path=decode_path,
3283                 offset=offset,
3284             )
3285         if not ctx.get("bered", False):
3286             raise DecodeError(
3287                 "unallowed BER constructed encoding",
3288                 klass=self.__class__,
3289                 decode_path=decode_path,
3290                 offset=offset,
3291             )
3292         if tag_only:  # pragma: no cover
3293             yield None
3294             return
3295         lenindef = False
3296         try:
3297             l, llen, v = len_decode(lv)
3298         except LenIndefForm:
3299             llen, l, v = 1, 0, lv[1:]
3300             lenindef = True
3301         except DecodeError as err:
3302             raise err.__class__(
3303                 msg=err.msg,
3304                 klass=self.__class__,
3305                 decode_path=decode_path,
3306                 offset=offset,
3307             )
3308         if l > len(v):
3309             raise NotEnoughData(
3310                 "encoded length is longer than data",
3311                 klass=self.__class__,
3312                 decode_path=decode_path,
3313                 offset=offset,
3314             )
3315         if not lenindef and l == 0:
3316             raise NotEnoughData(
3317                 "zero length",
3318                 klass=self.__class__,
3319                 decode_path=decode_path,
3320                 offset=offset,
3321             )
3322         chunks = []
3323         sub_offset = offset + tlen + llen
3324         vlen = 0
3325         while True:
3326             if lenindef:
3327                 if v[:EOC_LEN].tobytes() == EOC:
3328                     break
3329             else:
3330                 if vlen == l:
3331                     break
3332                 if vlen > l:
3333                     raise DecodeError(
3334                         "chunk out of bounds",
3335                         klass=self.__class__,
3336                         decode_path=decode_path + (str(len(chunks) - 1),),
3337                         offset=chunks[-1].offset,
3338                     )
3339             sub_decode_path = decode_path + (str(len(chunks)),)
3340             try:
3341                 if evgen_mode:
3342                     for _decode_path, chunk, v_tail in BitString().decode_evgen(
3343                             v,
3344                             offset=sub_offset,
3345                             decode_path=sub_decode_path,
3346                             leavemm=True,
3347                             ctx=ctx,
3348                             _ctx_immutable=False,
3349                     ):
3350                         yield _decode_path, chunk, v_tail
3351                 else:
3352                     _, chunk, v_tail = next(BitString().decode_evgen(
3353                         v,
3354                         offset=sub_offset,
3355                         decode_path=sub_decode_path,
3356                         leavemm=True,
3357                         ctx=ctx,
3358                         _ctx_immutable=False,
3359                         _evgen_mode=False,
3360                     ))
3361             except TagMismatch:
3362                 raise DecodeError(
3363                     "expected BitString encoded chunk",
3364                     klass=self.__class__,
3365                     decode_path=sub_decode_path,
3366                     offset=sub_offset,
3367                 )
3368             chunks.append(chunk)
3369             sub_offset += chunk.tlvlen
3370             vlen += chunk.tlvlen
3371             v = v_tail
3372         if len(chunks) == 0:
3373             raise DecodeError(
3374                 "no chunks",
3375                 klass=self.__class__,
3376                 decode_path=decode_path,
3377                 offset=offset,
3378             )
3379         values = []
3380         bit_len = 0
3381         for chunk_i, chunk in enumerate(chunks[:-1]):
3382             if chunk.bit_len % 8 != 0:
3383                 raise DecodeError(
3384                     "BitString chunk is not multiple of 8 bits",
3385                     klass=self.__class__,
3386                     decode_path=decode_path + (str(chunk_i),),
3387                     offset=chunk.offset,
3388                 )
3389             if not evgen_mode:
3390                 values.append(bytes(chunk))
3391             bit_len += chunk.bit_len
3392         chunk_last = chunks[-1]
3393         if not evgen_mode:
3394             values.append(bytes(chunk_last))
3395         bit_len += chunk_last.bit_len
3396         obj = self.__class__(
3397             value=None if evgen_mode else (bit_len, b"".join(values)),
3398             impl=self.tag,
3399             expl=self._expl,
3400             default=self.default,
3401             optional=self.optional,
3402             _specs=self.specs,
3403             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3404         )
3405         if evgen_mode:
3406             obj._value = (bit_len, None)
3407         obj.lenindef = lenindef
3408         obj.ber_encoded = True
3409         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3410
3411     def __repr__(self):
3412         return pp_console_row(next(self.pps()))
3413
3414     def pps(self, decode_path=()):
3415         value = None
3416         blob = None
3417         if self.ready:
3418             bit_len, blob = self._value
3419             value = "%d bits" % bit_len
3420             if len(self.specs) > 0 and blob is not None:
3421                 blob = tuple(self.named)
3422         yield _pp(
3423             obj=self,
3424             asn1_type_name=self.asn1_type_name,
3425             obj_name=self.__class__.__name__,
3426             decode_path=decode_path,
3427             value=value,
3428             blob=blob,
3429             optional=self.optional,
3430             default=self == self.default,
3431             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3432             expl=None if self._expl is None else tag_decode(self._expl),
3433             offset=self.offset,
3434             tlen=self.tlen,
3435             llen=self.llen,
3436             vlen=self.vlen,
3437             expl_offset=self.expl_offset if self.expled else None,
3438             expl_tlen=self.expl_tlen if self.expled else None,
3439             expl_llen=self.expl_llen if self.expled else None,
3440             expl_vlen=self.expl_vlen if self.expled else None,
3441             expl_lenindef=self.expl_lenindef,
3442             lenindef=self.lenindef,
3443             ber_encoded=self.ber_encoded,
3444             bered=self.bered,
3445         )
3446         defined_by, defined = self.defined or (None, None)
3447         if defined_by is not None:
3448             yield defined.pps(
3449                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3450             )
3451         for pp in self.pps_lenindef(decode_path):
3452             yield pp
3453
3454
3455 OctetStringState = namedtuple(
3456     "OctetStringState",
3457     BasicState._fields + (
3458         "value",
3459         "bound_min",
3460         "bound_max",
3461         "tag_constructed",
3462         "defined",
3463     ),
3464     **NAMEDTUPLE_KWARGS
3465 )
3466
3467
3468 class OctetString(Obj):
3469     """``OCTET STRING`` binary string type
3470
3471     >>> s = OctetString(b"hello world")
3472     OCTET STRING 11 bytes 68656c6c6f20776f726c64
3473     >>> s == OctetString(b"hello world")
3474     True
3475     >>> bytes(s)
3476     b'hello world'
3477
3478     >>> OctetString(b"hello", bounds=(4, 4))
3479     Traceback (most recent call last):
3480     pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3481     >>> OctetString(b"hell", bounds=(4, 4))
3482     OCTET STRING 4 bytes 68656c6c
3483
3484     Memoryviews can be used as a values. If memoryview is made on
3485     mmap-ed file, then it does not take storage inside OctetString
3486     itself. In CER encoding mode it will be streamed to the specified
3487     writer, copying 1 KB chunks.
3488     """
3489     __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3490     tag_default = tag_encode(4)
3491     asn1_type_name = "OCTET STRING"
3492     evgen_mode_skip_value = True
3493
3494     def __init__(
3495             self,
3496             value=None,
3497             bounds=None,
3498             impl=None,
3499             expl=None,
3500             default=None,
3501             optional=False,
3502             _decoded=(0, 0, 0),
3503             ctx=None,
3504     ):
3505         """
3506         :param value: set the value. Either binary type, or
3507                       :py:class:`pyderasn.OctetString` object
3508         :param bounds: set ``(MIN, MAX)`` value size constraint.
3509                        (-inf, +inf) by default
3510         :param bytes impl: override default tag with ``IMPLICIT`` one
3511         :param bytes expl: override default tag with ``EXPLICIT`` one
3512         :param default: set default value. Type same as in ``value``
3513         :param bool optional: is object ``OPTIONAL`` in sequence
3514         """
3515         super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3516         self._value = value
3517         self._bound_min, self._bound_max = getattr(
3518             self,
3519             "bounds",
3520             (0, float("+inf")),
3521         ) if bounds is None else bounds
3522         if value is not None:
3523             self._value = self._value_sanitize(value)
3524         if default is not None:
3525             default = self._value_sanitize(default)
3526             self.default = self.__class__(
3527                 value=default,
3528                 impl=self.tag,
3529                 expl=self._expl,
3530             )
3531             if self._value is None:
3532                 self._value = default
3533         self.defined = None
3534         tag_klass, _, tag_num = tag_decode(self.tag)
3535         self.tag_constructed = tag_encode(
3536             klass=tag_klass,
3537             form=TagFormConstructed,
3538             num=tag_num,
3539         )
3540
3541     def _value_sanitize(self, value):
3542         if value.__class__ == binary_type or value.__class__ == memoryview:
3543             pass
3544         elif issubclass(value.__class__, OctetString):
3545             value = value._value
3546         else:
3547             raise InvalidValueType((self.__class__, bytes, memoryview))
3548         if not self._bound_min <= len(value) <= self._bound_max:
3549             raise BoundsError(self._bound_min, len(value), self._bound_max)
3550         return value
3551
3552     @property
3553     def ready(self):
3554         return self._value is not None
3555
3556     def __getstate__(self):
3557         return OctetStringState(
3558             __version__,
3559             self.tag,
3560             self._tag_order,
3561             self._expl,
3562             self.default,
3563             self.optional,
3564             self.offset,
3565             self.llen,
3566             self.vlen,
3567             self.expl_lenindef,
3568             self.lenindef,
3569             self.ber_encoded,
3570             self._value,
3571             self._bound_min,
3572             self._bound_max,
3573             self.tag_constructed,
3574             self.defined,
3575         )
3576
3577     def __setstate__(self, state):
3578         super(OctetString, self).__setstate__(state)
3579         self._value = state.value
3580         self._bound_min = state.bound_min
3581         self._bound_max = state.bound_max
3582         self.tag_constructed = state.tag_constructed
3583         self.defined = state.defined
3584
3585     def __bytes__(self):
3586         self._assert_ready()
3587         return bytes(self._value)
3588
3589     def __eq__(self, their):
3590         if their.__class__ == binary_type:
3591             return self._value == their
3592         if not issubclass(their.__class__, OctetString):
3593             return False
3594         return (
3595             self._value == their._value and
3596             self.tag == their.tag and
3597             self._expl == their._expl
3598         )
3599
3600     def __lt__(self, their):
3601         return self._value < their._value
3602
3603     def __call__(
3604             self,
3605             value=None,
3606             bounds=None,
3607             impl=None,
3608             expl=None,
3609             default=None,
3610             optional=None,
3611     ):
3612         return self.__class__(
3613             value=value,
3614             bounds=(
3615                 (self._bound_min, self._bound_max)
3616                 if bounds is None else bounds
3617             ),
3618             impl=self.tag if impl is None else impl,
3619             expl=self._expl if expl is None else expl,
3620             default=self.default if default is None else default,
3621             optional=self.optional if optional is None else optional,
3622         )
3623
3624     def _encode(self):
3625         self._assert_ready()
3626         return b"".join((
3627             self.tag,
3628             len_encode(len(self._value)),
3629             self._value,
3630         ))
3631
3632     def _encode_cer(self, writer):
3633         octets = self._value
3634         if len(octets) <= 1000:
3635             write_full(writer, self._encode())
3636             return
3637         write_full(writer, self.tag_constructed)
3638         write_full(writer, LENINDEF)
3639         for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3640             write_full(writer, b"".join((
3641                 OctetString.tag_default,
3642                 LEN1K,
3643                 octets[offset:offset + 1000],
3644             )))
3645         tail = octets[offset+1000:]
3646         if len(tail) > 0:
3647             write_full(writer, b"".join((
3648                 OctetString.tag_default,
3649                 len_encode(len(tail)),
3650                 tail,
3651             )))
3652         write_full(writer, EOC)
3653
3654     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3655         try:
3656             t, tlen, lv = tag_strip(tlv)
3657         except DecodeError as err:
3658             raise err.__class__(
3659                 msg=err.msg,
3660                 klass=self.__class__,
3661                 decode_path=decode_path,
3662                 offset=offset,
3663             )
3664         if t == self.tag:
3665             if tag_only:
3666                 yield None
3667                 return
3668             try:
3669                 l, llen, v = len_decode(lv)
3670             except DecodeError as err:
3671                 raise err.__class__(
3672                     msg=err.msg,
3673                     klass=self.__class__,
3674                     decode_path=decode_path,
3675                     offset=offset,
3676                 )
3677             if l > len(v):
3678                 raise NotEnoughData(
3679                     "encoded length is longer than data",
3680                     klass=self.__class__,
3681                     decode_path=decode_path,
3682                     offset=offset,
3683                 )
3684             v, tail = v[:l], v[l:]
3685             if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3686                 raise DecodeError(
3687                     msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3688                     klass=self.__class__,
3689                     decode_path=decode_path,
3690                     offset=offset,
3691                 )
3692             try:
3693                 obj = self.__class__(
3694                     value=(
3695                         None if (evgen_mode and self.evgen_mode_skip_value)
3696                         else v.tobytes()
3697                     ),
3698                     bounds=(self._bound_min, self._bound_max),
3699                     impl=self.tag,
3700                     expl=self._expl,
3701                     default=self.default,
3702                     optional=self.optional,
3703                     _decoded=(offset, llen, l),
3704                     ctx=ctx,
3705                 )
3706             except DecodeError as err:
3707                 raise DecodeError(
3708                     msg=err.msg,
3709                     klass=self.__class__,
3710                     decode_path=decode_path,
3711                     offset=offset,
3712                 )
3713             except BoundsError as err:
3714                 raise DecodeError(
3715                     msg=str(err),
3716                     klass=self.__class__,
3717                     decode_path=decode_path,
3718                     offset=offset,
3719                 )
3720             yield decode_path, obj, tail
3721             return
3722         if t != self.tag_constructed:
3723             raise TagMismatch(
3724                 klass=self.__class__,
3725                 decode_path=decode_path,
3726                 offset=offset,
3727             )
3728         if not ctx.get("bered", False):
3729             raise DecodeError(
3730                 "unallowed BER constructed encoding",
3731                 klass=self.__class__,
3732                 decode_path=decode_path,
3733                 offset=offset,
3734             )
3735         if tag_only:
3736             yield None
3737             return
3738         lenindef = False
3739         try:
3740             l, llen, v = len_decode(lv)
3741         except LenIndefForm:
3742             llen, l, v = 1, 0, lv[1:]
3743             lenindef = True
3744         except DecodeError as err:
3745             raise err.__class__(
3746                 msg=err.msg,
3747                 klass=self.__class__,
3748                 decode_path=decode_path,
3749                 offset=offset,
3750             )
3751         if l > len(v):
3752             raise NotEnoughData(
3753                 "encoded length is longer than data",
3754                 klass=self.__class__,
3755                 decode_path=decode_path,
3756                 offset=offset,
3757             )
3758         chunks = []
3759         chunks_count = 0
3760         sub_offset = offset + tlen + llen
3761         vlen = 0
3762         payload_len = 0
3763         while True:
3764             if lenindef:
3765                 if v[:EOC_LEN].tobytes() == EOC:
3766                     break
3767             else:
3768                 if vlen == l:
3769                     break
3770                 if vlen > l:
3771                     raise DecodeError(
3772                         "chunk out of bounds",
3773                         klass=self.__class__,
3774                         decode_path=decode_path + (str(len(chunks) - 1),),
3775                         offset=chunks[-1].offset,
3776                     )
3777             try:
3778                 if evgen_mode:
3779                     sub_decode_path = decode_path + (str(chunks_count),)
3780                     for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3781                             v,
3782                             offset=sub_offset,
3783                             decode_path=sub_decode_path,
3784                             leavemm=True,
3785                             ctx=ctx,
3786                             _ctx_immutable=False,
3787                     ):
3788                         yield _decode_path, chunk, v_tail
3789                         if not chunk.ber_encoded:
3790                             payload_len += chunk.vlen
3791                     chunks_count += 1
3792                 else:
3793                     sub_decode_path = decode_path + (str(len(chunks)),)
3794                     _, chunk, v_tail = next(OctetString().decode_evgen(
3795                         v,
3796                         offset=sub_offset,
3797                         decode_path=sub_decode_path,
3798                         leavemm=True,
3799                         ctx=ctx,
3800                         _ctx_immutable=False,
3801                         _evgen_mode=False,
3802                     ))
3803                     chunks.append(chunk)
3804             except TagMismatch:
3805                 raise DecodeError(
3806                     "expected OctetString encoded chunk",
3807                     klass=self.__class__,
3808                     decode_path=sub_decode_path,
3809                     offset=sub_offset,
3810                 )
3811             sub_offset += chunk.tlvlen
3812             vlen += chunk.tlvlen
3813             v = v_tail
3814         if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
3815             raise DecodeError(
3816                 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
3817                 klass=self.__class__,
3818                 decode_path=decode_path,
3819                 offset=offset,
3820             )
3821         try:
3822             obj = self.__class__(
3823                 value=(
3824                     None if evgen_mode else
3825                     b"".join(bytes(chunk) for chunk in chunks)
3826                 ),
3827                 bounds=(self._bound_min, self._bound_max),
3828                 impl=self.tag,
3829                 expl=self._expl,
3830                 default=self.default,
3831                 optional=self.optional,
3832                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3833                 ctx=ctx,
3834             )
3835         except DecodeError as err:
3836             raise DecodeError(
3837                 msg=err.msg,
3838                 klass=self.__class__,
3839                 decode_path=decode_path,
3840                 offset=offset,
3841             )
3842         except BoundsError as err:
3843             raise DecodeError(
3844                 msg=str(err),
3845                 klass=self.__class__,
3846                 decode_path=decode_path,
3847                 offset=offset,
3848             )
3849         obj.lenindef = lenindef
3850         obj.ber_encoded = True
3851         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3852
3853     def __repr__(self):
3854         return pp_console_row(next(self.pps()))
3855
3856     def pps(self, decode_path=()):
3857         yield _pp(
3858             obj=self,
3859             asn1_type_name=self.asn1_type_name,
3860             obj_name=self.__class__.__name__,
3861             decode_path=decode_path,
3862             value=("%d bytes" % len(self._value)) if self.ready else None,
3863             blob=self._value if self.ready else None,
3864             optional=self.optional,
3865             default=self == self.default,
3866             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3867             expl=None if self._expl is None else tag_decode(self._expl),
3868             offset=self.offset,
3869             tlen=self.tlen,
3870             llen=self.llen,
3871             vlen=self.vlen,
3872             expl_offset=self.expl_offset if self.expled else None,
3873             expl_tlen=self.expl_tlen if self.expled else None,
3874             expl_llen=self.expl_llen if self.expled else None,
3875             expl_vlen=self.expl_vlen if self.expled else None,
3876             expl_lenindef=self.expl_lenindef,
3877             lenindef=self.lenindef,
3878             ber_encoded=self.ber_encoded,
3879             bered=self.bered,
3880         )
3881         defined_by, defined = self.defined or (None, None)
3882         if defined_by is not None:
3883             yield defined.pps(
3884                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3885             )
3886         for pp in self.pps_lenindef(decode_path):
3887             yield pp
3888
3889
3890 def agg_octet_string(evgens, decode_path, raw, writer):
3891     """Aggregate constructed string (OctetString and its derivatives)
3892
3893     :param evgens: iterator of generated events
3894     :param decode_path: points to the string we want to decode
3895     :param raw: slicebable (memoryview, bytearray, etc) with
3896                 the data evgens are generated on
3897     :param writer: buffer.write where string is going to be saved
3898     :param writer: where string is going to be saved. Must comply
3899                    with ``io.RawIOBase.write`` behaviour
3900     """
3901     decode_path_len = len(decode_path)
3902     for dp, obj, _ in evgens:
3903         if dp[:decode_path_len] != decode_path:
3904             continue
3905         if not obj.ber_encoded:
3906             write_full(writer, raw[
3907                 obj.offset + obj.tlen + obj.llen:
3908                 obj.offset + obj.tlen + obj.llen + obj.vlen -
3909                 (EOC_LEN if obj.expl_lenindef else 0)
3910             ])
3911         if len(dp) == decode_path_len:
3912             break
3913
3914
3915 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
3916
3917
3918 class Null(Obj):
3919     """``NULL`` null object
3920
3921     >>> n = Null()
3922     NULL
3923     >>> n.ready
3924     True
3925     """
3926     __slots__ = ()
3927     tag_default = tag_encode(5)
3928     asn1_type_name = "NULL"
3929
3930     def __init__(
3931             self,
3932             value=None,  # unused, but Sequence passes it
3933             impl=None,
3934             expl=None,
3935             optional=False,
3936             _decoded=(0, 0, 0),
3937     ):
3938         """
3939         :param bytes impl: override default tag with ``IMPLICIT`` one
3940         :param bytes expl: override default tag with ``EXPLICIT`` one
3941         :param bool optional: is object ``OPTIONAL`` in sequence
3942         """
3943         super(Null, self).__init__(impl, expl, None, optional, _decoded)
3944         self.default = None
3945
3946     @property
3947     def ready(self):
3948         return True
3949
3950     def __getstate__(self):
3951         return NullState(
3952             __version__,
3953             self.tag,
3954             self._tag_order,
3955             self._expl,
3956             self.default,
3957             self.optional,
3958             self.offset,
3959             self.llen,
3960             self.vlen,
3961             self.expl_lenindef,
3962             self.lenindef,
3963             self.ber_encoded,
3964         )
3965
3966     def __eq__(self, their):
3967         if not issubclass(their.__class__, Null):
3968             return False
3969         return (
3970             self.tag == their.tag and
3971             self._expl == their._expl
3972         )
3973
3974     def __call__(
3975             self,
3976             value=None,
3977             impl=None,
3978             expl=None,
3979             optional=None,
3980     ):
3981         return self.__class__(
3982             impl=self.tag if impl is None else impl,
3983             expl=self._expl if expl is None else expl,
3984             optional=self.optional if optional is None else optional,
3985         )
3986
3987     def _encode(self):
3988         return self.tag + len_encode(0)
3989
3990     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3991         try:
3992             t, _, lv = tag_strip(tlv)
3993         except DecodeError as err:
3994             raise err.__class__(
3995                 msg=err.msg,
3996                 klass=self.__class__,
3997                 decode_path=decode_path,
3998                 offset=offset,
3999             )
4000         if t != self.tag:
4001             raise TagMismatch(
4002                 klass=self.__class__,
4003                 decode_path=decode_path,
4004                 offset=offset,
4005             )
4006         if tag_only:  # pragma: no cover
4007             yield None
4008             return
4009         try:
4010             l, _, v = len_decode(lv)
4011         except DecodeError as err:
4012             raise err.__class__(
4013                 msg=err.msg,
4014                 klass=self.__class__,
4015                 decode_path=decode_path,
4016                 offset=offset,
4017             )
4018         if l != 0:
4019             raise InvalidLength(
4020                 "Null must have zero length",
4021                 klass=self.__class__,
4022                 decode_path=decode_path,
4023                 offset=offset,
4024             )
4025         obj = self.__class__(
4026             impl=self.tag,
4027             expl=self._expl,
4028             optional=self.optional,
4029             _decoded=(offset, 1, 0),
4030         )
4031         yield decode_path, obj, v
4032
4033     def __repr__(self):
4034         return pp_console_row(next(self.pps()))
4035
4036     def pps(self, decode_path=()):
4037         yield _pp(
4038             obj=self,
4039             asn1_type_name=self.asn1_type_name,
4040             obj_name=self.__class__.__name__,
4041             decode_path=decode_path,
4042             optional=self.optional,
4043             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4044             expl=None if self._expl is None else tag_decode(self._expl),
4045             offset=self.offset,
4046             tlen=self.tlen,
4047             llen=self.llen,
4048             vlen=self.vlen,
4049             expl_offset=self.expl_offset if self.expled else None,
4050             expl_tlen=self.expl_tlen if self.expled else None,
4051             expl_llen=self.expl_llen if self.expled else None,
4052             expl_vlen=self.expl_vlen if self.expled else None,
4053             expl_lenindef=self.expl_lenindef,
4054             bered=self.bered,
4055         )
4056         for pp in self.pps_lenindef(decode_path):
4057             yield pp
4058
4059
4060 ObjectIdentifierState = namedtuple(
4061     "ObjectIdentifierState",
4062     BasicState._fields + ("value", "defines"),
4063     **NAMEDTUPLE_KWARGS
4064 )
4065
4066
4067 class ObjectIdentifier(Obj):
4068     """``OBJECT IDENTIFIER`` OID type
4069
4070     >>> oid = ObjectIdentifier((1, 2, 3))
4071     OBJECT IDENTIFIER 1.2.3
4072     >>> oid == ObjectIdentifier("1.2.3")
4073     True
4074     >>> tuple(oid)
4075     (1, 2, 3)
4076     >>> str(oid)
4077     '1.2.3'
4078     >>> oid + (4, 5) + ObjectIdentifier("1.7")
4079     OBJECT IDENTIFIER 1.2.3.4.5.1.7
4080
4081     >>> str(ObjectIdentifier((3, 1)))
4082     Traceback (most recent call last):
4083     pyderasn.InvalidOID: unacceptable first arc value
4084     """
4085     __slots__ = ("defines",)
4086     tag_default = tag_encode(6)
4087     asn1_type_name = "OBJECT IDENTIFIER"
4088
4089     def __init__(
4090             self,
4091             value=None,
4092             defines=(),
4093             impl=None,
4094             expl=None,
4095             default=None,
4096             optional=False,
4097             _decoded=(0, 0, 0),
4098     ):
4099         """
4100         :param value: set the value. Either tuples of integers,
4101                       string of "."-concatenated integers, or
4102                       :py:class:`pyderasn.ObjectIdentifier` object
4103         :param defines: sequence of tuples. Each tuple has two elements.
4104                         First one is relative to current one decode
4105                         path, aiming to the field defined by that OID.
4106                         Read about relative path in
4107                         :py:func:`pyderasn.abs_decode_path`. Second
4108                         tuple element is ``{OID: pyderasn.Obj()}``
4109                         dictionary, mapping between current OID value
4110                         and structure applied to defined field.
4111                         :ref:`Read about DEFINED BY <definedby>`
4112         :param bytes impl: override default tag with ``IMPLICIT`` one
4113         :param bytes expl: override default tag with ``EXPLICIT`` one
4114         :param default: set default value. Type same as in ``value``
4115         :param bool optional: is object ``OPTIONAL`` in sequence
4116         """
4117         super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4118         self._value = value
4119         if value is not None:
4120             self._value = self._value_sanitize(value)
4121         if default is not None:
4122             default = self._value_sanitize(default)
4123             self.default = self.__class__(
4124                 value=default,
4125                 impl=self.tag,
4126                 expl=self._expl,
4127             )
4128             if self._value is None:
4129                 self._value = default
4130         self.defines = defines
4131
4132     def __add__(self, their):
4133         if their.__class__ == tuple:
4134             return self.__class__(self._value + array("L", their))
4135         if isinstance(their, self.__class__):
4136             return self.__class__(self._value + their._value)
4137         raise InvalidValueType((self.__class__, tuple))
4138
4139     def _value_sanitize(self, value):
4140         if issubclass(value.__class__, ObjectIdentifier):
4141             return value._value
4142         if isinstance(value, string_types):
4143             try:
4144                 value = array("L", (pureint(arc) for arc in value.split(".")))
4145             except ValueError:
4146                 raise InvalidOID("unacceptable arcs values")
4147         if value.__class__ == tuple:
4148             try:
4149                 value = array("L", value)
4150             except OverflowError as err:
4151                 raise InvalidOID(repr(err))
4152         if value.__class__ is array:
4153             if len(value) < 2:
4154                 raise InvalidOID("less than 2 arcs")
4155             first_arc = value[0]
4156             if first_arc in (0, 1):
4157                 if not (0 <= value[1] <= 39):
4158                     raise InvalidOID("second arc is too wide")
4159             elif first_arc == 2:
4160                 pass
4161             else:
4162                 raise InvalidOID("unacceptable first arc value")
4163             if not all(arc >= 0 for arc in value):
4164                 raise InvalidOID("negative arc value")
4165             return value
4166         raise InvalidValueType((self.__class__, str, tuple))
4167
4168     @property
4169     def ready(self):
4170         return self._value is not None
4171
4172     def __getstate__(self):
4173         return ObjectIdentifierState(
4174             __version__,
4175             self.tag,
4176             self._tag_order,
4177             self._expl,
4178             self.default,
4179             self.optional,
4180             self.offset,
4181             self.llen,
4182             self.vlen,
4183             self.expl_lenindef,
4184             self.lenindef,
4185             self.ber_encoded,
4186             self._value,
4187             self.defines,
4188         )
4189
4190     def __setstate__(self, state):
4191         super(ObjectIdentifier, self).__setstate__(state)
4192         self._value = state.value
4193         self.defines = state.defines
4194
4195     def __iter__(self):
4196         self._assert_ready()
4197         return iter(self._value)
4198
4199     def __str__(self):
4200         return ".".join(str(arc) for arc in self._value or ())
4201
4202     def __hash__(self):
4203         self._assert_ready()
4204         return hash(
4205             self.tag +
4206             bytes(self._expl or b"") +
4207             str(self._value).encode("ascii"),
4208         )
4209
4210     def __eq__(self, their):
4211         if their.__class__ == tuple:
4212             return self._value == array("L", their)
4213         if not issubclass(their.__class__, ObjectIdentifier):
4214             return False
4215         return (
4216             self.tag == their.tag and
4217             self._expl == their._expl and
4218             self._value == their._value
4219         )
4220
4221     def __lt__(self, their):
4222         return self._value < their._value
4223
4224     def __call__(
4225             self,
4226             value=None,
4227             defines=None,
4228             impl=None,
4229             expl=None,
4230             default=None,
4231             optional=None,
4232     ):
4233         return self.__class__(
4234             value=value,
4235             defines=self.defines if defines is None else defines,
4236             impl=self.tag if impl is None else impl,
4237             expl=self._expl if expl is None else expl,
4238             default=self.default if default is None else default,
4239             optional=self.optional if optional is None else optional,
4240         )
4241
4242     def _encode(self):
4243         self._assert_ready()
4244         value = self._value
4245         first_value = value[1]
4246         first_arc = value[0]
4247         if first_arc == 0:
4248             pass
4249         elif first_arc == 1:
4250             first_value += 40
4251         elif first_arc == 2:
4252             first_value += 80
4253         else:  # pragma: no cover
4254             raise RuntimeError("invalid arc is stored")
4255         octets = [zero_ended_encode(first_value)]
4256         for arc in value[2:]:
4257             octets.append(zero_ended_encode(arc))
4258         v = b"".join(octets)
4259         return b"".join((self.tag, len_encode(len(v)), v))
4260
4261     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4262         try:
4263             t, _, lv = tag_strip(tlv)
4264         except DecodeError as err:
4265             raise err.__class__(
4266                 msg=err.msg,
4267                 klass=self.__class__,
4268                 decode_path=decode_path,
4269                 offset=offset,
4270             )
4271         if t != self.tag:
4272             raise TagMismatch(
4273                 klass=self.__class__,
4274                 decode_path=decode_path,
4275                 offset=offset,
4276             )
4277         if tag_only:  # pragma: no cover
4278             yield None
4279             return
4280         try:
4281             l, llen, v = len_decode(lv)
4282         except DecodeError as err:
4283             raise err.__class__(
4284                 msg=err.msg,
4285                 klass=self.__class__,
4286                 decode_path=decode_path,
4287                 offset=offset,
4288             )
4289         if l > len(v):
4290             raise NotEnoughData(
4291                 "encoded length is longer than data",
4292                 klass=self.__class__,
4293                 decode_path=decode_path,
4294                 offset=offset,
4295             )
4296         if l == 0:
4297             raise NotEnoughData(
4298                 "zero length",
4299                 klass=self.__class__,
4300                 decode_path=decode_path,
4301                 offset=offset,
4302             )
4303         v, tail = v[:l], v[l:]
4304         arcs = array("L")
4305         ber_encoded = False
4306         while len(v) > 0:
4307             i = 0
4308             arc = 0
4309             while True:
4310                 octet = indexbytes(v, i)
4311                 if i == 0 and octet == 0x80:
4312                     if ctx.get("bered", False):
4313                         ber_encoded = True
4314                     else:
4315                         raise DecodeError(
4316                             "non normalized arc encoding",
4317                             klass=self.__class__,
4318                             decode_path=decode_path,
4319                             offset=offset,
4320                         )
4321                 arc = (arc << 7) | (octet & 0x7F)
4322                 if octet & 0x80 == 0:
4323                     try:
4324                         arcs.append(arc)
4325                     except OverflowError:
4326                         raise DecodeError(
4327                             "too huge value for local unsigned long",
4328                             klass=self.__class__,
4329                             decode_path=decode_path,
4330                             offset=offset,
4331                         )
4332                     v = v[i + 1:]
4333                     break
4334                 i += 1
4335                 if i == len(v):
4336                     raise DecodeError(
4337                         "unfinished OID",
4338                         klass=self.__class__,
4339                         decode_path=decode_path,
4340                         offset=offset,
4341                     )
4342         first_arc = 0
4343         second_arc = arcs[0]
4344         if 0 <= second_arc <= 39:
4345             first_arc = 0
4346         elif 40 <= second_arc <= 79:
4347             first_arc = 1
4348             second_arc -= 40
4349         else:
4350             first_arc = 2
4351             second_arc -= 80
4352         obj = self.__class__(
4353             value=array("L", (first_arc, second_arc)) + arcs[1:],
4354             impl=self.tag,
4355             expl=self._expl,
4356             default=self.default,
4357             optional=self.optional,
4358             _decoded=(offset, llen, l),
4359         )
4360         if ber_encoded:
4361             obj.ber_encoded = True
4362         yield decode_path, obj, tail
4363
4364     def __repr__(self):
4365         return pp_console_row(next(self.pps()))
4366
4367     def pps(self, decode_path=()):
4368         yield _pp(
4369             obj=self,
4370             asn1_type_name=self.asn1_type_name,
4371             obj_name=self.__class__.__name__,
4372             decode_path=decode_path,
4373             value=str(self) if self.ready else None,
4374             optional=self.optional,
4375             default=self == self.default,
4376             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4377             expl=None if self._expl is None else tag_decode(self._expl),
4378             offset=self.offset,
4379             tlen=self.tlen,
4380             llen=self.llen,
4381             vlen=self.vlen,
4382             expl_offset=self.expl_offset if self.expled else None,
4383             expl_tlen=self.expl_tlen if self.expled else None,
4384             expl_llen=self.expl_llen if self.expled else None,
4385             expl_vlen=self.expl_vlen if self.expled else None,
4386             expl_lenindef=self.expl_lenindef,
4387             ber_encoded=self.ber_encoded,
4388             bered=self.bered,
4389         )
4390         for pp in self.pps_lenindef(decode_path):
4391             yield pp
4392
4393
4394 class Enumerated(Integer):
4395     """``ENUMERATED`` integer type
4396
4397     This type is identical to :py:class:`pyderasn.Integer`, but requires
4398     schema to be specified and does not accept values missing from it.
4399     """
4400     __slots__ = ()
4401     tag_default = tag_encode(10)
4402     asn1_type_name = "ENUMERATED"
4403
4404     def __init__(
4405             self,
4406             value=None,
4407             impl=None,
4408             expl=None,
4409             default=None,
4410             optional=False,
4411             _specs=None,
4412             _decoded=(0, 0, 0),
4413             bounds=None,  # dummy argument, workability for Integer.decode
4414     ):
4415         super(Enumerated, self).__init__(
4416             value, bounds, impl, expl, default, optional, _specs, _decoded,
4417         )
4418         if len(self.specs) == 0:
4419             raise ValueError("schema must be specified")
4420
4421     def _value_sanitize(self, value):
4422         if isinstance(value, self.__class__):
4423             value = value._value
4424         elif isinstance(value, integer_types):
4425             for _value in itervalues(self.specs):
4426                 if _value == value:
4427                     break
4428             else:
4429                 raise DecodeError(
4430                     "unknown integer value: %s" % value,
4431                     klass=self.__class__,
4432                 )
4433         elif isinstance(value, string_types):
4434             value = self.specs.get(value)
4435             if value is None:
4436                 raise ObjUnknown("integer value: %s" % value)
4437         else:
4438             raise InvalidValueType((self.__class__, int, str))
4439         return value
4440
4441     def __call__(
4442             self,
4443             value=None,
4444             impl=None,
4445             expl=None,
4446             default=None,
4447             optional=None,
4448             _specs=None,
4449     ):
4450         return self.__class__(
4451             value=value,
4452             impl=self.tag if impl is None else impl,
4453             expl=self._expl if expl is None else expl,
4454             default=self.default if default is None else default,
4455             optional=self.optional if optional is None else optional,
4456             _specs=self.specs,
4457         )
4458
4459
4460 def escape_control_unicode(c):
4461     if unicat(c)[0] == "C":
4462         c = repr(c).lstrip("u").strip("'")
4463     return c
4464
4465
4466 class CommonString(OctetString):
4467     """Common class for all strings
4468
4469     Everything resembles :py:class:`pyderasn.OctetString`, except
4470     ability to deal with unicode text strings.
4471
4472     >>> hexenc("привет Ð¼Ð¸Ñ€".encode("utf-8"))
4473     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4474     >>> UTF8String("привет Ð¼Ð¸Ñ€") == UTF8String(hexdec("d0...80"))
4475     True
4476     >>> s = UTF8String("привет Ð¼Ð¸Ñ€")
4477     UTF8String UTF8String Ð¿Ñ€Ð¸Ð²ÐµÑ‚ Ð¼Ð¸Ñ€
4478     >>> str(s)
4479     'привет Ð¼Ð¸Ñ€'
4480     >>> hexenc(bytes(s))
4481     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4482
4483     >>> PrintableString("привет Ð¼Ð¸Ñ€")
4484     Traceback (most recent call last):
4485     pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4486
4487     >>> BMPString("ада", bounds=(2, 2))
4488     Traceback (most recent call last):
4489     pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4490     >>> s = BMPString("ад", bounds=(2, 2))
4491     >>> s.encoding
4492     'utf-16-be'
4493     >>> hexenc(bytes(s))
4494     '04300434'
4495
4496     .. list-table::
4497        :header-rows: 1
4498
4499        * - Class
4500          - Text Encoding
4501        * - :py:class:`pyderasn.UTF8String`
4502          - utf-8
4503        * - :py:class:`pyderasn.NumericString`
4504          - ascii
4505        * - :py:class:`pyderasn.PrintableString`
4506          - ascii
4507        * - :py:class:`pyderasn.TeletexString`
4508          - ascii
4509        * - :py:class:`pyderasn.T61String`
4510          - ascii
4511        * - :py:class:`pyderasn.VideotexString`
4512          - iso-8859-1
4513        * - :py:class:`pyderasn.IA5String`
4514          - ascii
4515        * - :py:class:`pyderasn.GraphicString`
4516          - iso-8859-1
4517        * - :py:class:`pyderasn.VisibleString`
4518          - ascii
4519        * - :py:class:`pyderasn.ISO646String`
4520          - ascii
4521        * - :py:class:`pyderasn.GeneralString`
4522          - iso-8859-1
4523        * - :py:class:`pyderasn.UniversalString`
4524          - utf-32-be
4525        * - :py:class:`pyderasn.BMPString`
4526          - utf-16-be
4527     """
4528     __slots__ = ()
4529
4530     def _value_sanitize(self, value):
4531         value_raw = None
4532         value_decoded = None
4533         if isinstance(value, self.__class__):
4534             value_raw = value._value
4535         elif value.__class__ == text_type:
4536             value_decoded = value
4537         elif value.__class__ == binary_type:
4538             value_raw = value
4539         else:
4540             raise InvalidValueType((self.__class__, text_type, binary_type))
4541         try:
4542             value_raw = (
4543                 value_decoded.encode(self.encoding)
4544                 if value_raw is None else value_raw
4545             )
4546             value_decoded = (
4547                 value_raw.decode(self.encoding)
4548                 if value_decoded is None else value_decoded
4549             )
4550         except (UnicodeEncodeError, UnicodeDecodeError) as err:
4551             raise DecodeError(str(err))
4552         if not self._bound_min <= len(value_decoded) <= self._bound_max:
4553             raise BoundsError(
4554                 self._bound_min,
4555                 len(value_decoded),
4556                 self._bound_max,
4557             )
4558         return value_raw
4559
4560     def __eq__(self, their):
4561         if their.__class__ == binary_type:
4562             return self._value == their
4563         if their.__class__ == text_type:
4564             return self._value == their.encode(self.encoding)
4565         if not isinstance(their, self.__class__):
4566             return False
4567         return (
4568             self._value == their._value and
4569             self.tag == their.tag and
4570             self._expl == their._expl
4571         )
4572
4573     def __unicode__(self):
4574         if self.ready:
4575             return self._value.decode(self.encoding)
4576         return text_type(self._value)
4577
4578     def __repr__(self):
4579         return pp_console_row(next(self.pps(no_unicode=PY2)))
4580
4581     def pps(self, decode_path=(), no_unicode=False):
4582         value = None
4583         if self.ready:
4584             value = (
4585                 hexenc(bytes(self)) if no_unicode else
4586                 "".join(escape_control_unicode(c) for c in self.__unicode__())
4587             )
4588         yield _pp(
4589             obj=self,
4590             asn1_type_name=self.asn1_type_name,
4591             obj_name=self.__class__.__name__,
4592             decode_path=decode_path,
4593             value=value,
4594             optional=self.optional,
4595             default=self == self.default,
4596             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4597             expl=None if self._expl is None else tag_decode(self._expl),
4598             offset=self.offset,
4599             tlen=self.tlen,
4600             llen=self.llen,
4601             vlen=self.vlen,
4602             expl_offset=self.expl_offset if self.expled else None,
4603             expl_tlen=self.expl_tlen if self.expled else None,
4604             expl_llen=self.expl_llen if self.expled else None,
4605             expl_vlen=self.expl_vlen if self.expled else None,
4606             expl_lenindef=self.expl_lenindef,
4607             ber_encoded=self.ber_encoded,
4608             bered=self.bered,
4609         )
4610         for pp in self.pps_lenindef(decode_path):
4611             yield pp
4612
4613
4614 class UTF8String(CommonString):
4615     __slots__ = ()
4616     tag_default = tag_encode(12)
4617     encoding = "utf-8"
4618     asn1_type_name = "UTF8String"
4619
4620
4621 class AllowableCharsMixin(object):
4622     @property
4623     def allowable_chars(self):
4624         if PY2:
4625             return self._allowable_chars
4626         return frozenset(six_unichr(c) for c in self._allowable_chars)
4627
4628
4629 class NumericString(AllowableCharsMixin, CommonString):
4630     """Numeric string
4631
4632     Its value is properly sanitized: only ASCII digits with spaces can
4633     be stored.
4634
4635     >>> NumericString().allowable_chars
4636     frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4637     """
4638     __slots__ = ()
4639     tag_default = tag_encode(18)
4640     encoding = "ascii"
4641     asn1_type_name = "NumericString"
4642     _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4643
4644     def _value_sanitize(self, value):
4645         value = super(NumericString, self)._value_sanitize(value)
4646         if not frozenset(value) <= self._allowable_chars:
4647             raise DecodeError("non-numeric value")
4648         return value
4649
4650
4651 PrintableStringState = namedtuple(
4652     "PrintableStringState",
4653     OctetStringState._fields + ("allowable_chars",),
4654     **NAMEDTUPLE_KWARGS
4655 )
4656
4657
4658 class PrintableString(AllowableCharsMixin, CommonString):
4659     """Printable string
4660
4661     Its value is properly sanitized: see X.680 41.4 table 10.
4662
4663     >>> PrintableString().allowable_chars
4664     frozenset([' ', "'", ..., 'z'])
4665     >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4666     PrintableString PrintableString foo*bar
4667     >>> obj.allow_asterisk, obj.allow_ampersand
4668     (True, False)
4669     """
4670     __slots__ = ()
4671     tag_default = tag_encode(19)
4672     encoding = "ascii"
4673     asn1_type_name = "PrintableString"
4674     _allowable_chars = frozenset(
4675         (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4676     )
4677     _asterisk = frozenset("*".encode("ascii"))
4678     _ampersand = frozenset("&".encode("ascii"))
4679
4680     def __init__(
4681             self,
4682             value=None,
4683             bounds=None,
4684             impl=None,
4685             expl=None,
4686             default=None,
4687             optional=False,
4688             _decoded=(0, 0, 0),
4689             ctx=None,
4690             allow_asterisk=False,
4691             allow_ampersand=False,
4692     ):
4693         """
4694         :param allow_asterisk: allow asterisk character
4695         :param allow_ampersand: allow ampersand character
4696         """
4697         if allow_asterisk:
4698             self._allowable_chars |= self._asterisk
4699         if allow_ampersand:
4700             self._allowable_chars |= self._ampersand
4701         super(PrintableString, self).__init__(
4702             value, bounds, impl, expl, default, optional, _decoded, ctx,
4703         )
4704
4705     @property
4706     def allow_asterisk(self):
4707         """Is asterisk character allowed?
4708         """
4709         return self._asterisk <= self._allowable_chars
4710
4711     @property
4712     def allow_ampersand(self):
4713         """Is ampersand character allowed?
4714         """
4715         return self._ampersand <= self._allowable_chars
4716
4717     def _value_sanitize(self, value):
4718         value = super(PrintableString, self)._value_sanitize(value)
4719         if not frozenset(value) <= self._allowable_chars:
4720             raise DecodeError("non-printable value")
4721         return value
4722
4723     def __getstate__(self):
4724         return PrintableStringState(
4725             *super(PrintableString, self).__getstate__(),
4726             **{"allowable_chars": self._allowable_chars}
4727         )
4728
4729     def __setstate__(self, state):
4730         super(PrintableString, self).__setstate__(state)
4731         self._allowable_chars = state.allowable_chars
4732
4733     def __call__(
4734             self,
4735             value=None,
4736             bounds=None,
4737             impl=None,
4738             expl=None,
4739             default=None,
4740             optional=None,
4741     ):
4742         return self.__class__(
4743             value=value,
4744             bounds=(
4745                 (self._bound_min, self._bound_max)
4746                 if bounds is None else bounds
4747             ),
4748             impl=self.tag if impl is None else impl,
4749             expl=self._expl if expl is None else expl,
4750             default=self.default if default is None else default,
4751             optional=self.optional if optional is None else optional,
4752             allow_asterisk=self.allow_asterisk,
4753             allow_ampersand=self.allow_ampersand,
4754         )
4755
4756
4757 class TeletexString(CommonString):
4758     __slots__ = ()
4759     tag_default = tag_encode(20)
4760     encoding = "ascii"
4761     asn1_type_name = "TeletexString"
4762
4763
4764 class T61String(TeletexString):
4765     __slots__ = ()
4766     asn1_type_name = "T61String"
4767
4768
4769 class VideotexString(CommonString):
4770     __slots__ = ()
4771     tag_default = tag_encode(21)
4772     encoding = "iso-8859-1"
4773     asn1_type_name = "VideotexString"
4774
4775
4776 class IA5String(CommonString):
4777     __slots__ = ()
4778     tag_default = tag_encode(22)
4779     encoding = "ascii"
4780     asn1_type_name = "IA5"
4781
4782
4783 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
4784 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
4785 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
4786
4787
4788 class VisibleString(CommonString):
4789     __slots__ = ()
4790     tag_default = tag_encode(26)
4791     encoding = "ascii"
4792     asn1_type_name = "VisibleString"
4793
4794
4795 UTCTimeState = namedtuple(
4796     "UTCTimeState",
4797     OctetStringState._fields + ("ber_raw",),
4798     **NAMEDTUPLE_KWARGS
4799 )
4800
4801
4802 def str_to_time_fractions(value):
4803     v = pureint(value)
4804     year, v = (v // 10**10), (v % 10**10)
4805     month, v = (v // 10**8), (v % 10**8)
4806     day, v = (v // 10**6), (v % 10**6)
4807     hour, v = (v // 10**4), (v % 10**4)
4808     minute, second = (v // 100), (v % 100)
4809     return year, month, day, hour, minute, second
4810
4811
4812 class UTCTime(VisibleString):
4813     """``UTCTime`` datetime type
4814
4815     >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
4816     UTCTime UTCTime 2017-09-30T22:07:50
4817     >>> str(t)
4818     '170930220750Z'
4819     >>> bytes(t)
4820     b'170930220750Z'
4821     >>> t.todatetime()
4822     datetime.datetime(2017, 9, 30, 22, 7, 50)
4823     >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
4824     datetime.datetime(1957, 9, 30, 22, 7, 50)
4825
4826     If BER encoded value was met, then ``ber_raw`` attribute will hold
4827     its raw representation.
4828
4829     .. warning::
4830
4831        Pay attention that UTCTime can not hold full year, so all years
4832        having < 50 years are treated as 20xx, 19xx otherwise, according
4833        to X.509 recommendation.
4834
4835     .. warning::
4836
4837        No strict validation of UTC offsets are made, but very crude:
4838
4839        * minutes are not exceeding 60
4840        * offset value is not exceeding 14 hours
4841     """
4842     __slots__ = ("ber_raw",)
4843     tag_default = tag_encode(23)
4844     encoding = "ascii"
4845     asn1_type_name = "UTCTime"
4846     evgen_mode_skip_value = False
4847
4848     def __init__(
4849             self,
4850             value=None,
4851             impl=None,
4852             expl=None,
4853             default=None,
4854             optional=False,
4855             _decoded=(0, 0, 0),
4856             bounds=None,  # dummy argument, workability for OctetString.decode
4857             ctx=None,
4858     ):
4859         """
4860         :param value: set the value. Either datetime type, or
4861                       :py:class:`pyderasn.UTCTime` object
4862         :param bytes impl: override default tag with ``IMPLICIT`` one
4863         :param bytes expl: override default tag with ``EXPLICIT`` one
4864         :param default: set default value. Type same as in ``value``
4865         :param bool optional: is object ``OPTIONAL`` in sequence
4866         """
4867         super(UTCTime, self).__init__(
4868             None, None, impl, expl, None, optional, _decoded, ctx,
4869         )
4870         self._value = value
4871         self.ber_raw = None
4872         if value is not None:
4873             self._value, self.ber_raw = self._value_sanitize(value, ctx)
4874             self.ber_encoded = self.ber_raw is not None
4875         if default is not None:
4876             default, _ = self._value_sanitize(default)
4877             self.default = self.__class__(
4878                 value=default,
4879                 impl=self.tag,
4880                 expl=self._expl,
4881             )
4882             if self._value is None:
4883                 self._value = default
4884             optional = True
4885         self.optional = optional
4886
4887     def _strptime_bered(self, value):
4888         year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
4889         value = value[10:]
4890         if len(value) == 0:
4891             raise ValueError("no timezone")
4892         year += 2000 if year < 50 else 1900
4893         decoded = datetime(year, month, day, hour, minute)
4894         offset = 0
4895         if value[-1] == "Z":
4896             value = value[:-1]
4897         else:
4898             if len(value) < 5:
4899                 raise ValueError("invalid UTC offset")
4900             if value[-5] == "-":
4901                 sign = -1
4902             elif value[-5] == "+":
4903                 sign = 1
4904             else:
4905                 raise ValueError("invalid UTC offset")
4906             v = pureint(value[-4:])
4907             offset, v = (60 * (v % 100)), v // 100
4908             if offset >= 3600:
4909                 raise ValueError("invalid UTC offset minutes")
4910             offset += 3600 * v
4911             if offset > 14 * 3600:
4912                 raise ValueError("too big UTC offset")
4913             offset *= sign
4914             value = value[:-5]
4915         if len(value) == 0:
4916             return offset, decoded
4917         if len(value) != 2:
4918             raise ValueError("invalid UTC offset seconds")
4919         seconds = pureint(value)
4920         if seconds >= 60:
4921             raise ValueError("invalid seconds value")
4922         return offset, decoded + timedelta(seconds=seconds)
4923
4924     def _strptime(self, value):
4925         # datetime.strptime's format: %y%m%d%H%M%SZ
4926         if len(value) != LEN_YYMMDDHHMMSSZ:
4927             raise ValueError("invalid UTCTime length")
4928         if value[-1] != "Z":
4929             raise ValueError("non UTC timezone")
4930         year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
4931         year += 2000 if year < 50 else 1900
4932         return datetime(year, month, day, hour, minute, second)
4933
4934     def _dt_sanitize(self, value):
4935         if value.year < 1950 or value.year > 2049:
4936             raise ValueError("UTCTime can hold only 1950-2049 years")
4937         return value.replace(microsecond=0)
4938
4939     def _value_sanitize(self, value, ctx=None):
4940         if value.__class__ == binary_type:
4941             try:
4942                 value_decoded = value.decode("ascii")
4943             except (UnicodeEncodeError, UnicodeDecodeError) as err:
4944                 raise DecodeError("invalid UTCTime encoding: %r" % err)
4945             err = None
4946             try:
4947                 return self._strptime(value_decoded), None
4948             except (TypeError, ValueError) as _err:
4949                 err = _err
4950                 if (ctx is not None) and ctx.get("bered", False):
4951                     try:
4952                         offset, _value = self._strptime_bered(value_decoded)
4953                         _value = _value - timedelta(seconds=offset)
4954                         return self._dt_sanitize(_value), value
4955                     except (TypeError, ValueError, OverflowError) as _err:
4956                         err = _err
4957             raise DecodeError(
4958                 "invalid %s format: %r" % (self.asn1_type_name, err),
4959                 klass=self.__class__,
4960             )
4961         if isinstance(value, self.__class__):
4962             return value._value, None
4963         if value.__class__ == datetime:
4964             return self._dt_sanitize(value), None
4965         raise InvalidValueType((self.__class__, datetime))
4966
4967     def _pp_value(self):
4968         if self.ready:
4969             value = self._value.isoformat()
4970             if self.ber_encoded:
4971                 value += " (%s)" % self.ber_raw
4972             return value
4973
4974     def __unicode__(self):
4975         if self.ready:
4976             value = self._value.isoformat()
4977             if self.ber_encoded:
4978                 value += " (%s)" % self.ber_raw
4979             return value
4980         return text_type(self._pp_value())
4981
4982     def __getstate__(self):
4983         return UTCTimeState(
4984             *super(UTCTime, self).__getstate__(),
4985             **{"ber_raw": self.ber_raw}
4986         )
4987
4988     def __setstate__(self, state):
4989         super(UTCTime, self).__setstate__(state)
4990         self.ber_raw = state.ber_raw
4991
4992     def __bytes__(self):
4993         self._assert_ready()
4994         return self._encode_time()
4995
4996     def __eq__(self, their):
4997         if their.__class__ == binary_type:
4998             return self._encode_time() == their
4999         if their.__class__ == datetime:
5000             return self.todatetime() == their
5001         if not isinstance(their, self.__class__):
5002             return False
5003         return (
5004             self._value == their._value and
5005             self.tag == their.tag and
5006             self._expl == their._expl
5007         )
5008
5009     def _encode_time(self):
5010         return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5011
5012     def _encode(self):
5013         self._assert_ready()
5014         value = self._encode_time()
5015         return b"".join((self.tag, len_encode(len(value)), value))
5016
5017     def _encode_cer(self, writer):
5018         write_full(writer, self._encode())
5019
5020     def todatetime(self):
5021         return self._value
5022
5023     def __repr__(self):
5024         return pp_console_row(next(self.pps()))
5025
5026     def pps(self, decode_path=()):
5027         yield _pp(
5028             obj=self,
5029             asn1_type_name=self.asn1_type_name,
5030             obj_name=self.__class__.__name__,
5031             decode_path=decode_path,
5032             value=self._pp_value(),
5033             optional=self.optional,
5034             default=self == self.default,
5035             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5036             expl=None if self._expl is None else tag_decode(self._expl),
5037             offset=self.offset,
5038             tlen=self.tlen,
5039             llen=self.llen,
5040             vlen=self.vlen,
5041             expl_offset=self.expl_offset if self.expled else None,
5042             expl_tlen=self.expl_tlen if self.expled else None,
5043             expl_llen=self.expl_llen if self.expled else None,
5044             expl_vlen=self.expl_vlen if self.expled else None,
5045             expl_lenindef=self.expl_lenindef,
5046             ber_encoded=self.ber_encoded,
5047             bered=self.bered,
5048         )
5049         for pp in self.pps_lenindef(decode_path):
5050             yield pp
5051
5052
5053 class GeneralizedTime(UTCTime):
5054     """``GeneralizedTime`` datetime type
5055
5056     This type is similar to :py:class:`pyderasn.UTCTime`.
5057
5058     >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5059     GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5060     >>> str(t)
5061     '20170930220750.000123Z'
5062     >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5063     GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5064
5065     .. warning::
5066
5067        Only microsecond fractions are supported in DER encoding.
5068        :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5069        higher precision values.
5070
5071     .. warning::
5072
5073        BER encoded data can loss information (accuracy) during decoding
5074        because of float transformations.
5075
5076     .. warning::
5077
5078        Local times (without explicit timezone specification) are treated
5079        as UTC one, no transformations are made.
5080
5081     .. warning::
5082
5083        Zero year is unsupported.
5084     """
5085     __slots__ = ()
5086     tag_default = tag_encode(24)
5087     asn1_type_name = "GeneralizedTime"
5088
5089     def _dt_sanitize(self, value):
5090         return value
5091
5092     def _strptime_bered(self, value):
5093         if len(value) < 4 + 3 * 2:
5094             raise ValueError("invalid GeneralizedTime")
5095         year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5096         decoded = datetime(year, month, day, hour)
5097         offset, value = 0, value[10:]
5098         if len(value) == 0:
5099             return offset, decoded
5100         if value[-1] == "Z":
5101             value = value[:-1]
5102         else:
5103             for char, sign in (("-", -1), ("+", 1)):
5104                 idx = value.rfind(char)
5105                 if idx == -1:
5106                     continue
5107                 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5108                 v = pureint(offset_raw)
5109                 if len(offset_raw) == 4:
5110                     offset, v = (60 * (v % 100)), v // 100
5111                     if offset >= 3600:
5112                         raise ValueError("invalid UTC offset minutes")
5113                 elif len(offset_raw) == 2:
5114                     pass
5115                 else:
5116                     raise ValueError("invalid UTC offset")
5117                 offset += 3600 * v
5118                 if offset > 14 * 3600:
5119                     raise ValueError("too big UTC offset")
5120                 offset *= sign
5121                 break
5122         if len(value) == 0:
5123             return offset, decoded
5124         if value[0] in DECIMAL_SIGNS:
5125             return offset, (
5126                 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5127             )
5128         if len(value) < 2:
5129             raise ValueError("stripped minutes")
5130         decoded += timedelta(seconds=60 * pureint(value[:2]))
5131         value = value[2:]
5132         if len(value) == 0:
5133             return offset, decoded
5134         if value[0] in DECIMAL_SIGNS:
5135             return offset, (
5136                 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5137             )
5138         if len(value) < 2:
5139             raise ValueError("stripped seconds")
5140         decoded += timedelta(seconds=pureint(value[:2]))
5141         value = value[2:]
5142         if len(value) == 0:
5143             return offset, decoded
5144         if value[0] not in DECIMAL_SIGNS:
5145             raise ValueError("invalid format after seconds")
5146         return offset, (
5147             decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5148         )
5149
5150     def _strptime(self, value):
5151         l = len(value)
5152         if l == LEN_YYYYMMDDHHMMSSZ:
5153             # datetime.strptime's format: %Y%m%d%H%M%SZ
5154             if value[-1] != "Z":
5155                 raise ValueError("non UTC timezone")
5156             return datetime(*str_to_time_fractions(value[:-1]))
5157         if l >= LEN_YYYYMMDDHHMMSSDMZ:
5158             # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5159             if value[-1] != "Z":
5160                 raise ValueError("non UTC timezone")
5161             if value[14] != ".":
5162                 raise ValueError("no fractions separator")
5163             us = value[15:-1]
5164             if us[-1] == "0":
5165                 raise ValueError("trailing zero")
5166             us_len = len(us)
5167             if us_len > 6:
5168                 raise ValueError("only microsecond fractions are supported")
5169             us = pureint(us + ("0" * (6 - us_len)))
5170             year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5171             return datetime(year, month, day, hour, minute, second, us)
5172         raise ValueError("invalid GeneralizedTime length")
5173
5174     def _encode_time(self):
5175         value = self._value
5176         encoded = value.strftime("%Y%m%d%H%M%S")
5177         if value.microsecond > 0:
5178             encoded += (".%06d" % value.microsecond).rstrip("0")
5179         return (encoded + "Z").encode("ascii")
5180
5181
5182 class GraphicString(CommonString):
5183     __slots__ = ()
5184     tag_default = tag_encode(25)
5185     encoding = "iso-8859-1"
5186     asn1_type_name = "GraphicString"
5187
5188
5189 class ISO646String(VisibleString):
5190     __slots__ = ()
5191     asn1_type_name = "ISO646String"
5192
5193
5194 class GeneralString(CommonString):
5195     __slots__ = ()
5196     tag_default = tag_encode(27)
5197     encoding = "iso-8859-1"
5198     asn1_type_name = "GeneralString"
5199
5200
5201 class UniversalString(CommonString):
5202     __slots__ = ()
5203     tag_default = tag_encode(28)
5204     encoding = "utf-32-be"
5205     asn1_type_name = "UniversalString"
5206
5207
5208 class BMPString(CommonString):
5209     __slots__ = ()
5210     tag_default = tag_encode(30)
5211     encoding = "utf-16-be"
5212     asn1_type_name = "BMPString"
5213
5214
5215 ChoiceState = namedtuple(
5216     "ChoiceState",
5217     BasicState._fields + ("specs", "value",),
5218     **NAMEDTUPLE_KWARGS
5219 )
5220
5221
5222 class Choice(Obj):
5223     """``CHOICE`` special type
5224
5225     ::
5226
5227         class GeneralName(Choice):
5228             schema = (
5229                 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5230                 ("dNSName", IA5String(impl=tag_ctxp(2))),
5231             )
5232
5233     >>> gn = GeneralName()
5234     GeneralName CHOICE
5235     >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5236     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5237     >>> gn["dNSName"] = IA5String("bar.baz")
5238     GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5239     >>> gn["rfc822Name"]
5240     None
5241     >>> gn["dNSName"]
5242     [2] IA5String IA5 bar.baz
5243     >>> gn.choice
5244     'dNSName'
5245     >>> gn.value == gn["dNSName"]
5246     True
5247     >>> gn.specs
5248     OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5249
5250     >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5251     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5252     """
5253     __slots__ = ("specs",)
5254     tag_default = None
5255     asn1_type_name = "CHOICE"
5256
5257     def __init__(
5258             self,
5259             value=None,
5260             schema=None,
5261             impl=None,
5262             expl=None,
5263             default=None,
5264             optional=False,
5265             _decoded=(0, 0, 0),
5266     ):
5267         """
5268         :param value: set the value. Either ``(choice, value)`` tuple, or
5269                       :py:class:`pyderasn.Choice` object
5270         :param bytes impl: can not be set, do **not** use it
5271         :param bytes expl: override default tag with ``EXPLICIT`` one
5272         :param default: set default value. Type same as in ``value``
5273         :param bool optional: is object ``OPTIONAL`` in sequence
5274         """
5275         if impl is not None:
5276             raise ValueError("no implicit tag allowed for CHOICE")
5277         super(Choice, self).__init__(None, expl, default, optional, _decoded)
5278         if schema is None:
5279             schema = getattr(self, "schema", ())
5280         if len(schema) == 0:
5281             raise ValueError("schema must be specified")
5282         self.specs = (
5283             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5284         )
5285         self._value = None
5286         if value is not None:
5287             self._value = self._value_sanitize(value)
5288         if default is not None:
5289             default_value = self._value_sanitize(default)
5290             default_obj = self.__class__(impl=self.tag, expl=self._expl)
5291             default_obj.specs = self.specs
5292             default_obj._value = default_value
5293             self.default = default_obj
5294             if value is None:
5295                 self._value = copy(default_obj._value)
5296         if self._expl is not None:
5297             tag_class, _, tag_num = tag_decode(self._expl)
5298             self._tag_order = (tag_class, tag_num)
5299
5300     def _value_sanitize(self, value):
5301         if (value.__class__ == tuple) and len(value) == 2:
5302             choice, obj = value
5303             spec = self.specs.get(choice)
5304             if spec is None:
5305                 raise ObjUnknown(choice)
5306             if not isinstance(obj, spec.__class__):
5307                 raise InvalidValueType((spec,))
5308             return (choice, spec(obj))
5309         if isinstance(value, self.__class__):
5310             return value._value
5311         raise InvalidValueType((self.__class__, tuple))
5312
5313     @property
5314     def ready(self):
5315         return self._value is not None and self._value[1].ready
5316
5317     @property
5318     def bered(self):
5319         return self.expl_lenindef or (
5320             (self._value is not None) and
5321             self._value[1].bered
5322         )
5323
5324     def __getstate__(self):
5325         return ChoiceState(
5326             __version__,
5327             self.tag,
5328             self._tag_order,
5329             self._expl,
5330             self.default,
5331             self.optional,
5332             self.offset,
5333             self.llen,
5334             self.vlen,
5335             self.expl_lenindef,
5336             self.lenindef,
5337             self.ber_encoded,
5338             self.specs,
5339             copy(self._value),
5340         )
5341
5342     def __setstate__(self, state):
5343         super(Choice, self).__setstate__(state)
5344         self.specs = state.specs
5345         self._value = state.value
5346
5347     def __eq__(self, their):
5348         if (their.__class__ == tuple) and len(their) == 2:
5349             return self._value == their
5350         if not isinstance(their, self.__class__):
5351             return False
5352         return (
5353             self.specs == their.specs and
5354             self._value == their._value
5355         )
5356
5357     def __call__(
5358             self,
5359             value=None,
5360             expl=None,
5361             default=None,
5362             optional=None,
5363     ):
5364         return self.__class__(
5365             value=value,
5366             schema=self.specs,
5367             expl=self._expl if expl is None else expl,
5368             default=self.default if default is None else default,
5369             optional=self.optional if optional is None else optional,
5370         )
5371
5372     @property
5373     def choice(self):
5374         """Name of the choice
5375         """
5376         self._assert_ready()
5377         return self._value[0]
5378
5379     @property
5380     def value(self):
5381         """Value of underlying choice
5382         """
5383         self._assert_ready()
5384         return self._value[1]
5385
5386     @property
5387     def tag_order(self):
5388         self._assert_ready()
5389         return self._value[1].tag_order if self._tag_order is None else self._tag_order
5390
5391     @property
5392     def tag_order_cer(self):
5393         return min(v.tag_order_cer for v in itervalues(self.specs))
5394
5395     def __getitem__(self, key):
5396         if key not in self.specs:
5397             raise ObjUnknown(key)
5398         if self._value is None:
5399             return None
5400         choice, value = self._value
5401         if choice != key:
5402             return None
5403         return value
5404
5405     def __setitem__(self, key, value):
5406         spec = self.specs.get(key)
5407         if spec is None:
5408             raise ObjUnknown(key)
5409         if not isinstance(value, spec.__class__):
5410             raise InvalidValueType((spec.__class__,))
5411         self._value = (key, spec(value))
5412
5413     @property
5414     def tlen(self):
5415         return 0
5416
5417     @property
5418     def decoded(self):
5419         return self._value[1].decoded if self.ready else False
5420
5421     def _encode(self):
5422         self._assert_ready()
5423         return self._value[1].encode()
5424
5425     def _encode_cer(self, writer):
5426         self._assert_ready()
5427         self._value[1].encode_cer(writer)
5428
5429     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5430         for choice, spec in iteritems(self.specs):
5431             sub_decode_path = decode_path + (choice,)
5432             try:
5433                 spec.decode(
5434                     tlv,
5435                     offset=offset,
5436                     leavemm=True,
5437                     decode_path=sub_decode_path,
5438                     ctx=ctx,
5439                     tag_only=True,
5440                     _ctx_immutable=False,
5441                 )
5442             except TagMismatch:
5443                 continue
5444             break
5445         else:
5446             raise TagMismatch(
5447                 klass=self.__class__,
5448                 decode_path=decode_path,
5449                 offset=offset,
5450             )
5451         if tag_only:  # pragma: no cover
5452             yield None
5453             return
5454         if evgen_mode:
5455             for _decode_path, value, tail in spec.decode_evgen(
5456                     tlv,
5457                     offset=offset,
5458                     leavemm=True,
5459                     decode_path=sub_decode_path,
5460                     ctx=ctx,
5461                     _ctx_immutable=False,
5462             ):
5463                 yield _decode_path, value, tail
5464         else:
5465             _, value, tail = next(spec.decode_evgen(
5466                 tlv,
5467                 offset=offset,
5468                 leavemm=True,
5469                 decode_path=sub_decode_path,
5470                 ctx=ctx,
5471                 _ctx_immutable=False,
5472                 _evgen_mode=False,
5473             ))
5474         obj = self.__class__(
5475             schema=self.specs,
5476             expl=self._expl,
5477             default=self.default,
5478             optional=self.optional,
5479             _decoded=(offset, 0, value.fulllen),
5480         )
5481         obj._value = (choice, value)
5482         yield decode_path, obj, tail
5483
5484     def __repr__(self):
5485         value = pp_console_row(next(self.pps()))
5486         if self.ready:
5487             value = "%s[%r]" % (value, self.value)
5488         return value
5489
5490     def pps(self, decode_path=()):
5491         yield _pp(
5492             obj=self,
5493             asn1_type_name=self.asn1_type_name,
5494             obj_name=self.__class__.__name__,
5495             decode_path=decode_path,
5496             value=self.choice if self.ready else None,
5497             optional=self.optional,
5498             default=self == self.default,
5499             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5500             expl=None if self._expl is None else tag_decode(self._expl),
5501             offset=self.offset,
5502             tlen=self.tlen,
5503             llen=self.llen,
5504             vlen=self.vlen,
5505             expl_lenindef=self.expl_lenindef,
5506             bered=self.bered,
5507         )
5508         if self.ready:
5509             yield self.value.pps(decode_path=decode_path + (self.choice,))
5510         for pp in self.pps_lenindef(decode_path):
5511             yield pp
5512
5513
5514 class PrimitiveTypes(Choice):
5515     """Predefined ``CHOICE`` for all generic primitive types
5516
5517     It could be useful for general decoding of some unspecified values:
5518
5519     >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5520     OCTET STRING 3 bytes 666f6f
5521     >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5522     INTEGER 1193046
5523     """
5524     __slots__ = ()
5525     schema = tuple((klass.__name__, klass()) for klass in (
5526         Boolean,
5527         Integer,
5528         BitString,
5529         OctetString,
5530         Null,
5531         ObjectIdentifier,
5532         UTF8String,
5533         NumericString,
5534         PrintableString,
5535         TeletexString,
5536         VideotexString,
5537         IA5String,
5538         UTCTime,
5539         GeneralizedTime,
5540         GraphicString,
5541         VisibleString,
5542         ISO646String,
5543         GeneralString,
5544         UniversalString,
5545         BMPString,
5546     ))
5547
5548
5549 AnyState = namedtuple(
5550     "AnyState",
5551     BasicState._fields + ("value", "defined"),
5552     **NAMEDTUPLE_KWARGS
5553 )
5554
5555
5556 class Any(Obj):
5557     """``ANY`` special type
5558
5559     >>> Any(Integer(-123))
5560     ANY INTEGER -123 (0X:7B)
5561     >>> a = Any(OctetString(b"hello world").encode())
5562     ANY 040b68656c6c6f20776f726c64
5563     >>> hexenc(bytes(a))
5564     b'0x040x0bhello world'
5565     """
5566     __slots__ = ("defined",)
5567     tag_default = tag_encode(0)
5568     asn1_type_name = "ANY"
5569
5570     def __init__(
5571             self,
5572             value=None,
5573             expl=None,
5574             optional=False,
5575             _decoded=(0, 0, 0),
5576     ):
5577         """
5578         :param value: set the value. Either any kind of pyderasn's
5579                       **ready** object, or bytes. Pay attention that
5580                       **no** validation is performed if raw binary value
5581                       is valid TLV, except just tag decoding
5582         :param bytes expl: override default tag with ``EXPLICIT`` one
5583         :param bool optional: is object ``OPTIONAL`` in sequence
5584         """
5585         super(Any, self).__init__(None, expl, None, optional, _decoded)
5586         if value is None:
5587             self._value = None
5588         else:
5589             value = self._value_sanitize(value)
5590             self._value = value
5591             if self._expl is None:
5592                 if value.__class__ == binary_type:
5593                     tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5594                 else:
5595                     tag_class, tag_num = value.tag_order
5596             else:
5597                 tag_class, _, tag_num = tag_decode(self._expl)
5598             self._tag_order = (tag_class, tag_num)
5599         self.defined = None
5600
5601     def _value_sanitize(self, value):
5602         if value.__class__ == binary_type:
5603             if len(value) == 0:
5604                 raise ValueError("Any value can not be empty")
5605             return value
5606         if isinstance(value, self.__class__):
5607             return value._value
5608         if not isinstance(value, Obj):
5609             raise InvalidValueType((self.__class__, Obj, binary_type))
5610         return value
5611
5612     @property
5613     def ready(self):
5614         return self._value is not None
5615
5616     @property
5617     def tag_order(self):
5618         self._assert_ready()
5619         return self._tag_order
5620
5621     @property
5622     def bered(self):
5623         if self.expl_lenindef or self.lenindef:
5624             return True
5625         if self.defined is None:
5626             return False
5627         return self.defined[1].bered
5628
5629     def __getstate__(self):
5630         return AnyState(
5631             __version__,
5632             self.tag,
5633             self._tag_order,
5634             self._expl,
5635             None,
5636             self.optional,
5637             self.offset,
5638             self.llen,
5639             self.vlen,
5640             self.expl_lenindef,
5641             self.lenindef,
5642             self.ber_encoded,
5643             self._value,
5644             self.defined,
5645         )
5646
5647     def __setstate__(self, state):
5648         super(Any, self).__setstate__(state)
5649         self._value = state.value
5650         self.defined = state.defined
5651
5652     def __eq__(self, their):
5653         if their.__class__ == binary_type:
5654             if self._value.__class__ == binary_type:
5655                 return self._value == their
5656             return self._value.encode() == their
5657         if issubclass(their.__class__, Any):
5658             if self.ready and their.ready:
5659                 return bytes(self) == bytes(their)
5660             return self.ready == their.ready
5661         return False
5662
5663     def __call__(
5664             self,
5665             value=None,
5666             expl=None,
5667             optional=None,
5668     ):
5669         return self.__class__(
5670             value=value,
5671             expl=self._expl if expl is None else expl,
5672             optional=self.optional if optional is None else optional,
5673         )
5674
5675     def __bytes__(self):
5676         self._assert_ready()
5677         value = self._value
5678         if value.__class__ == binary_type:
5679             return value
5680         return self._value.encode()
5681
5682     @property
5683     def tlen(self):
5684         return 0
5685
5686     def _encode(self):
5687         self._assert_ready()
5688         value = self._value
5689         if value.__class__ == binary_type:
5690             return value
5691         return value.encode()
5692
5693     def _encode_cer(self, writer):
5694         self._assert_ready()
5695         value = self._value
5696         if value.__class__ == binary_type:
5697             write_full(writer, value)
5698         else:
5699             value.encode_cer(writer)
5700
5701     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5702         try:
5703             t, tlen, lv = tag_strip(tlv)
5704         except DecodeError as err:
5705             raise err.__class__(
5706                 msg=err.msg,
5707                 klass=self.__class__,
5708                 decode_path=decode_path,
5709                 offset=offset,
5710             )
5711         try:
5712             l, llen, v = len_decode(lv)
5713         except LenIndefForm as err:
5714             if not ctx.get("bered", False):
5715                 raise err.__class__(
5716                     msg=err.msg,
5717                     klass=self.__class__,
5718                     decode_path=decode_path,
5719                     offset=offset,
5720                 )
5721             llen, vlen, v = 1, 0, lv[1:]
5722             sub_offset = offset + tlen + llen
5723             chunk_i = 0
5724             while v[:EOC_LEN].tobytes() != EOC:
5725                 chunk, v = Any().decode(
5726                     v,
5727                     offset=sub_offset,
5728                     decode_path=decode_path + (str(chunk_i),),
5729                     leavemm=True,
5730                     ctx=ctx,
5731                     _ctx_immutable=False,
5732                 )
5733                 vlen += chunk.tlvlen
5734                 sub_offset += chunk.tlvlen
5735                 chunk_i += 1
5736             tlvlen = tlen + llen + vlen + EOC_LEN
5737             obj = self.__class__(
5738                 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
5739                 expl=self._expl,
5740                 optional=self.optional,
5741                 _decoded=(offset, 0, tlvlen),
5742             )
5743             obj.lenindef = True
5744             obj.tag = t.tobytes()
5745             yield decode_path, obj, v[EOC_LEN:]
5746             return
5747         except DecodeError as err:
5748             raise err.__class__(
5749                 msg=err.msg,
5750                 klass=self.__class__,
5751                 decode_path=decode_path,
5752                 offset=offset,
5753             )
5754         if l > len(v):
5755             raise NotEnoughData(
5756                 "encoded length is longer than data",
5757                 klass=self.__class__,
5758                 decode_path=decode_path,
5759                 offset=offset,
5760             )
5761         tlvlen = tlen + llen + l
5762         v, tail = tlv[:tlvlen], v[l:]
5763         obj = self.__class__(
5764             value=None if evgen_mode else v.tobytes(),
5765             expl=self._expl,
5766             optional=self.optional,
5767             _decoded=(offset, 0, tlvlen),
5768         )
5769         obj.tag = t.tobytes()
5770         yield decode_path, obj, tail
5771
5772     def __repr__(self):
5773         return pp_console_row(next(self.pps()))
5774
5775     def pps(self, decode_path=()):
5776         value = self._value
5777         if value is None:
5778             pass
5779         elif value.__class__ == binary_type:
5780             value = None
5781         else:
5782             value = repr(value)
5783         yield _pp(
5784             obj=self,
5785             asn1_type_name=self.asn1_type_name,
5786             obj_name=self.__class__.__name__,
5787             decode_path=decode_path,
5788             value=value,
5789             blob=self._value if self._value.__class__ == binary_type else None,
5790             optional=self.optional,
5791             default=self == self.default,
5792             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5793             expl=None if self._expl is None else tag_decode(self._expl),
5794             offset=self.offset,
5795             tlen=self.tlen,
5796             llen=self.llen,
5797             vlen=self.vlen,
5798             expl_offset=self.expl_offset if self.expled else None,
5799             expl_tlen=self.expl_tlen if self.expled else None,
5800             expl_llen=self.expl_llen if self.expled else None,
5801             expl_vlen=self.expl_vlen if self.expled else None,
5802             expl_lenindef=self.expl_lenindef,
5803             lenindef=self.lenindef,
5804             bered=self.bered,
5805         )
5806         defined_by, defined = self.defined or (None, None)
5807         if defined_by is not None:
5808             yield defined.pps(
5809                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
5810             )
5811         for pp in self.pps_lenindef(decode_path):
5812             yield pp
5813
5814
5815 ########################################################################
5816 # ASN.1 constructed types
5817 ########################################################################
5818
5819 def get_def_by_path(defines_by_path, sub_decode_path):
5820     """Get define by decode path
5821     """
5822     for path, define in defines_by_path:
5823         if len(path) != len(sub_decode_path):
5824             continue
5825         for p1, p2 in zip(path, sub_decode_path):
5826             if (not p1 is any) and (p1 != p2):
5827                 break
5828         else:
5829             return define
5830
5831
5832 def abs_decode_path(decode_path, rel_path):
5833     """Create an absolute decode path from current and relative ones
5834
5835     :param decode_path: current decode path, starting point. Tuple of strings
5836     :param rel_path: relative path to ``decode_path``. Tuple of strings.
5837                      If first tuple's element is "/", then treat it as
5838                      an absolute path, ignoring ``decode_path`` as
5839                      starting point. Also this tuple can contain ".."
5840                      elements, stripping the leading element from
5841                      ``decode_path``
5842
5843     >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
5844     ("foo", "bar", "baz", "whatever")
5845     >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
5846     ("foo", "whatever")
5847     >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
5848     ("baz", "whatever")
5849     """
5850     if rel_path[0] == "/":
5851         return rel_path[1:]
5852     if rel_path[0] == "..":
5853         return abs_decode_path(decode_path[:-1], rel_path[1:])
5854     return decode_path + rel_path
5855
5856
5857 SequenceState = namedtuple(
5858     "SequenceState",
5859     BasicState._fields + ("specs", "value",),
5860     **NAMEDTUPLE_KWARGS
5861 )
5862
5863
5864 class Sequence(Obj):
5865     """``SEQUENCE`` structure type
5866
5867     You have to make specification of sequence::
5868
5869         class Extension(Sequence):
5870             schema = (
5871                 ("extnID", ObjectIdentifier()),
5872                 ("critical", Boolean(default=False)),
5873                 ("extnValue", OctetString()),
5874             )
5875
5876     Then, you can work with it as with dictionary.
5877
5878     >>> ext = Extension()
5879     >>> Extension().specs
5880     OrderedDict([
5881         ('extnID', OBJECT IDENTIFIER),
5882         ('critical', BOOLEAN False OPTIONAL DEFAULT),
5883         ('extnValue', OCTET STRING),
5884     ])
5885     >>> ext["extnID"] = "1.2.3"
5886     Traceback (most recent call last):
5887     pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
5888     >>> ext["extnID"] = ObjectIdentifier("1.2.3")
5889
5890     You can determine if sequence is ready to be encoded:
5891
5892     >>> ext.ready
5893     False
5894     >>> ext.encode()
5895     Traceback (most recent call last):
5896     pyderasn.ObjNotReady: object is not ready: extnValue
5897     >>> ext["extnValue"] = OctetString(b"foobar")
5898     >>> ext.ready
5899     True
5900
5901     Value you want to assign, must have the same **type** as in
5902     corresponding specification, but it can have different tags,
5903     optional/default attributes -- they will be taken from specification
5904     automatically::
5905
5906         class TBSCertificate(Sequence):
5907             schema = (
5908                 ("version", Version(expl=tag_ctxc(0), default="v1")),
5909             [...]
5910
5911     >>> tbs = TBSCertificate()
5912     >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
5913
5914     Assign ``None`` to remove value from sequence.
5915
5916     You can set values in Sequence during its initialization:
5917
5918     >>> AlgorithmIdentifier((
5919         ("algorithm", ObjectIdentifier("1.2.3")),
5920         ("parameters", Any(Null()))
5921     ))
5922     AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
5923
5924     You can determine if value exists/set in the sequence and take its value:
5925
5926     >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
5927     (True, True, False)
5928     >>> ext["extnID"]
5929     OBJECT IDENTIFIER 1.2.3
5930
5931     But pay attention that if value has default, then it won't be (not
5932     in) in the sequence (because ``DEFAULT`` must not be encoded in
5933     DER), but you can read its value:
5934
5935     >>> "critical" in ext, ext["critical"]
5936     (False, BOOLEAN False)
5937     >>> ext["critical"] = Boolean(True)
5938     >>> "critical" in ext, ext["critical"]
5939     (True, BOOLEAN True)
5940
5941     All defaulted values are always optional.
5942
5943     .. _allow_default_values_ctx:
5944
5945     DER prohibits default value encoding and will raise an error if
5946     default value is unexpectedly met during decode.
5947     If :ref:`bered <bered_ctx>` context option is set, then no error
5948     will be raised, but ``bered`` attribute set. You can disable strict
5949     defaulted values existence validation by setting
5950     ``"allow_default_values": True`` :ref:`context <ctx>` option.
5951
5952     .. warning::
5953
5954        Check for default value existence is not performed in
5955        ``evgen_mode``, because previously decoded values are not stored
5956        in memory, to be able to compare them.
5957
5958     Two sequences are equal if they have equal specification (schema),
5959     implicit/explicit tagging and the same values.
5960     """
5961     __slots__ = ("specs",)
5962     tag_default = tag_encode(form=TagFormConstructed, num=16)
5963     asn1_type_name = "SEQUENCE"
5964
5965     def __init__(
5966             self,
5967             value=None,
5968             schema=None,
5969             impl=None,
5970             expl=None,
5971             default=None,
5972             optional=False,
5973             _decoded=(0, 0, 0),
5974     ):
5975         super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
5976         if schema is None:
5977             schema = getattr(self, "schema", ())
5978         self.specs = (
5979             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5980         )
5981         self._value = {}
5982         if value is not None:
5983             if issubclass(value.__class__, Sequence):
5984                 self._value = value._value
5985             elif hasattr(value, "__iter__"):
5986                 for seq_key, seq_value in value:
5987                     self[seq_key] = seq_value
5988             else:
5989                 raise InvalidValueType((Sequence,))
5990         if default is not None:
5991             if not issubclass(default.__class__, Sequence):
5992                 raise InvalidValueType((Sequence,))
5993             default_value = default._value
5994             default_obj = self.__class__(impl=self.tag, expl=self._expl)
5995             default_obj.specs = self.specs
5996             default_obj._value = default_value
5997             self.default = default_obj
5998             if value is None:
5999                 self._value = copy(default_obj._value)
6000
6001     @property
6002     def ready(self):
6003         for name, spec in iteritems(self.specs):
6004             value = self._value.get(name)
6005             if value is None:
6006                 if spec.optional:
6007                     continue
6008                 return False
6009             if not value.ready:
6010                 return False
6011         return True
6012
6013     @property
6014     def bered(self):
6015         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6016             return True
6017         return any(value.bered for value in itervalues(self._value))
6018
6019     def __getstate__(self):
6020         return SequenceState(
6021             __version__,
6022             self.tag,
6023             self._tag_order,
6024             self._expl,
6025             self.default,
6026             self.optional,
6027             self.offset,
6028             self.llen,
6029             self.vlen,
6030             self.expl_lenindef,
6031             self.lenindef,
6032             self.ber_encoded,
6033             self.specs,
6034             {k: copy(v) for k, v in iteritems(self._value)},
6035         )
6036
6037     def __setstate__(self, state):
6038         super(Sequence, self).__setstate__(state)
6039         self.specs = state.specs
6040         self._value = state.value
6041
6042     def __eq__(self, their):
6043         if not isinstance(their, self.__class__):
6044             return False
6045         return (
6046             self.specs == their.specs and
6047             self.tag == their.tag and
6048             self._expl == their._expl and
6049             self._value == their._value
6050         )
6051
6052     def __call__(
6053             self,
6054             value=None,
6055             impl=None,
6056             expl=None,
6057             default=None,
6058             optional=None,
6059     ):
6060         return self.__class__(
6061             value=value,
6062             schema=self.specs,
6063             impl=self.tag if impl is None else impl,
6064             expl=self._expl if expl is None else expl,
6065             default=self.default if default is None else default,
6066             optional=self.optional if optional is None else optional,
6067         )
6068
6069     def __contains__(self, key):
6070         return key in self._value
6071
6072     def __setitem__(self, key, value):
6073         spec = self.specs.get(key)
6074         if spec is None:
6075             raise ObjUnknown(key)
6076         if value is None:
6077             self._value.pop(key, None)
6078             return
6079         if not isinstance(value, spec.__class__):
6080             raise InvalidValueType((spec.__class__,))
6081         value = spec(value=value)
6082         if spec.default is not None and value == spec.default:
6083             self._value.pop(key, None)
6084             return
6085         self._value[key] = value
6086
6087     def __getitem__(self, key):
6088         value = self._value.get(key)
6089         if value is not None:
6090             return value
6091         spec = self.specs.get(key)
6092         if spec is None:
6093             raise ObjUnknown(key)
6094         if spec.default is not None:
6095             return spec.default
6096         return None
6097
6098     def _values_for_encoding(self):
6099         for name, spec in iteritems(self.specs):
6100             value = self._value.get(name)
6101             if value is None:
6102                 if spec.optional:
6103                     continue
6104                 raise ObjNotReady(name)
6105             yield value
6106
6107     def _encode(self):
6108         v = b"".join(v.encode() for v in self._values_for_encoding())
6109         return b"".join((self.tag, len_encode(len(v)), v))
6110
6111     def _encode_cer(self, writer):
6112         write_full(writer, self.tag + LENINDEF)
6113         for v in self._values_for_encoding():
6114             v.encode_cer(writer)
6115         write_full(writer, EOC)
6116
6117     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6118         try:
6119             t, tlen, lv = tag_strip(tlv)
6120         except DecodeError as err:
6121             raise err.__class__(
6122                 msg=err.msg,
6123                 klass=self.__class__,
6124                 decode_path=decode_path,
6125                 offset=offset,
6126             )
6127         if t != self.tag:
6128             raise TagMismatch(
6129                 klass=self.__class__,
6130                 decode_path=decode_path,
6131                 offset=offset,
6132             )
6133         if tag_only:  # pragma: no cover
6134             yield None
6135             return
6136         lenindef = False
6137         ctx_bered = ctx.get("bered", False)
6138         try:
6139             l, llen, v = len_decode(lv)
6140         except LenIndefForm as err:
6141             if not ctx_bered:
6142                 raise err.__class__(
6143                     msg=err.msg,
6144                     klass=self.__class__,
6145                     decode_path=decode_path,
6146                     offset=offset,
6147                 )
6148             l, llen, v = 0, 1, lv[1:]
6149             lenindef = True
6150         except DecodeError as err:
6151             raise err.__class__(
6152                 msg=err.msg,
6153                 klass=self.__class__,
6154                 decode_path=decode_path,
6155                 offset=offset,
6156             )
6157         if l > len(v):
6158             raise NotEnoughData(
6159                 "encoded length is longer than data",
6160                 klass=self.__class__,
6161                 decode_path=decode_path,
6162                 offset=offset,
6163             )
6164         if not lenindef:
6165             v, tail = v[:l], v[l:]
6166         vlen = 0
6167         sub_offset = offset + tlen + llen
6168         values = {}
6169         ber_encoded = False
6170         ctx_allow_default_values = ctx.get("allow_default_values", False)
6171         for name, spec in iteritems(self.specs):
6172             if spec.optional and (
6173                     (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6174                     len(v) == 0
6175             ):
6176                 continue
6177             sub_decode_path = decode_path + (name,)
6178             try:
6179                 if evgen_mode:
6180                     for _decode_path, value, v_tail in spec.decode_evgen(
6181                             v,
6182                             sub_offset,
6183                             leavemm=True,
6184                             decode_path=sub_decode_path,
6185                             ctx=ctx,
6186                             _ctx_immutable=False,
6187                     ):
6188                         yield _decode_path, value, v_tail
6189                 else:
6190                     _, value, v_tail = next(spec.decode_evgen(
6191                         v,
6192                         sub_offset,
6193                         leavemm=True,
6194                         decode_path=sub_decode_path,
6195                         ctx=ctx,
6196                         _ctx_immutable=False,
6197                         _evgen_mode=False,
6198                     ))
6199             except TagMismatch as err:
6200                 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6201                     continue
6202                 raise
6203
6204             defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6205             if not evgen_mode and defined is not None:
6206                 defined_by, defined_spec = defined
6207                 if issubclass(value.__class__, SequenceOf):
6208                     for i, _value in enumerate(value):
6209                         sub_sub_decode_path = sub_decode_path + (
6210                             str(i),
6211                             DecodePathDefBy(defined_by),
6212                         )
6213                         defined_value, defined_tail = defined_spec.decode(
6214                             memoryview(bytes(_value)),
6215                             sub_offset + (
6216                                 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6217                                 if value.expled else (value.tlen + value.llen)
6218                             ),
6219                             leavemm=True,
6220                             decode_path=sub_sub_decode_path,
6221                             ctx=ctx,
6222                             _ctx_immutable=False,
6223                         )
6224                         if len(defined_tail) > 0:
6225                             raise DecodeError(
6226                                 "remaining data",
6227                                 klass=self.__class__,
6228                                 decode_path=sub_sub_decode_path,
6229                                 offset=offset,
6230                             )
6231                         _value.defined = (defined_by, defined_value)
6232                 else:
6233                     defined_value, defined_tail = defined_spec.decode(
6234                         memoryview(bytes(value)),
6235                         sub_offset + (
6236                             (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6237                             if value.expled else (value.tlen + value.llen)
6238                         ),
6239                         leavemm=True,
6240                         decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6241                         ctx=ctx,
6242                         _ctx_immutable=False,
6243                     )
6244                     if len(defined_tail) > 0:
6245                         raise DecodeError(
6246                             "remaining data",
6247                             klass=self.__class__,
6248                             decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6249                             offset=offset,
6250                         )
6251                     value.defined = (defined_by, defined_value)
6252
6253             value_len = value.fulllen
6254             vlen += value_len
6255             sub_offset += value_len
6256             v = v_tail
6257             if not evgen_mode:
6258                 if spec.default is not None and value == spec.default:
6259                     # This will not work in evgen_mode
6260                     if ctx_bered or ctx_allow_default_values:
6261                         ber_encoded = True
6262                     else:
6263                         raise DecodeError(
6264                             "DEFAULT value met",
6265                             klass=self.__class__,
6266                             decode_path=sub_decode_path,
6267                             offset=sub_offset,
6268                         )
6269                 values[name] = value
6270                 spec_defines = getattr(spec, "defines", ())
6271                 if len(spec_defines) == 0:
6272                     defines_by_path = ctx.get("defines_by_path", ())
6273                     if len(defines_by_path) > 0:
6274                         spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6275                 if spec_defines is not None and len(spec_defines) > 0:
6276                     for rel_path, schema in spec_defines:
6277                         defined = schema.get(value, None)
6278                         if defined is not None:
6279                             ctx.setdefault("_defines", []).append((
6280                                 abs_decode_path(sub_decode_path[:-1], rel_path),
6281                                 (value, defined),
6282                             ))
6283         if lenindef:
6284             if v[:EOC_LEN].tobytes() != EOC:
6285                 raise DecodeError(
6286                     "no EOC",
6287                     klass=self.__class__,
6288                     decode_path=decode_path,
6289                     offset=offset,
6290                 )
6291             tail = v[EOC_LEN:]
6292             vlen += EOC_LEN
6293         elif len(v) > 0:
6294             raise DecodeError(
6295                 "remaining data",
6296                 klass=self.__class__,
6297                 decode_path=decode_path,
6298                 offset=offset,
6299             )
6300         obj = self.__class__(
6301             schema=self.specs,
6302             impl=self.tag,
6303             expl=self._expl,
6304             default=self.default,
6305             optional=self.optional,
6306             _decoded=(offset, llen, vlen),
6307         )
6308         obj._value = values
6309         obj.lenindef = lenindef
6310         obj.ber_encoded = ber_encoded
6311         yield decode_path, obj, tail
6312
6313     def __repr__(self):
6314         value = pp_console_row(next(self.pps()))
6315         cols = []
6316         for name in self.specs:
6317             _value = self._value.get(name)
6318             if _value is None:
6319                 continue
6320             cols.append("%s: %s" % (name, repr(_value)))
6321         return "%s[%s]" % (value, "; ".join(cols))
6322
6323     def pps(self, decode_path=()):
6324         yield _pp(
6325             obj=self,
6326             asn1_type_name=self.asn1_type_name,
6327             obj_name=self.__class__.__name__,
6328             decode_path=decode_path,
6329             optional=self.optional,
6330             default=self == self.default,
6331             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6332             expl=None if self._expl is None else tag_decode(self._expl),
6333             offset=self.offset,
6334             tlen=self.tlen,
6335             llen=self.llen,
6336             vlen=self.vlen,
6337             expl_offset=self.expl_offset if self.expled else None,
6338             expl_tlen=self.expl_tlen if self.expled else None,
6339             expl_llen=self.expl_llen if self.expled else None,
6340             expl_vlen=self.expl_vlen if self.expled else None,
6341             expl_lenindef=self.expl_lenindef,
6342             lenindef=self.lenindef,
6343             ber_encoded=self.ber_encoded,
6344             bered=self.bered,
6345         )
6346         for name in self.specs:
6347             value = self._value.get(name)
6348             if value is None:
6349                 continue
6350             yield value.pps(decode_path=decode_path + (name,))
6351         for pp in self.pps_lenindef(decode_path):
6352             yield pp
6353
6354
6355 class Set(Sequence):
6356     """``SET`` structure type
6357
6358     Its usage is identical to :py:class:`pyderasn.Sequence`.
6359
6360     .. _allow_unordered_set_ctx:
6361
6362     DER prohibits unordered values encoding and will raise an error
6363     during decode. If :ref:`bered <bered_ctx>` context option is set,
6364     then no error will occur. Also you can disable strict values
6365     ordering check by setting ``"allow_unordered_set": True``
6366     :ref:`context <ctx>` option.
6367     """
6368     __slots__ = ()
6369     tag_default = tag_encode(form=TagFormConstructed, num=17)
6370     asn1_type_name = "SET"
6371
6372     def _encode(self):
6373         v = b"".join(value.encode() for value in sorted(
6374             self._values_for_encoding(),
6375             key=attrgetter("tag_order"),
6376         ))
6377         return b"".join((self.tag, len_encode(len(v)), v))
6378
6379     def _encode_cer(self, writer):
6380         write_full(writer, self.tag + LENINDEF)
6381         for v in sorted(
6382                 self._values_for_encoding(),
6383                 key=attrgetter("tag_order_cer"),
6384         ):
6385             v.encode_cer(writer)
6386         write_full(writer, EOC)
6387
6388     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6389         try:
6390             t, tlen, lv = tag_strip(tlv)
6391         except DecodeError as err:
6392             raise err.__class__(
6393                 msg=err.msg,
6394                 klass=self.__class__,
6395                 decode_path=decode_path,
6396                 offset=offset,
6397             )
6398         if t != self.tag:
6399             raise TagMismatch(
6400                 klass=self.__class__,
6401                 decode_path=decode_path,
6402                 offset=offset,
6403             )
6404         if tag_only:
6405             yield None
6406             return
6407         lenindef = False
6408         ctx_bered = ctx.get("bered", False)
6409         try:
6410             l, llen, v = len_decode(lv)
6411         except LenIndefForm as err:
6412             if not ctx_bered:
6413                 raise err.__class__(
6414                     msg=err.msg,
6415                     klass=self.__class__,
6416                     decode_path=decode_path,
6417                     offset=offset,
6418                 )
6419             l, llen, v = 0, 1, lv[1:]
6420             lenindef = True
6421         except DecodeError as err:
6422             raise err.__class__(
6423                 msg=err.msg,
6424                 klass=self.__class__,
6425                 decode_path=decode_path,
6426                 offset=offset,
6427             )
6428         if l > len(v):
6429             raise NotEnoughData(
6430                 "encoded length is longer than data",
6431                 klass=self.__class__,
6432                 offset=offset,
6433             )
6434         if not lenindef:
6435             v, tail = v[:l], v[l:]
6436         vlen = 0
6437         sub_offset = offset + tlen + llen
6438         values = {}
6439         ber_encoded = False
6440         ctx_allow_default_values = ctx.get("allow_default_values", False)
6441         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6442         tag_order_prev = (0, 0)
6443         _specs_items = copy(self.specs)
6444
6445         while len(v) > 0:
6446             if lenindef and v[:EOC_LEN].tobytes() == EOC:
6447                 break
6448             for name, spec in iteritems(_specs_items):
6449                 sub_decode_path = decode_path + (name,)
6450                 try:
6451                     spec.decode(
6452                         v,
6453                         sub_offset,
6454                         leavemm=True,
6455                         decode_path=sub_decode_path,
6456                         ctx=ctx,
6457                         tag_only=True,
6458                         _ctx_immutable=False,
6459                     )
6460                 except TagMismatch:
6461                     continue
6462                 break
6463             else:
6464                 raise TagMismatch(
6465                     klass=self.__class__,
6466                     decode_path=decode_path,
6467                     offset=offset,
6468                 )
6469             if evgen_mode:
6470                 for _decode_path, value, v_tail in spec.decode_evgen(
6471                         v,
6472                         sub_offset,
6473                         leavemm=True,
6474                         decode_path=sub_decode_path,
6475                         ctx=ctx,
6476                         _ctx_immutable=False,
6477                 ):
6478                     yield _decode_path, value, v_tail
6479             else:
6480                 _, value, v_tail = next(spec.decode_evgen(
6481                     v,
6482                     sub_offset,
6483                     leavemm=True,
6484                     decode_path=sub_decode_path,
6485                     ctx=ctx,
6486                     _ctx_immutable=False,
6487                     _evgen_mode=False,
6488                 ))
6489             value_tag_order = value.tag_order
6490             value_len = value.fulllen
6491             if tag_order_prev >= value_tag_order:
6492                 if ctx_bered or ctx_allow_unordered_set:
6493                     ber_encoded = True
6494                 else:
6495                     raise DecodeError(
6496                         "unordered " + self.asn1_type_name,
6497                         klass=self.__class__,
6498                         decode_path=sub_decode_path,
6499                         offset=sub_offset,
6500                     )
6501             if spec.default is None or value != spec.default:
6502                 pass
6503             elif ctx_bered or ctx_allow_default_values:
6504                 ber_encoded = True
6505             else:
6506                 raise DecodeError(
6507                     "DEFAULT value met",
6508                     klass=self.__class__,
6509                     decode_path=sub_decode_path,
6510                     offset=sub_offset,
6511                 )
6512             values[name] = value
6513             del _specs_items[name]
6514             tag_order_prev = value_tag_order
6515             sub_offset += value_len
6516             vlen += value_len
6517             v = v_tail
6518
6519         obj = self.__class__(
6520             schema=self.specs,
6521             impl=self.tag,
6522             expl=self._expl,
6523             default=self.default,
6524             optional=self.optional,
6525             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6526         )
6527         if lenindef:
6528             if v[:EOC_LEN].tobytes() != EOC:
6529                 raise DecodeError(
6530                     "no EOC",
6531                     klass=self.__class__,
6532                     decode_path=decode_path,
6533                     offset=offset,
6534                 )
6535             tail = v[EOC_LEN:]
6536             obj.lenindef = True
6537         for name, spec in iteritems(self.specs):
6538             if name not in values and not spec.optional:
6539                 raise DecodeError(
6540                     "%s value is not ready" % name,
6541                     klass=self.__class__,
6542                     decode_path=decode_path,
6543                     offset=offset,
6544                 )
6545         if not evgen_mode:
6546             obj._value = values
6547         obj.ber_encoded = ber_encoded
6548         yield decode_path, obj, tail
6549
6550
6551 SequenceOfState = namedtuple(
6552     "SequenceOfState",
6553     BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6554     **NAMEDTUPLE_KWARGS
6555 )
6556
6557
6558 class SequenceOf(Obj):
6559     """``SEQUENCE OF`` sequence type
6560
6561     For that kind of type you must specify the object it will carry on
6562     (bounds are for example here, not required)::
6563
6564         class Ints(SequenceOf):
6565             schema = Integer()
6566             bounds = (0, 2)
6567
6568     >>> ints = Ints()
6569     >>> ints.append(Integer(123))
6570     >>> ints.append(Integer(234))
6571     >>> ints
6572     Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6573     >>> [int(i) for i in ints]
6574     [123, 234]
6575     >>> ints.append(Integer(345))
6576     Traceback (most recent call last):
6577     pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6578     >>> ints[1]
6579     INTEGER 234
6580     >>> ints[1] = Integer(345)
6581     >>> ints
6582     Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6583
6584     You can initialize sequence with preinitialized values:
6585
6586     >>> ints = Ints([Integer(123), Integer(234)])
6587
6588     Also you can use iterator as a value:
6589
6590     >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6591
6592     And it won't be iterated until encoding process. Pay attention that
6593     bounds and required schema checks are done only during the encoding
6594     process in that case! After encode was called, then value is zeroed
6595     back to empty list and you have to set it again. That mode is useful
6596     mainly with CER encoding mode, where all objects from the iterable
6597     will be streamed to the buffer, without copying all of them to
6598     memory first.
6599     """
6600     __slots__ = ("spec", "_bound_min", "_bound_max")
6601     tag_default = tag_encode(form=TagFormConstructed, num=16)
6602     asn1_type_name = "SEQUENCE OF"
6603
6604     def __init__(
6605             self,
6606             value=None,
6607             schema=None,
6608             bounds=None,
6609             impl=None,
6610             expl=None,
6611             default=None,
6612             optional=False,
6613             _decoded=(0, 0, 0),
6614     ):
6615         super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6616         if schema is None:
6617             schema = getattr(self, "schema", None)
6618         if schema is None:
6619             raise ValueError("schema must be specified")
6620         self.spec = schema
6621         self._bound_min, self._bound_max = getattr(
6622             self,
6623             "bounds",
6624             (0, float("+inf")),
6625         ) if bounds is None else bounds
6626         self._value = []
6627         if value is not None:
6628             self._value = self._value_sanitize(value)
6629         if default is not None:
6630             default_value = self._value_sanitize(default)
6631             default_obj = self.__class__(
6632                 schema=schema,
6633                 impl=self.tag,
6634                 expl=self._expl,
6635             )
6636             default_obj._value = default_value
6637             self.default = default_obj
6638             if value is None:
6639                 self._value = copy(default_obj._value)
6640
6641     def _value_sanitize(self, value):
6642         iterator = False
6643         if issubclass(value.__class__, SequenceOf):
6644             value = value._value
6645         elif hasattr(value, NEXT_ATTR_NAME):
6646             iterator = True
6647             value = value
6648         elif hasattr(value, "__iter__"):
6649             value = list(value)
6650         else:
6651             raise InvalidValueType((self.__class__, iter, "iterator"))
6652         if not iterator:
6653             if not self._bound_min <= len(value) <= self._bound_max:
6654                 raise BoundsError(self._bound_min, len(value), self._bound_max)
6655             class_expected = self.spec.__class__
6656             for v in value:
6657                 if not isinstance(v, class_expected):
6658                     raise InvalidValueType((class_expected,))
6659         return value
6660
6661     @property
6662     def ready(self):
6663         if hasattr(self._value, NEXT_ATTR_NAME):
6664             return True
6665         if self._bound_min > 0 and len(self._value) == 0:
6666             return False
6667         return all(v.ready for v in self._value)
6668
6669     @property
6670     def bered(self):
6671         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6672             return True
6673         return any(v.bered for v in self._value)
6674
6675     def __getstate__(self):
6676         if hasattr(self._value, NEXT_ATTR_NAME):
6677             raise ValueError("can not pickle SequenceOf with iterator")
6678         return SequenceOfState(
6679             __version__,
6680             self.tag,
6681             self._tag_order,
6682             self._expl,
6683             self.default,
6684             self.optional,
6685             self.offset,
6686             self.llen,
6687             self.vlen,
6688             self.expl_lenindef,
6689             self.lenindef,
6690             self.ber_encoded,
6691             self.spec,
6692             [copy(v) for v in self._value],
6693             self._bound_min,
6694             self._bound_max,
6695         )
6696
6697     def __setstate__(self, state):
6698         super(SequenceOf, self).__setstate__(state)
6699         self.spec = state.spec
6700         self._value = state.value
6701         self._bound_min = state.bound_min
6702         self._bound_max = state.bound_max
6703
6704     def __eq__(self, their):
6705         if isinstance(their, self.__class__):
6706             return (
6707                 self.spec == their.spec and
6708                 self.tag == their.tag and
6709                 self._expl == their._expl and
6710                 self._value == their._value
6711             )
6712         if hasattr(their, "__iter__"):
6713             return self._value == list(their)
6714         return False
6715
6716     def __call__(
6717             self,
6718             value=None,
6719             bounds=None,
6720             impl=None,
6721             expl=None,
6722             default=None,
6723             optional=None,
6724     ):
6725         return self.__class__(
6726             value=value,
6727             schema=self.spec,
6728             bounds=(
6729                 (self._bound_min, self._bound_max)
6730                 if bounds is None else bounds
6731             ),
6732             impl=self.tag if impl is None else impl,
6733             expl=self._expl if expl is None else expl,
6734             default=self.default if default is None else default,
6735             optional=self.optional if optional is None else optional,
6736         )
6737
6738     def __contains__(self, key):
6739         return key in self._value
6740
6741     def append(self, value):
6742         if not isinstance(value, self.spec.__class__):
6743             raise InvalidValueType((self.spec.__class__,))
6744         if len(self._value) + 1 > self._bound_max:
6745             raise BoundsError(
6746                 self._bound_min,
6747                 len(self._value) + 1,
6748                 self._bound_max,
6749             )
6750         self._value.append(value)
6751
6752     def __iter__(self):
6753         return iter(self._value)
6754
6755     def __len__(self):
6756         return len(self._value)
6757
6758     def __setitem__(self, key, value):
6759         if not isinstance(value, self.spec.__class__):
6760             raise InvalidValueType((self.spec.__class__,))
6761         self._value[key] = self.spec(value=value)
6762
6763     def __getitem__(self, key):
6764         return self._value[key]
6765
6766     def _values_for_encoding(self):
6767         return iter(self._value)
6768
6769     def _encode(self):
6770         iterator = hasattr(self._value, NEXT_ATTR_NAME)
6771         if iterator:
6772             values = []
6773             values_append = values.append
6774             class_expected = self.spec.__class__
6775             values_for_encoding = self._values_for_encoding()
6776             self._value = []
6777             for v in values_for_encoding:
6778                 if not isinstance(v, class_expected):
6779                     raise InvalidValueType((class_expected,))
6780                 values_append(v.encode())
6781             if not self._bound_min <= len(values) <= self._bound_max:
6782                 raise BoundsError(self._bound_min, len(values), self._bound_max)
6783             value = b"".join(values)
6784         else:
6785             value = b"".join(v.encode() for v in self._values_for_encoding())
6786         return b"".join((self.tag, len_encode(len(value)), value))
6787
6788     def _encode_cer(self, writer):
6789         write_full(writer, self.tag + LENINDEF)
6790         iterator = hasattr(self._value, NEXT_ATTR_NAME)
6791         if iterator:
6792             class_expected = self.spec.__class__
6793             values_count = 0
6794             values_for_encoding = self._values_for_encoding()
6795             self._value = []
6796             for v in values_for_encoding:
6797                 if not isinstance(v, class_expected):
6798                     raise InvalidValueType((class_expected,))
6799                 v.encode_cer(writer)
6800                 values_count += 1
6801             if not self._bound_min <= values_count <= self._bound_max:
6802                 raise BoundsError(self._bound_min, values_count, self._bound_max)
6803         else:
6804             for v in self._values_for_encoding():
6805                 v.encode_cer(writer)
6806         write_full(writer, EOC)
6807
6808     def _decode(
6809             self,
6810             tlv,
6811             offset,
6812             decode_path,
6813             ctx,
6814             tag_only,
6815             evgen_mode,
6816             ordering_check=False,
6817     ):
6818         try:
6819             t, tlen, lv = tag_strip(tlv)
6820         except DecodeError as err:
6821             raise err.__class__(
6822                 msg=err.msg,
6823                 klass=self.__class__,
6824                 decode_path=decode_path,
6825                 offset=offset,
6826             )
6827         if t != self.tag:
6828             raise TagMismatch(
6829                 klass=self.__class__,
6830                 decode_path=decode_path,
6831                 offset=offset,
6832             )
6833         if tag_only:
6834             yield None
6835             return
6836         lenindef = False
6837         ctx_bered = ctx.get("bered", False)
6838         try:
6839             l, llen, v = len_decode(lv)
6840         except LenIndefForm as err:
6841             if not ctx_bered:
6842                 raise err.__class__(
6843                     msg=err.msg,
6844                     klass=self.__class__,
6845                     decode_path=decode_path,
6846                     offset=offset,
6847                 )
6848             l, llen, v = 0, 1, lv[1:]
6849             lenindef = True
6850         except DecodeError as err:
6851             raise err.__class__(
6852                 msg=err.msg,
6853                 klass=self.__class__,
6854                 decode_path=decode_path,
6855                 offset=offset,
6856             )
6857         if l > len(v):
6858             raise NotEnoughData(
6859                 "encoded length is longer than data",
6860                 klass=self.__class__,
6861                 decode_path=decode_path,
6862                 offset=offset,
6863             )
6864         if not lenindef:
6865             v, tail = v[:l], v[l:]
6866         vlen = 0
6867         sub_offset = offset + tlen + llen
6868         _value = []
6869         _value_count = 0
6870         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6871         value_prev = memoryview(v[:0])
6872         ber_encoded = False
6873         spec = self.spec
6874         while len(v) > 0:
6875             if lenindef and v[:EOC_LEN].tobytes() == EOC:
6876                 break
6877             sub_decode_path = decode_path + (str(_value_count),)
6878             if evgen_mode:
6879                 for _decode_path, value, v_tail in spec.decode_evgen(
6880                         v,
6881                         sub_offset,
6882                         leavemm=True,
6883                         decode_path=sub_decode_path,
6884                         ctx=ctx,
6885                         _ctx_immutable=False,
6886                 ):
6887                     yield _decode_path, value, v_tail
6888             else:
6889                 _, value, v_tail = next(spec.decode_evgen(
6890                     v,
6891                     sub_offset,
6892                     leavemm=True,
6893                     decode_path=sub_decode_path,
6894                     ctx=ctx,
6895                     _ctx_immutable=False,
6896                     _evgen_mode=False,
6897                 ))
6898             value_len = value.fulllen
6899             if ordering_check:
6900                 if value_prev.tobytes() > v[:value_len].tobytes():
6901                     if ctx_bered or ctx_allow_unordered_set:
6902                         ber_encoded = True
6903                     else:
6904                         raise DecodeError(
6905                             "unordered " + self.asn1_type_name,
6906                             klass=self.__class__,
6907                             decode_path=sub_decode_path,
6908                             offset=sub_offset,
6909                         )
6910                 value_prev = v[:value_len]
6911             _value_count += 1
6912             if not evgen_mode:
6913                 _value.append(value)
6914             sub_offset += value_len
6915             vlen += value_len
6916             v = v_tail
6917         if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
6918             raise DecodeError(
6919                 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
6920                 klass=self.__class__,
6921                 decode_path=decode_path,
6922                 offset=offset,
6923             )
6924         try:
6925             obj = self.__class__(
6926                 value=None if evgen_mode else _value,
6927                 schema=spec,
6928                 bounds=(self._bound_min, self._bound_max),
6929                 impl=self.tag,
6930                 expl=self._expl,
6931                 default=self.default,
6932                 optional=self.optional,
6933                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6934             )
6935         except BoundsError as err:
6936             raise DecodeError(
6937                 msg=str(err),
6938                 klass=self.__class__,
6939                 decode_path=decode_path,
6940                 offset=offset,
6941             )
6942         if lenindef:
6943             if v[:EOC_LEN].tobytes() != EOC:
6944                 raise DecodeError(
6945                     "no EOC",
6946                     klass=self.__class__,
6947                     decode_path=decode_path,
6948                     offset=offset,
6949                 )
6950             obj.lenindef = True
6951             tail = v[EOC_LEN:]
6952         obj.ber_encoded = ber_encoded
6953         yield decode_path, obj, tail
6954
6955     def __repr__(self):
6956         return "%s[%s]" % (
6957             pp_console_row(next(self.pps())),
6958             ", ".join(repr(v) for v in self._value),
6959         )
6960
6961     def pps(self, decode_path=()):
6962         yield _pp(
6963             obj=self,
6964             asn1_type_name=self.asn1_type_name,
6965             obj_name=self.__class__.__name__,
6966             decode_path=decode_path,
6967             optional=self.optional,
6968             default=self == self.default,
6969             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6970             expl=None if self._expl is None else tag_decode(self._expl),
6971             offset=self.offset,
6972             tlen=self.tlen,
6973             llen=self.llen,
6974             vlen=self.vlen,
6975             expl_offset=self.expl_offset if self.expled else None,
6976             expl_tlen=self.expl_tlen if self.expled else None,
6977             expl_llen=self.expl_llen if self.expled else None,
6978             expl_vlen=self.expl_vlen if self.expled else None,
6979             expl_lenindef=self.expl_lenindef,
6980             lenindef=self.lenindef,
6981             ber_encoded=self.ber_encoded,
6982             bered=self.bered,
6983         )
6984         for i, value in enumerate(self._value):
6985             yield value.pps(decode_path=decode_path + (str(i),))
6986         for pp in self.pps_lenindef(decode_path):
6987             yield pp
6988
6989
6990 class SetOf(SequenceOf):
6991     """``SET OF`` sequence type
6992
6993     Its usage is identical to :py:class:`pyderasn.SequenceOf`.
6994     """
6995     __slots__ = ()
6996     tag_default = tag_encode(form=TagFormConstructed, num=17)
6997     asn1_type_name = "SET OF"
6998
6999     def _value_sanitize(self, value):
7000         value = super(SetOf, self)._value_sanitize(value)
7001         if hasattr(value, NEXT_ATTR_NAME):
7002             raise ValueError(
7003                 "SetOf does not support iterator values, as no sense in them"
7004             )
7005         return value
7006
7007     def _encode(self):
7008         v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7009         return b"".join((self.tag, len_encode(len(v)), v))
7010
7011     def _encode_cer(self, writer):
7012         write_full(writer, self.tag + LENINDEF)
7013         for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7014             write_full(writer, v)
7015         write_full(writer, EOC)
7016
7017     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7018         return super(SetOf, self)._decode(
7019             tlv,
7020             offset,
7021             decode_path,
7022             ctx,
7023             tag_only,
7024             evgen_mode,
7025             ordering_check=True,
7026         )
7027
7028
7029 def obj_by_path(pypath):  # pragma: no cover
7030     """Import object specified as string Python path
7031
7032     Modules must be separated from classes/functions with ``:``.
7033
7034     >>> obj_by_path("foo.bar:Baz")
7035     <class 'foo.bar.Baz'>
7036     >>> obj_by_path("foo.bar:Baz.boo")
7037     <classmethod 'foo.bar.Baz.boo'>
7038     """
7039     mod, objs = pypath.rsplit(":", 1)
7040     from importlib import import_module
7041     obj = import_module(mod)
7042     for obj_name in objs.split("."):
7043         obj = getattr(obj, obj_name)
7044     return obj
7045
7046
7047 def generic_decoder():  # pragma: no cover
7048     # All of this below is a big hack with self references
7049     choice = PrimitiveTypes()
7050     choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7051     choice.specs["SetOf"] = SetOf(schema=choice)
7052     for i in six_xrange(31):
7053         choice.specs["SequenceOf%d" % i] = SequenceOf(
7054             schema=choice,
7055             expl=tag_ctxc(i),
7056         )
7057     choice.specs["Any"] = Any()
7058
7059     # Class name equals to type name, to omit it from output
7060     class SEQUENCEOF(SequenceOf):
7061         __slots__ = ()
7062         schema = choice
7063
7064     def pprint_any(
7065             obj,
7066             oid_maps=(),
7067             with_colours=False,
7068             with_decode_path=False,
7069             decode_path_only=(),
7070     ):
7071         def _pprint_pps(pps):
7072             for pp in pps:
7073                 if hasattr(pp, "_fields"):
7074                     if (
7075                             decode_path_only != () and
7076                             pp.decode_path[:len(decode_path_only)] != decode_path_only
7077                     ):
7078                         continue
7079                     if pp.asn1_type_name == Choice.asn1_type_name:
7080                         continue
7081                     pp_kwargs = pp._asdict()
7082                     pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7083                     pp = _pp(**pp_kwargs)
7084                     yield pp_console_row(
7085                         pp,
7086                         oid_maps=oid_maps,
7087                         with_offsets=True,
7088                         with_blob=False,
7089                         with_colours=with_colours,
7090                         with_decode_path=with_decode_path,
7091                         decode_path_len_decrease=len(decode_path_only),
7092                     )
7093                     for row in pp_console_blob(
7094                             pp,
7095                             decode_path_len_decrease=len(decode_path_only),
7096                     ):
7097                         yield row
7098                 else:
7099                     for row in _pprint_pps(pp):
7100                         yield row
7101         return "\n".join(_pprint_pps(obj.pps()))
7102     return SEQUENCEOF(), pprint_any
7103
7104
7105 def main():  # pragma: no cover
7106     import argparse
7107     parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7108     parser.add_argument(
7109         "--skip",
7110         type=int,
7111         default=0,
7112         help="Skip that number of bytes from the beginning",
7113     )
7114     parser.add_argument(
7115         "--oids",
7116         help="Python paths to dictionary with OIDs, comma separated",
7117     )
7118     parser.add_argument(
7119         "--schema",
7120         help="Python path to schema definition to use",
7121     )
7122     parser.add_argument(
7123         "--defines-by-path",
7124         help="Python path to decoder's defines_by_path",
7125     )
7126     parser.add_argument(
7127         "--nobered",
7128         action="store_true",
7129         help="Disallow BER encoding",
7130     )
7131     parser.add_argument(
7132         "--print-decode-path",
7133         action="store_true",
7134         help="Print decode paths",
7135     )
7136     parser.add_argument(
7137         "--decode-path-only",
7138         help="Print only specified decode path",
7139     )
7140     parser.add_argument(
7141         "--allow-expl-oob",
7142         action="store_true",
7143         help="Allow explicit tag out-of-bound",
7144     )
7145     parser.add_argument(
7146         "--evgen",
7147         action="store_true",
7148         help="Turn on event generation mode",
7149     )
7150     parser.add_argument(
7151         "RAWFile",
7152         type=argparse.FileType("rb"),
7153         help="Path to BER/CER/DER file you want to decode",
7154     )
7155     args = parser.parse_args()
7156     if PY2:
7157         args.RAWFile.seek(args.skip)
7158         raw = memoryview(args.RAWFile.read())
7159         args.RAWFile.close()
7160     else:
7161         raw = file_mmaped(args.RAWFile)[args.skip:]
7162     oid_maps = (
7163         [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7164         if args.oids else ()
7165     )
7166     if args.schema:
7167         schema = obj_by_path(args.schema)
7168         from functools import partial
7169         pprinter = partial(pprint, big_blobs=True)
7170     else:
7171         schema, pprinter = generic_decoder()
7172     ctx = {
7173         "bered": not args.nobered,
7174         "allow_expl_oob": args.allow_expl_oob,
7175     }
7176     if args.defines_by_path is not None:
7177         ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7178     from os import environ
7179     pprinter = partial(
7180         pprinter,
7181         oid_maps=oid_maps,
7182         with_colours=environ.get("NO_COLOR") is None,
7183         with_decode_path=args.print_decode_path,
7184         decode_path_only=(
7185             () if args.decode_path_only is None else
7186             tuple(args.decode_path_only.split(":"))
7187         ),
7188     )
7189     if args.evgen:
7190         for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7191             print(pprinter(obj, decode_path=decode_path))
7192     else:
7193         obj, tail = schema().decode(raw, ctx=ctx)
7194         print(pprinter(obj))
7195     if tail != b"":
7196         print("\nTrailing data: %s" % hexenc(tail))
7197
7198
7199 if __name__ == "__main__":
7200     main()