]> Cypherpunks.ru repositories - pyderasn.git/blob - pyderasn.py
Up to date documentation
[pyderasn.git] / pyderasn.py
1 #!/usr/bin/env python
2 # coding: utf-8
3 # cython: language_level=3
4 # PyDERASN -- Python ASN.1 DER/BER codec with abstract structures
5 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
6 #
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Lesser General Public License as
9 # published by the Free Software Foundation, version 3 of the License.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU Lesser General Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 """Python ASN.1 DER/BER codec with abstract structures
19
20 This library allows you to marshal various structures in ASN.1 DER
21 format, unmarshal them in BER/CER/DER ones.
22
23     >>> i = Integer(123)
24     >>> raw = i.encode()
25     >>> Integer().decod(raw) == i
26     True
27
28 There are primitive types, holding single values
29 (:py:class:`pyderasn.BitString`,
30 :py:class:`pyderasn.Boolean`,
31 :py:class:`pyderasn.Enumerated`,
32 :py:class:`pyderasn.GeneralizedTime`,
33 :py:class:`pyderasn.Integer`,
34 :py:class:`pyderasn.Null`,
35 :py:class:`pyderasn.ObjectIdentifier`,
36 :py:class:`pyderasn.OctetString`,
37 :py:class:`pyderasn.UTCTime`,
38 :py:class:`various strings <pyderasn.CommonString>`
39 (:py:class:`pyderasn.BMPString`,
40 :py:class:`pyderasn.GeneralString`,
41 :py:class:`pyderasn.GraphicString`,
42 :py:class:`pyderasn.IA5String`,
43 :py:class:`pyderasn.ISO646String`,
44 :py:class:`pyderasn.NumericString`,
45 :py:class:`pyderasn.PrintableString`,
46 :py:class:`pyderasn.T61String`,
47 :py:class:`pyderasn.TeletexString`,
48 :py:class:`pyderasn.UniversalString`,
49 :py:class:`pyderasn.UTF8String`,
50 :py:class:`pyderasn.VideotexString`,
51 :py:class:`pyderasn.VisibleString`)),
52 constructed types, holding multiple primitive types
53 (:py:class:`pyderasn.Sequence`,
54 :py:class:`pyderasn.SequenceOf`,
55 :py:class:`pyderasn.Set`,
56 :py:class:`pyderasn.SetOf`),
57 and special types like
58 :py:class:`pyderasn.Any` and
59 :py:class:`pyderasn.Choice`.
60
61 Common for most types
62 ---------------------
63
64 Tags
65 ____
66
67 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
68 the default tag used during coding process. You can override it with
69 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
70 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
71 argument or ``expl`` class attribute). Both arguments take raw binary
72 string, containing that tag. You can **not** set implicit and explicit
73 tags simultaneously.
74
75 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
76 functions, allowing you to easily create ``CONTEXT``
77 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
78 number.
79
80 .. note::
81
82    EXPLICIT tags always have **constructed** tag. PyDERASN does not
83    explicitly check correctness of schema input here.
84
85 .. note::
86
87    Implicit tags have **primitive** (``tag_ctxp``) encoding for
88    primitive values.
89
90 ::
91
92     >>> Integer(impl=tag_ctxp(1))
93     [1] INTEGER
94     >>> Integer(expl=tag_ctxc(2))
95     [2] EXPLICIT INTEGER
96
97 Implicit tag is not explicitly shown.
98
99 Two objects of the same type, but with different implicit/explicit tags
100 are **not** equal.
101
102 You can get object's effective tag (either default or implicited) through
103 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
104 function::
105
106     >>> tag_decode(tag_ctxc(123))
107     (128, 32, 123)
108     >>> klass, form, num = tag_decode(tag_ctxc(123))
109     >>> klass == TagClassContext
110     True
111     >>> form == TagFormConstructed
112     True
113
114 To determine if object has explicit tag, use ``expled`` boolean property
115 and ``expl_tag`` property, returning explicit tag's value.
116
117 Default/optional
118 ________________
119
120 Many objects in sequences could be ``OPTIONAL`` and could have
121 ``DEFAULT`` value. You can specify that object's property using
122 corresponding keyword arguments.
123
124     >>> Integer(optional=True, default=123)
125     INTEGER 123 OPTIONAL DEFAULT
126
127 Those specifications do not play any role in primitive value encoding,
128 but are taken into account when dealing with sequences holding them. For
129 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
130 ``version`` field::
131
132     class Version(Integer):
133         schema = (
134             ("v1", 0),
135             ("v2", 1),
136             ("v3", 2),
137         )
138     class TBSCertificate(Sequence):
139         schema = (
140             ("version", Version(expl=tag_ctxc(0), default="v1")),
141         [...]
142
143 When default argument is used and value is not specified, then it equals
144 to default one.
145
146 .. _bounds:
147
148 Size constraints
149 ________________
150
151 Some objects give ability to set value size constraints. This is either
152 possible integer value, or allowed length of various strings and
153 sequences. Constraints are set in the following way::
154
155     class X(...):
156         bounds = (MIN, MAX)
157
158 And values satisfaction is checked as: ``MIN <= X <= MAX``.
159
160 For simplicity you can also set bounds the following way::
161
162     bounded_x = X(bounds=(MIN, MAX))
163
164 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
165 raised.
166
167 Common methods
168 ______________
169
170 All objects have ``ready`` boolean property, that tells if object is
171 ready to be encoded. If that kind of action is performed on unready
172 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
173
174 All objects are friendly to ``copy.copy()`` and copied objects can be
175 safely mutated.
176
177 Also all objects can be safely ``pickle``-d, but pay attention that
178 pickling among different PyDERASN versions is prohibited.
179
180 .. _decoding:
181
182 Decoding
183 --------
184
185 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
186 ``offset`` optional argument could be used to set initial object's
187 offset in the binary data, for convenience. It returns decoded object
188 and remaining unmarshalled data (tail). Internally all work is done on
189 ``memoryview(data)``, and you can leave returning tail as a memoryview,
190 by specifying ``leavemm=True`` argument.
191
192 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
193 immediately checks and raises if there is non-empty tail.
194
195 When object is decoded, ``decoded`` property is true and you can safely
196 use following properties:
197
198 * ``offset`` -- position including initial offset where object's tag starts
199 * ``tlen`` -- length of object's tag
200 * ``llen`` -- length of object's length value
201 * ``vlen`` -- length of object's value
202 * ``tlvlen`` -- length of the whole object
203
204 Pay attention that those values do **not** include anything related to
205 explicit tag. If you want to know information about it, then use:
206
207 * ``expled`` -- to know if explicit tag is set
208 * ``expl_offset`` (it is lesser than ``offset``)
209 * ``expl_tlen``,
210 * ``expl_llen``
211 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
212 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
213   ``offset`` otherwise
214 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
215   ``tlvlen`` otherwise
216
217 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
218
219 .. _ctx:
220
221 Context
222 _______
223
224 You can specify so called context keyword argument during
225 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
226 various options governing decoding process.
227
228 Currently available context options:
229
230 * :ref:`allow_default_values <allow_default_values_ctx>`
231 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
232 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
233 * :ref:`bered <bered_ctx>`
234 * :ref:`defines_by_path <defines_by_path_ctx>`
235 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
236
237 .. _pprinting:
238
239 Pretty printing
240 ---------------
241
242 All objects have ``pps()`` method, that is a generator of
243 :py:class:`pyderasn.PP` namedtuple, holding various raw information
244 about the object. If ``pps`` is called on sequences, then all underlying
245 ``PP`` will be yielded.
246
247 You can use :py:func:`pyderasn.pp_console_row` function, converting
248 those ``PP`` to human readable string. Actually exactly it is used for
249 all object ``repr``. But it is easy to write custom formatters.
250
251     >>> from pyderasn import pprint
252     >>> encoded = Integer(-12345).encode()
253     >>> obj, tail = Integer().decode(encoded)
254     >>> print(pprint(obj))
255         0   [1,1,   2] INTEGER -12345
256
257 .. _pprint_example:
258
259 Example certificate::
260
261     >>> print(pprint(crt))
262         0   [1,3,1604] Certificate SEQUENCE
263         4   [1,3,1453]  . tbsCertificate: TBSCertificate SEQUENCE
264        10-2 [1,1,   1]  . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
265        13   [1,1,   3]  . . serialNumber: CertificateSerialNumber INTEGER 61595
266        18   [1,1,  13]  . . signature: AlgorithmIdentifier SEQUENCE
267        20   [1,1,   9]  . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
268        31   [0,0,   2]  . . . parameters: [UNIV 5] ANY OPTIONAL
269                         . . . . 05:00
270        33   [0,0, 278]  . . issuer: Name CHOICE rdnSequence
271        33   [1,3, 274]  . . . rdnSequence: RDNSequence SEQUENCE OF
272        37   [1,1,  11]  . . . . 0: RelativeDistinguishedName SET OF
273        39   [1,1,   9]  . . . . . 0: AttributeTypeAndValue SEQUENCE
274        41   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
275        46   [0,0,   4]  . . . . . . value: [UNIV 19] AttributeValue ANY
276                         . . . . . . . 13:02:45:53
277     [...]
278      1461   [1,1,  13]  . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
279      1463   [1,1,   9]  . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
280      1474   [0,0,   2]  . . parameters: [UNIV 5] ANY OPTIONAL
281                         . . . 05:00
282      1476   [1,2, 129]  . signatureValue: BIT STRING 1024 bits
283                         . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
284                         . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
285      [...]
286
287     Trailing data: 0a
288
289 Let's parse that output, human::
290
291        10-2 [1,1,   1]    . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
292        ^  ^  ^ ^    ^     ^   ^        ^            ^       ^       ^  ^
293        0  1  2 3    4     5   6        7            8       9       10 11
294
295 ::
296
297        20   [1,1,   9]    . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
298        ^     ^ ^    ^     ^     ^          ^                 ^
299        0     2 3    4     5     6          9                 10
300
301 ::
302
303        33   [0,0, 278]    . . issuer: Name CHOICE rdnSequence
304        ^     ^ ^    ^     ^   ^       ^    ^      ^
305        0     2 3    4     5   6       8    9      10
306
307 ::
308
309        52-2∞ B [1,1,1054]∞  . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
310              ^           ^                                 ^   ^            ^
311             12          13                                14   9            10
312
313 :0:
314  Offset of the object, where its DER/BER encoding begins.
315  Pay attention that it does **not** include explicit tag.
316 :1:
317  If explicit tag exists, then this is its length (tag + encoded length).
318 :2:
319  Length of object's tag. For example CHOICE does not have its own tag,
320  so it is zero.
321 :3:
322  Length of encoded length.
323 :4:
324  Length of encoded value.
325 :5:
326  Visual indentation to show the depth of object in the hierarchy.
327 :6:
328  Object's name inside SEQUENCE/CHOICE.
329 :7:
330  If either IMPLICIT or EXPLICIT tag is set, then it will be shown
331  here. "IMPLICIT" is omitted.
332 :8:
333  Object's class name, if set. Omitted if it is just an ordinary simple
334  value (like with ``algorithm`` in example above).
335 :9:
336  Object's ASN.1 type.
337 :10:
338  Object's value, if set. Can consist of multiple words (like OCTET/BIT
339  STRINGs above). We see ``v3`` value in Version, because it is named.
340  ``rdnSequence`` is the choice of CHOICE type.
341 :11:
342  Possible other flags like OPTIONAL and DEFAULT, if value equals to the
343  default one, specified in the schema.
344 :12:
345  Shows does object contains any kind of BER encoded data (possibly
346  Sequence holding BER-encoded underlying value).
347 :13:
348  Only applicable to BER encoded data. Indefinite length encoding mark.
349 :14:
350  Only applicable to BER encoded data. If object has BER-specific
351  encoding, then ``BER`` will be shown. It does not depend on indefinite
352  length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
353  (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
354  could be BERed.
355
356 .. _definedby:
357
358 DEFINED BY
359 ----------
360
361 ASN.1 structures often have ANY and OCTET STRING fields, that are
362 DEFINED BY some previously met ObjectIdentifier. This library provides
363 ability to specify mapping between some OID and field that must be
364 decoded with specific specification.
365
366 .. _defines:
367
368 defines kwarg
369 _____________
370
371 :py:class:`pyderasn.ObjectIdentifier` field inside
372 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
373 necessary for decoding structures. For example, CMS (:rfc:`5652`)
374 container::
375
376     class ContentInfo(Sequence):
377         schema = (
378             ("contentType", ContentType(defines=((("content",), {
379                 id_digestedData: DigestedData(),
380                 id_signedData: SignedData(),
381             }),))),
382             ("content", Any(expl=tag_ctxc(0))),
383         )
384
385 ``contentType`` field tells that it defines that ``content`` must be
386 decoded with ``SignedData`` specification, if ``contentType`` equals to
387 ``id-signedData``. The same applies to ``DigestedData``. If
388 ``contentType`` contains unknown OID, then no automatic decoding is
389 done.
390
391 You can specify multiple fields, that will be autodecoded -- that is why
392 ``defines`` kwarg is a sequence. You can specify defined field
393 relatively or absolutely to current decode path. For example ``defines``
394 for AlgorithmIdentifier of X.509's
395 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
396
397         (
398             (("parameters",), {
399                 id_ecPublicKey: ECParameters(),
400                 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
401             }),
402             (("..", "subjectPublicKey"), {
403                 id_rsaEncryption: RSAPublicKey(),
404                 id_GostR3410_2001: OctetString(),
405             }),
406         ),
407
408 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
409 autodecode its parameters inside SPKI's algorithm and its public key
410 itself.
411
412 Following types can be automatically decoded (DEFINED BY):
413
414 * :py:class:`pyderasn.Any`
415 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
416 * :py:class:`pyderasn.OctetString`
417 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
418   ``Any``/``BitString``/``OctetString``-s
419
420 When any of those fields is automatically decoded, then ``.defined``
421 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
422 was defined, ``value`` contains corresponding decoded value. For example
423 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
424
425 .. _defines_by_path_ctx:
426
427 defines_by_path context option
428 ______________________________
429
430 Sometimes you either can not or do not want to explicitly set *defines*
431 in the schema. You can dynamically apply those definitions when calling
432 :py:meth:`pyderasn.Obj.decode` method.
433
434 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
435 value must be sequence of following tuples::
436
437     (decode_path, defines)
438
439 where ``decode_path`` is a tuple holding so-called decode path to the
440 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
441 ``defines``, holding exactly the same value as accepted in its
442 :ref:`keyword argument <defines>`.
443
444 For example, again for CMS, you want to automatically decode
445 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
446 structures it may hold. Also, automatically decode ``controlSequence``
447 of ``PKIResponse``::
448
449     content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
450         (
451             ("contentType",),
452             ((("content",), {id_signedData: SignedData()}),),
453         ),
454         (
455             (
456                 "content",
457                 DecodePathDefBy(id_signedData),
458                 "encapContentInfo",
459                 "eContentType",
460             ),
461             ((("eContent",), {
462                 id_cct_PKIData: PKIData(),
463                 id_cct_PKIResponse: PKIResponse(),
464             })),
465         ),
466         (
467             (
468                 "content",
469                 DecodePathDefBy(id_signedData),
470                 "encapContentInfo",
471                 "eContent",
472                 DecodePathDefBy(id_cct_PKIResponse),
473                 "controlSequence",
474                 any,
475                 "attrType",
476             ),
477             ((("attrValues",), {
478                 id_cmc_recipientNonce: RecipientNonce(),
479                 id_cmc_senderNonce: SenderNonce(),
480                 id_cmc_statusInfoV2: CMCStatusInfoV2(),
481                 id_cmc_transactionId: TransactionId(),
482             })),
483         ),
484     )})
485
486 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
487 First function is useful for path construction when some automatic
488 decoding is already done. ``any`` means literally any value it meet --
489 useful for SEQUENCE/SET OF-s.
490
491 .. _bered_ctx:
492
493 BER encoding
494 ------------
495
496 By default PyDERASN accepts only DER encoded data. By default it encodes
497 to DER. But you can optionally enable BER decoding with setting
498 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
499 constructed primitive types should be parsed successfully.
500
501 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
502   attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
503   STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
504   ``UTCTime``, ``GeneralizedTime`` can contain it.
505 * If object has an indefinite length encoding, then its ``lenindef``
506   attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
507   ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
508   contain it.
509 * If object has an indefinite length encoded explicit tag, then
510   ``expl_lenindef`` is set to True.
511 * If object has either any of BER-related encoding (explicit tag
512   indefinite length, object's indefinite length, BER-encoding) or any
513   underlying component has that kind of encoding, then ``bered``
514   attribute is set to True. For example SignedData CMS can have
515   ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
516   ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
517
518 EOC (end-of-contents) token's length is taken in advance in object's
519 value length.
520
521 .. _allow_expl_oob_ctx:
522
523 Allow explicit tag out-of-bound
524 -------------------------------
525
526 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
527 one value, more than one object. If you set ``allow_expl_oob`` context
528 option to True, then no error will be raised and that invalid encoding
529 will be silently further processed. But pay attention that offsets and
530 lengths will be invalid in that case.
531
532 .. warning::
533
534    This option should be used only for skipping some decode errors, just
535    to see the decoded structure somehow.
536
537 .. _streaming:
538
539 Streaming and dealing with huge structures
540 ------------------------------------------
541
542 .. _evgen_mode:
543
544 evgen mode
545 __________
546
547 ASN.1 structures can be huge, they can hold millions of objects inside
548 (for example Certificate Revocation Lists (CRL), holding revocation
549 state for every previously issued X.509 certificate). CACert.org's 8 MiB
550 CRL file takes more than half a gigabyte of memory to hold the decoded
551 structure.
552
553 If you just simply want to check the signature over the ``tbsCertList``,
554 you can create specialized schema with that field represented as
555 OctetString for example::
556
557     class TBSCertListFast(Sequence):
558         schema = (
559             [...]
560             ("revokedCertificates", OctetString(
561                 impl=SequenceOf.tag_default,
562                 optional=True,
563             )),
564             [...]
565         )
566
567 This allows you to quickly decode a few fields and check the signature
568 over the ``tbsCertList`` bytes.
569
570 But how can you get all certificate's serial number from it, after you
571 trust that CRL after signature validation? You can use so called
572 ``evgen`` (event generation) mode, to catch the events/facts of some
573 successful object decoding. Let's use command line capabilities::
574
575     $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
576          10     [1,1,   1]   . . version: Version INTEGER v2 (01) OPTIONAL
577          15     [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
578          26     [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL
579          13     [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE
580          34     [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
581          39     [0,0,   9]   . . . . . . value: [UNIV 19] AttributeValue ANY
582          32     [1,1,  14]   . . . . . 0: AttributeTypeAndValue SEQUENCE
583          30     [1,1,  16]   . . . . 0: RelativeDistinguishedName SET OF
584     [...]
585         188     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
586         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
587         191     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
588         191     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589         186     [1,1,  18]   . . . 0: RevokedCertificate SEQUENCE
590         208     [1,1,   1]   . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
591         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
592         211     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
593         211     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594         206     [1,1,  18]   . . . 1: RevokedCertificate SEQUENCE
595     [...]
596     9144992     [0,0,  15]   . . . . revocationDate: Time CHOICE utcTime
597     9144992     [1,1,  13]   . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
598     9144985     [1,1,  20]   . . . 415755: RevokedCertificate SEQUENCE
599       181     [1,4,9144821]   . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
600         5     [1,4,9144997]   . tbsCertList: TBSCertList SEQUENCE
601     9145009     [1,1,   9]   . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
602     9145020     [0,0,   2]   . . parameters: [UNIV 5] ANY OPTIONAL
603     9145007     [1,1,  13]   . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
604     9145022     [1,3, 513]   . signatureValue: BIT STRING 4096 bits
605         0     [1,4,9145534]  CertificateList SEQUENCE
606
607 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
608 decodes underlying values. It can not tell if SEQUENCE is decoded, so
609 the event of the upper level SEQUENCE is the last one we see.
610 ``version`` field is just a single INTEGER -- it is decoded and event is
611 fired immediately. Then we see that ``algorithm`` and ``parameters``
612 fields are decoded and only after them the ``signature`` SEQUENCE is
613 fired as a successfully decoded. There are 4 events for each revoked
614 certificate entry in that CRL: ``userCertificate`` serial number,
615 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
616 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
617
618 We can do that in our ordinary Python code and understand where we are
619 by looking at deterministically generated decode paths (do not forget
620 about useful ``--print-decode-path`` CLI option). We must use
621 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
622 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
623 obj, tail)`` tuples::
624
625     for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
626         if (
627             len(decode_path) == 4 and
628             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
629             decode_path[3] == "userCertificate"
630         ):
631             print("serial number:", int(obj))
632
633 Virtually it does not take any memory except at least needed for single
634 object storage. You can easily use that mode to determine required
635 object ``.offset`` and ``.*len`` to be able to decode it separately, or
636 maybe verify signature upon it just by taking bytes by ``.offset`` and
637 ``.tlvlen``.
638
639 .. _evgen_mode_upto_ctx:
640
641 evgen_mode_upto
642 _______________
643
644 There is full ability to get any kind of data from the CRL in the
645 example above. However it is not too convenient to get the whole
646 ``RevokedCertificate`` structure, that is pretty lightweight and one may
647 do not want to disassemble it. You can use ``evgen_mode_upto``
648 :ref:`ctx <ctx>` option that semantically equals to
649 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
650 mapped to any non-None value. If specified decode path is met, then any
651 subsequent objects won't be decoded in evgen mode. That allows us to
652 parse the CRL above with fully assembled ``RevokedCertificate``::
653
654     for decode_path, obj, _ in CertificateList().decode_evgen(
655         crl_raw,
656         ctx={"evgen_mode_upto": (
657             (("tbsCertList", "revokedCertificates", any), True),
658         )},
659     ):
660         if (
661             len(decode_path) == 3 and
662             decode_path[:2] == ("tbsCertList", "revokedCertificates"),
663         ):
664             print("serial number:", int(obj["userCertificate"]))
665
666 .. _mmap:
667
668 mmap-ed file
669 ____________
670
671 POSIX compliant systems have ``mmap`` syscall, giving ability to work
672 the memory mapped file. You can deal with the file like it was an
673 ordinary binary string, allowing you not to load it to the memory first.
674 Also you can use them as an input for OCTET STRING, taking no Python
675 memory for their storage.
676
677 There is convenient :py:func:`pyderasn.file_mmaped` function that
678 creates read-only memoryview on the file contents::
679
680     with open("huge", "rb") as fd:
681         raw = file_mmaped(fd)
682         obj = Something.decode(raw)
683
684 .. warning::
685
686    mmap-ed files in Python2.7 does not implement buffer protocol, so
687    memoryview won't work on them.
688
689 .. warning::
690
691    mmap maps the **whole** file. So it plays no role if you seek-ed it
692    before. Take the slice of the resulting memoryview with required
693    offset instead.
694
695 .. note::
696
697    If you use ZFS as underlying storage, then pay attention that
698    currently most platforms does not deal good with ZFS ARC and ordinary
699    page cache used for mmaps. It can take twice the necessary size in
700    the memory: both in page cache and ZFS ARC.
701
702 CER encoding
703 ____________
704
705 We can parse any kind of data now, but how can we produce files
706 streamingly, without storing their encoded representation in memory?
707 SEQUENCE by default encodes in memory all its values, joins them in huge
708 binary string, just to know the exact size of SEQUENCE's value for
709 encoding it in TLV. DER requires you to know all exact sizes of the
710 objects.
711
712 You can use CER encoding mode, that slightly differs from the DER, but
713 does not require exact sizes knowledge, allowing streaming encoding
714 directly to some writer/buffer. Just use
715 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
716 encoded data will flow::
717
718     opener = io.open if PY2 else open
719     with opener("result", "wb") as fd:
720         obj.encode_cer(fd.write)
721
722 ::
723
724     buf = io.BytesIO()
725     obj.encode_cer(buf.write)
726
727 If you do not want to create in-memory buffer every time, then you can
728 use :py:func:`pyderasn.encode_cer` function::
729
730     data = encode_cer(obj)
731
732 Remember that CER is **not valid** DER in most cases, so you **have to**
733 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
734 decoding. Also currently there is **no** validation that provided CER is
735 valid one -- you are sure that it has only valid BER encoding.
736
737 .. warning::
738
739    SET OF values can not be streamingly encoded, because they are
740    required to be sorted byte-by-byte. Big SET OF values still will take
741    much memory. Use neither SET nor SET OF values, as modern ASN.1
742    also recommends too.
743
744 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
745 OCTET STRINGs! They will be streamingly copied from underlying file to
746 the buffer using 1 KB chunks.
747
748 Some structures require that some of the elements have to be forcefully
749 DER encoded. For example ``SignedData`` CMS requires you to encode
750 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
751 encode everything else in BER. You can tell any of the structures to be
752 forcefully encoded in DER during CER encoding, by specifying
753 ``der_forced=True`` attribute::
754
755     class Certificate(Sequence):
756         schema = (...)
757         der_forced = True
758
759     class SignedAttributes(SetOf):
760         schema = Attribute()
761         bounds = (1, 32)
762         der_forced = True
763
764 agg_octet_string
765 ________________
766
767 In most cases, huge quantity of binary data is stored as OCTET STRING.
768 CER encoding splits it on 1 KB chunks. BER allows splitting on various
769 levels of chunks inclusion::
770
771     SOME STRING[CONSTRUCTED]
772         OCTET STRING[CONSTRUCTED]
773             OCTET STRING[PRIMITIVE]
774                 DATA CHUNK
775             OCTET STRING[PRIMITIVE]
776                 DATA CHUNK
777             OCTET STRING[PRIMITIVE]
778                 DATA CHUNK
779         OCTET STRING[PRIMITIVE]
780             DATA CHUNK
781         OCTET STRING[CONSTRUCTED]
782             OCTET STRING[PRIMITIVE]
783                 DATA CHUNK
784             OCTET STRING[PRIMITIVE]
785                 DATA CHUNK
786         OCTET STRING[CONSTRUCTED]
787             OCTET STRING[CONSTRUCTED]
788                 OCTET STRING[PRIMITIVE]
789                     DATA CHUNK
790
791 You can not just take the offset and some ``.vlen`` of the STRING and
792 treat it as the payload. If you decode it without
793 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
794 and ``bytes()`` will give the whole payload contents.
795
796 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
797 small memory footprint. There is convenient
798 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
799 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
800 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
801 SHA512 digest of its ``eContent``::
802
803     fd = open("data.p7m", "rb")
804     raw = file_mmaped(fd)
805     ctx = {"bered": True}
806     for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
807         if decode_path == ("content",):
808             content = obj
809             break
810     else:
811         raise ValueError("no content found")
812     hasher_state = sha512()
813     def hasher(data):
814         hasher_state.update(data)
815         return len(data)
816     evgens = SignedData().decode_evgen(
817         raw[content.offset:],
818         offset=content.offset,
819         ctx=ctx,
820     )
821     agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
822     fd.close()
823     digest = hasher_state.digest()
824
825 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
826 copy the payload (without BER/CER encoding interleaved overhead) in it.
827 Virtually it won't take memory more than for keeping small structures
828 and 1 KB binary chunks.
829
830 SEQUENCE OF iterators
831 _____________________
832
833 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
834 classes. The only difference with providing the full list of objects, is
835 that type and bounds checking is done during encoding process. Also
836 sequence's value will be emptied after encoding, forcing you to set its
837 value again.
838
839 This is very useful when you have to create some huge objects, like
840 CRLs, with thousands and millions of entities inside. You can write the
841 generator taking necessary data from the database and giving the
842 ``RevokedCertificate`` objects. Only binary representation of that
843 objects will take memory during DER encoding.
844
845 Base Obj
846 --------
847 .. autoclass:: pyderasn.Obj
848    :members:
849
850 Primitive types
851 ---------------
852
853 Boolean
854 _______
855 .. autoclass:: pyderasn.Boolean
856    :members: __init__
857
858 Integer
859 _______
860 .. autoclass:: pyderasn.Integer
861    :members: __init__, named
862
863 BitString
864 _________
865 .. autoclass:: pyderasn.BitString
866    :members: __init__, bit_len, named
867
868 OctetString
869 ___________
870 .. autoclass:: pyderasn.OctetString
871    :members: __init__
872
873 Null
874 ____
875 .. autoclass:: pyderasn.Null
876    :members: __init__
877
878 ObjectIdentifier
879 ________________
880 .. autoclass:: pyderasn.ObjectIdentifier
881    :members: __init__
882
883 Enumerated
884 __________
885 .. autoclass:: pyderasn.Enumerated
886
887 CommonString
888 ____________
889 .. autoclass:: pyderasn.CommonString
890
891 NumericString
892 _____________
893 .. autoclass:: pyderasn.NumericString
894
895 PrintableString
896 _______________
897 .. autoclass:: pyderasn.PrintableString
898    :members: __init__, allow_asterisk, allow_ampersand
899
900 UTCTime
901 _______
902 .. autoclass:: pyderasn.UTCTime
903    :members: __init__, todatetime
904
905 GeneralizedTime
906 _______________
907 .. autoclass:: pyderasn.GeneralizedTime
908    :members: __init__, todatetime
909
910 Special types
911 -------------
912
913 Choice
914 ______
915 .. autoclass:: pyderasn.Choice
916    :members: __init__, choice, value
917
918 PrimitiveTypes
919 ______________
920 .. autoclass:: PrimitiveTypes
921
922 Any
923 ___
924 .. autoclass:: pyderasn.Any
925    :members: __init__
926
927 Constructed types
928 -----------------
929
930 Sequence
931 ________
932 .. autoclass:: pyderasn.Sequence
933    :members: __init__
934
935 Set
936 ___
937 .. autoclass:: pyderasn.Set
938    :members: __init__
939
940 SequenceOf
941 __________
942 .. autoclass:: pyderasn.SequenceOf
943    :members: __init__
944
945 SetOf
946 _____
947 .. autoclass:: pyderasn.SetOf
948    :members: __init__
949
950 Various
951 -------
952
953 .. autofunction:: pyderasn.abs_decode_path
954 .. autofunction:: pyderasn.agg_octet_string
955 .. autofunction:: pyderasn.colonize_hex
956 .. autofunction:: pyderasn.encode_cer
957 .. autofunction:: pyderasn.file_mmaped
958 .. autofunction:: pyderasn.hexenc
959 .. autofunction:: pyderasn.hexdec
960 .. autofunction:: pyderasn.tag_encode
961 .. autofunction:: pyderasn.tag_decode
962 .. autofunction:: pyderasn.tag_ctxp
963 .. autofunction:: pyderasn.tag_ctxc
964 .. autoclass:: pyderasn.DecodeError
965    :members: __init__
966 .. autoclass:: pyderasn.NotEnoughData
967 .. autoclass:: pyderasn.ExceedingData
968 .. autoclass:: pyderasn.LenIndefForm
969 .. autoclass:: pyderasn.TagMismatch
970 .. autoclass:: pyderasn.InvalidLength
971 .. autoclass:: pyderasn.InvalidOID
972 .. autoclass:: pyderasn.ObjUnknown
973 .. autoclass:: pyderasn.ObjNotReady
974 .. autoclass:: pyderasn.InvalidValueType
975 .. autoclass:: pyderasn.BoundsError
976
977 .. _cmdline:
978
979 Command-line usage
980 ------------------
981
982 You can decode DER/BER files using command line abilities::
983
984     $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
985
986 If there is no schema for your file, then you can try parsing it without,
987 but of course IMPLICIT tags will often make it impossible. But result is
988 good enough for the certificate above::
989
990     $ python -m pyderasn path/to/file
991         0   [1,3,1604]  . >: SEQUENCE OF
992         4   [1,3,1453]  . . >: SEQUENCE OF
993         8   [0,0,   5]  . . . . >: [0] ANY
994                         . . . . . A0:03:02:01:02
995        13   [1,1,   3]  . . . . >: INTEGER 61595
996        18   [1,1,  13]  . . . . >: SEQUENCE OF
997        20   [1,1,   9]  . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
998        31   [1,1,   0]  . . . . . . >: NULL
999        33   [1,3, 274]  . . . . >: SEQUENCE OF
1000        37   [1,1,  11]  . . . . . . >: SET OF
1001        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1002        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1003        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1004     [...]
1005      1409   [1,1,  50]  . . . . . . >: SEQUENCE OF
1006      1411   [1,1,   8]  . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1007      1421   [1,1,  38]  . . . . . . . . >: OCTET STRING 38 bytes
1008                         . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1009                         . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1010                         . . . . . . . . . 61:2E:63:6F:6D:2F
1011      1461   [1,1,  13]  . . >: SEQUENCE OF
1012      1463   [1,1,   9]  . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1013      1474   [1,1,   0]  . . . . >: NULL
1014      1476   [1,2, 129]  . . >: BIT STRING 1024 bits
1015                         . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1016                         . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1017     [...]
1018
1019 Human readable OIDs
1020 ___________________
1021
1022 If you have got dictionaries with ObjectIdentifiers, like example one
1023 from ``tests/test_crts.py``::
1024
1025     stroid2name = {
1026         "1.2.840.113549.1.1.1": "id-rsaEncryption",
1027         "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1028         [...]
1029         "2.5.4.10": "id-at-organizationName",
1030         "2.5.4.11": "id-at-organizationalUnitName",
1031     }
1032
1033 then you can pass it to pretty printer to see human readable OIDs::
1034
1035     $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1036     [...]
1037        37   [1,1,  11]  . . . . . . >: SET OF
1038        39   [1,1,   9]  . . . . . . . . >: SEQUENCE OF
1039        41   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1040        46   [1,1,   2]  . . . . . . . . . . >: PrintableString PrintableString ES
1041        50   [1,1,  18]  . . . . . . >: SET OF
1042        52   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1043        54   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1044        59   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1045        70   [1,1,  18]  . . . . . . >: SET OF
1046        72   [1,1,  16]  . . . . . . . . >: SEQUENCE OF
1047        74   [1,1,   3]  . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1048        79   [1,1,   9]  . . . . . . . . . . >: PrintableString PrintableString Barcelona
1049     [...]
1050
1051 Decode paths
1052 ____________
1053
1054 Each decoded element has so-called decode path: sequence of structure
1055 names it is passing during the decode process. Each element has its own
1056 unique path inside the whole ASN.1 tree. You can print it out with
1057 ``--print-decode-path`` option::
1058
1059     $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1060        0    [1,3,1604]  Certificate SEQUENCE []
1061        4    [1,3,1453]   . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1062       10-2  [1,1,   1]   . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1063       13    [1,1,   3]   . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1064       18    [1,1,  13]   . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1065       20    [1,1,   9]   . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1066       31    [0,0,   2]   . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1067                          . . . . 05:00
1068       33    [0,0, 278]   . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1069       33    [1,3, 274]   . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1070       37    [1,1,  11]   . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1071       39    [1,1,   9]   . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1072       41    [1,1,   3]   . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1073       46    [0,0,   4]   . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1074                          . . . . . . . 13:02:45:53
1075       46    [1,1,   2]   . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1076     [...]
1077
1078 Now you can print only the specified tree, for example signature algorithm::
1079
1080     $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1081       18    [1,1,  13]  AlgorithmIdentifier SEQUENCE
1082       20    [1,1,   9]   . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1083       31    [0,0,   2]   . parameters: [UNIV 5] ANY OPTIONAL
1084                          . . 05:00
1085 """
1086
1087 from codecs import getdecoder
1088 from codecs import getencoder
1089 from collections import namedtuple
1090 from collections import OrderedDict
1091 from copy import copy
1092 from datetime import datetime
1093 from datetime import timedelta
1094 from io import BytesIO
1095 from math import ceil
1096 from mmap import mmap
1097 from mmap import PROT_READ
1098 from operator import attrgetter
1099 from string import ascii_letters
1100 from string import digits
1101 from sys import version_info
1102 from unicodedata import category as unicat
1103
1104 from six import add_metaclass
1105 from six import binary_type
1106 from six import byte2int
1107 from six import indexbytes
1108 from six import int2byte
1109 from six import integer_types
1110 from six import iterbytes
1111 from six import iteritems
1112 from six import itervalues
1113 from six import PY2
1114 from six import string_types
1115 from six import text_type
1116 from six import unichr as six_unichr
1117 from six.moves import xrange as six_xrange
1118
1119
1120 try:
1121     from termcolor import colored
1122 except ImportError:  # pragma: no cover
1123     def colored(what, *args, **kwargs):
1124         return what
1125
1126 __version__ = "7.0"
1127
1128 __all__ = (
1129     "agg_octet_string",
1130     "Any",
1131     "BitString",
1132     "BMPString",
1133     "Boolean",
1134     "BoundsError",
1135     "Choice",
1136     "DecodeError",
1137     "DecodePathDefBy",
1138     "encode_cer",
1139     "Enumerated",
1140     "ExceedingData",
1141     "file_mmaped",
1142     "GeneralizedTime",
1143     "GeneralString",
1144     "GraphicString",
1145     "hexdec",
1146     "hexenc",
1147     "IA5String",
1148     "Integer",
1149     "InvalidLength",
1150     "InvalidOID",
1151     "InvalidValueType",
1152     "ISO646String",
1153     "LenIndefForm",
1154     "NotEnoughData",
1155     "Null",
1156     "NumericString",
1157     "obj_by_path",
1158     "ObjectIdentifier",
1159     "ObjNotReady",
1160     "ObjUnknown",
1161     "OctetString",
1162     "PrimitiveTypes",
1163     "PrintableString",
1164     "Sequence",
1165     "SequenceOf",
1166     "Set",
1167     "SetOf",
1168     "T61String",
1169     "tag_ctxc",
1170     "tag_ctxp",
1171     "tag_decode",
1172     "TagClassApplication",
1173     "TagClassContext",
1174     "TagClassPrivate",
1175     "TagClassUniversal",
1176     "TagFormConstructed",
1177     "TagFormPrimitive",
1178     "TagMismatch",
1179     "TeletexString",
1180     "UniversalString",
1181     "UTCTime",
1182     "UTF8String",
1183     "VideotexString",
1184     "VisibleString",
1185 )
1186
1187 TagClassUniversal = 0
1188 TagClassApplication = 1 << 6
1189 TagClassContext = 1 << 7
1190 TagClassPrivate = 1 << 6 | 1 << 7
1191 TagFormPrimitive = 0
1192 TagFormConstructed = 1 << 5
1193 TagClassReprs = {
1194     TagClassContext: "",
1195     TagClassApplication: "APPLICATION ",
1196     TagClassPrivate: "PRIVATE ",
1197     TagClassUniversal: "UNIV ",
1198 }
1199 EOC = b"\x00\x00"
1200 EOC_LEN = len(EOC)
1201 LENINDEF = b"\x80"  # length indefinite mark
1202 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1203 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1204 SET01 = frozenset("01")
1205 DECIMALS = frozenset(digits)
1206 DECIMAL_SIGNS = ".,"
1207 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1208
1209
1210 def file_mmaped(fd):
1211     """Make mmap-ed memoryview for reading from file
1212
1213     :param fd: file object
1214     :returns: memoryview over read-only mmap-ing of the whole file
1215     """
1216     return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
1217
1218 def pureint(value):
1219     if not set(value) <= DECIMALS:
1220         raise ValueError("non-pure integer")
1221     return int(value)
1222
1223 def fractions2float(fractions_raw):
1224     pureint(fractions_raw)
1225     return float("0." + fractions_raw)
1226
1227
1228 def get_def_by_path(defines_by_path, sub_decode_path):
1229     """Get define by decode path
1230     """
1231     for path, define in defines_by_path:
1232         if len(path) != len(sub_decode_path):
1233             continue
1234         for p1, p2 in zip(path, sub_decode_path):
1235             if (not p1 is any) and (p1 != p2):
1236                 break
1237         else:
1238             return define
1239
1240
1241 ########################################################################
1242 # Errors
1243 ########################################################################
1244
1245 class ASN1Error(ValueError):
1246     pass
1247
1248
1249 class DecodeError(ASN1Error):
1250     def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1251         """
1252         :param str msg: reason of decode failing
1253         :param klass: optional exact DecodeError inherited class (like
1254                       :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1255                       :py:exc:`InvalidLength`)
1256         :param decode_path: tuple of strings. It contains human
1257                             readable names of the fields through which
1258                             decoding process has passed
1259         :param int offset: binary offset where failure happened
1260         """
1261         super(DecodeError, self).__init__()
1262         self.msg = msg
1263         self.klass = klass
1264         self.decode_path = decode_path
1265         self.offset = offset
1266
1267     def __str__(self):
1268         return " ".join(
1269             c for c in (
1270                 "" if self.klass is None else self.klass.__name__,
1271                 (
1272                     ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1273                     if len(self.decode_path) > 0 else ""
1274                 ),
1275                 ("(at %d)" % self.offset) if self.offset > 0 else "",
1276                 self.msg,
1277             ) if c != ""
1278         )
1279
1280     def __repr__(self):
1281         return "%s(%s)" % (self.__class__.__name__, self)
1282
1283
1284 class NotEnoughData(DecodeError):
1285     pass
1286
1287
1288 class ExceedingData(ASN1Error):
1289     def __init__(self, nbytes):
1290         super(ExceedingData, self).__init__()
1291         self.nbytes = nbytes
1292
1293     def __str__(self):
1294         return "%d trailing bytes" % self.nbytes
1295
1296     def __repr__(self):
1297         return "%s(%s)" % (self.__class__.__name__, self)
1298
1299
1300 class LenIndefForm(DecodeError):
1301     pass
1302
1303
1304 class TagMismatch(DecodeError):
1305     pass
1306
1307
1308 class InvalidLength(DecodeError):
1309     pass
1310
1311
1312 class InvalidOID(DecodeError):
1313     pass
1314
1315
1316 class ObjUnknown(ASN1Error):
1317     def __init__(self, name):
1318         super(ObjUnknown, self).__init__()
1319         self.name = name
1320
1321     def __str__(self):
1322         return "object is unknown: %s" % self.name
1323
1324     def __repr__(self):
1325         return "%s(%s)" % (self.__class__.__name__, self)
1326
1327
1328 class ObjNotReady(ASN1Error):
1329     def __init__(self, name):
1330         super(ObjNotReady, self).__init__()
1331         self.name = name
1332
1333     def __str__(self):
1334         return "object is not ready: %s" % self.name
1335
1336     def __repr__(self):
1337         return "%s(%s)" % (self.__class__.__name__, self)
1338
1339
1340 class InvalidValueType(ASN1Error):
1341     def __init__(self, expected_types):
1342         super(InvalidValueType, self).__init__()
1343         self.expected_types = expected_types
1344
1345     def __str__(self):
1346         return "invalid value type, expected: %s" % ", ".join(
1347             [repr(t) for t in self.expected_types]
1348         )
1349
1350     def __repr__(self):
1351         return "%s(%s)" % (self.__class__.__name__, self)
1352
1353
1354 class BoundsError(ASN1Error):
1355     def __init__(self, bound_min, value, bound_max):
1356         super(BoundsError, self).__init__()
1357         self.bound_min = bound_min
1358         self.value = value
1359         self.bound_max = bound_max
1360
1361     def __str__(self):
1362         return "unsatisfied bounds: %s <= %s <= %s" % (
1363             self.bound_min,
1364             self.value,
1365             self.bound_max,
1366         )
1367
1368     def __repr__(self):
1369         return "%s(%s)" % (self.__class__.__name__, self)
1370
1371
1372 ########################################################################
1373 # Basic coders
1374 ########################################################################
1375
1376 _hexdecoder = getdecoder("hex")
1377 _hexencoder = getencoder("hex")
1378
1379
1380 def hexdec(data):
1381     """Binary data to hexadecimal string convert
1382     """
1383     return _hexdecoder(data)[0]
1384
1385
1386 def hexenc(data):
1387     """Hexadecimal string to binary data convert
1388     """
1389     return _hexencoder(data)[0].decode("ascii")
1390
1391
1392 def int_bytes_len(num, byte_len=8):
1393     if num == 0:
1394         return 1
1395     return int(ceil(float(num.bit_length()) / byte_len))
1396
1397
1398 def zero_ended_encode(num):
1399     octets = bytearray(int_bytes_len(num, 7))
1400     i = len(octets) - 1
1401     octets[i] = num & 0x7F
1402     num >>= 7
1403     i -= 1
1404     while num > 0:
1405         octets[i] = 0x80 | (num & 0x7F)
1406         num >>= 7
1407         i -= 1
1408     return bytes(octets)
1409
1410
1411 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1412     """Encode tag to binary form
1413
1414     :param int num: tag's number
1415     :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1416                       :py:data:`pyderasn.TagClassContext`,
1417                       :py:data:`pyderasn.TagClassApplication`,
1418                       :py:data:`pyderasn.TagClassPrivate`)
1419     :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1420                      :py:data:`pyderasn.TagFormConstructed`)
1421     """
1422     if num < 31:
1423         # [XX|X|.....]
1424         return int2byte(klass | form | num)
1425     # [XX|X|11111][1.......][1.......] ... [0.......]
1426     return int2byte(klass | form | 31) + zero_ended_encode(num)
1427
1428
1429 def tag_decode(tag):
1430     """Decode tag from binary form
1431
1432     .. warning::
1433
1434        No validation is performed, assuming that it has already passed.
1435
1436     It returns tuple with three integers, as
1437     :py:func:`pyderasn.tag_encode` accepts.
1438     """
1439     first_octet = byte2int(tag)
1440     klass = first_octet & 0xC0
1441     form = first_octet & 0x20
1442     if first_octet & 0x1F < 0x1F:
1443         return (klass, form, first_octet & 0x1F)
1444     num = 0
1445     for octet in iterbytes(tag[1:]):
1446         num <<= 7
1447         num |= octet & 0x7F
1448     return (klass, form, num)
1449
1450
1451 def tag_ctxp(num):
1452     """Create CONTEXT PRIMITIVE tag
1453     """
1454     return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1455
1456
1457 def tag_ctxc(num):
1458     """Create CONTEXT CONSTRUCTED tag
1459     """
1460     return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1461
1462
1463 def tag_strip(data):
1464     """Take off tag from the data
1465
1466     :returns: (encoded tag, tag length, remaining data)
1467     """
1468     if len(data) == 0:
1469         raise NotEnoughData("no data at all")
1470     if byte2int(data) & 0x1F < 31:
1471         return data[:1], 1, data[1:]
1472     i = 0
1473     while True:
1474         i += 1
1475         if i == len(data):
1476             raise DecodeError("unfinished tag")
1477         if indexbytes(data, i) & 0x80 == 0:
1478             break
1479     i += 1
1480     return data[:i], i, data[i:]
1481
1482
1483 def len_encode(l):
1484     if l < 0x80:
1485         return int2byte(l)
1486     octets = bytearray(int_bytes_len(l) + 1)
1487     octets[0] = 0x80 | (len(octets) - 1)
1488     for i in six_xrange(len(octets) - 1, 0, -1):
1489         octets[i] = l & 0xFF
1490         l >>= 8
1491     return bytes(octets)
1492
1493
1494 def len_decode(data):
1495     """Decode length
1496
1497     :returns: (decoded length, length's length, remaining data)
1498     :raises LenIndefForm: if indefinite form encoding is met
1499     """
1500     if len(data) == 0:
1501         raise NotEnoughData("no data at all")
1502     first_octet = byte2int(data)
1503     if first_octet & 0x80 == 0:
1504         return first_octet, 1, data[1:]
1505     octets_num = first_octet & 0x7F
1506     if octets_num + 1 > len(data):
1507         raise NotEnoughData("encoded length is longer than data")
1508     if octets_num == 0:
1509         raise LenIndefForm()
1510     if byte2int(data[1:]) == 0:
1511         raise DecodeError("leading zeros")
1512     l = 0
1513     for v in iterbytes(data[1:1 + octets_num]):
1514         l = (l << 8) | v
1515     if l <= 127:
1516         raise DecodeError("long form instead of short one")
1517     return l, 1 + octets_num, data[1 + octets_num:]
1518
1519
1520 LEN1K = len_encode(1000)
1521
1522
1523 def write_full(writer, data):
1524     """Fully write provided data
1525
1526     :param writer: must comply with ``io.RawIOBase.write`` behaviour
1527
1528     BytesIO does not guarantee that the whole data will be written at
1529     once. That function write everything provided, raising an error if
1530     ``writer`` returns None.
1531     """
1532     data = memoryview(data)
1533     written = 0
1534     while written != len(data):
1535         n = writer(data[written:])
1536         if n is None:
1537             raise ValueError("can not write to buf")
1538         written += n
1539
1540
1541 ########################################################################
1542 # Base class
1543 ########################################################################
1544
1545 class AutoAddSlots(type):
1546     def __new__(cls, name, bases, _dict):
1547         _dict["__slots__"] = _dict.get("__slots__", ())
1548         return type.__new__(cls, name, bases, _dict)
1549
1550
1551 BasicState = namedtuple("BasicState", (
1552     "version",
1553     "tag",
1554     "tag_order",
1555     "expl",
1556     "default",
1557     "optional",
1558     "offset",
1559     "llen",
1560     "vlen",
1561     "expl_lenindef",
1562     "lenindef",
1563     "ber_encoded",
1564 ), **NAMEDTUPLE_KWARGS)
1565
1566
1567 @add_metaclass(AutoAddSlots)
1568 class Obj(object):
1569     """Common ASN.1 object class
1570
1571     All ASN.1 types are inherited from it. It has metaclass that
1572     automatically adds ``__slots__`` to all inherited classes.
1573     """
1574     __slots__ = (
1575         "tag",
1576         "_tag_order",
1577         "_value",
1578         "_expl",
1579         "default",
1580         "optional",
1581         "offset",
1582         "llen",
1583         "vlen",
1584         "expl_lenindef",
1585         "lenindef",
1586         "ber_encoded",
1587     )
1588
1589     def __init__(
1590             self,
1591             impl=None,
1592             expl=None,
1593             default=None,
1594             optional=False,
1595             _decoded=(0, 0, 0),
1596     ):
1597         self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1598         self._expl = getattr(self, "expl", None) if expl is None else expl
1599         if self.tag != self.tag_default and self._expl is not None:
1600             raise ValueError("implicit and explicit tags can not be set simultaneously")
1601         if self.tag is None:
1602             self._tag_order = None
1603         else:
1604             tag_class, _, tag_num = tag_decode(
1605                 self.tag if self._expl is None else self._expl
1606             )
1607             self._tag_order = (tag_class, tag_num)
1608         if default is not None:
1609             optional = True
1610         self.optional = optional
1611         self.offset, self.llen, self.vlen = _decoded
1612         self.default = None
1613         self.expl_lenindef = False
1614         self.lenindef = False
1615         self.ber_encoded = False
1616
1617     @property
1618     def ready(self):  # pragma: no cover
1619         """Is object ready to be encoded?
1620         """
1621         raise NotImplementedError()
1622
1623     def _assert_ready(self):
1624         if not self.ready:
1625             raise ObjNotReady(self.__class__.__name__)
1626
1627     @property
1628     def bered(self):
1629         """Is either object or any elements inside is BER encoded?
1630         """
1631         return self.expl_lenindef or self.lenindef or self.ber_encoded
1632
1633     @property
1634     def decoded(self):
1635         """Is object decoded?
1636         """
1637         return (self.llen + self.vlen) > 0
1638
1639     def __getstate__(self):  # pragma: no cover
1640         """Used for making safe to be mutable pickleable copies
1641         """
1642         raise NotImplementedError()
1643
1644     def __setstate__(self, state):
1645         if state.version != __version__:
1646             raise ValueError("data is pickled by different PyDERASN version")
1647         self.tag = state.tag
1648         self._tag_order = state.tag_order
1649         self._expl = state.expl
1650         self.default = state.default
1651         self.optional = state.optional
1652         self.offset = state.offset
1653         self.llen = state.llen
1654         self.vlen = state.vlen
1655         self.expl_lenindef = state.expl_lenindef
1656         self.lenindef = state.lenindef
1657         self.ber_encoded = state.ber_encoded
1658
1659     @property
1660     def tag_order(self):
1661         """Tag's (class, number) used for DER/CER sorting
1662         """
1663         return self._tag_order
1664
1665     @property
1666     def tag_order_cer(self):
1667         return self.tag_order
1668
1669     @property
1670     def tlen(self):
1671         """See :ref:`decoding`
1672         """
1673         return len(self.tag)
1674
1675     @property
1676     def tlvlen(self):
1677         """See :ref:`decoding`
1678         """
1679         return self.tlen + self.llen + self.vlen
1680
1681     def __str__(self):  # pragma: no cover
1682         return self.__bytes__() if PY2 else self.__unicode__()
1683
1684     def __ne__(self, their):
1685         return not(self == their)
1686
1687     def __gt__(self, their):  # pragma: no cover
1688         return not(self < their)
1689
1690     def __le__(self, their):  # pragma: no cover
1691         return (self == their) or (self < their)
1692
1693     def __ge__(self, their):  # pragma: no cover
1694         return (self == their) or (self > their)
1695
1696     def _encode(self):  # pragma: no cover
1697         raise NotImplementedError()
1698
1699     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):  # pragma: no cover
1700         yield NotImplemented
1701
1702     def encode(self):
1703         """DER encode the structure
1704
1705         :returns: DER representation
1706         """
1707         raw = self._encode()
1708         if self._expl is None:
1709             return raw
1710         return b"".join((self._expl, len_encode(len(raw)), raw))
1711
1712     def encode_cer(self, writer):
1713         """CER encode the structure to specified writer
1714
1715         :param writer: must comply with ``io.RawIOBase.write``
1716                        behaviour. It takes slice to be written and
1717                        returns number of bytes processed. If it returns
1718                        None, then exception will be raised
1719         """
1720         if self._expl is not None:
1721             write_full(writer, self._expl + LENINDEF)
1722         if getattr(self, "der_forced", False):
1723             write_full(writer, self._encode())
1724         else:
1725             self._encode_cer(writer)
1726         if self._expl is not None:
1727             write_full(writer, EOC)
1728
1729     def _encode_cer(self, writer):
1730         write_full(writer, self._encode())
1731
1732     def hexencode(self):
1733         """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1734         """
1735         return hexenc(self.encode())
1736
1737     def decode(
1738             self,
1739             data,
1740             offset=0,
1741             leavemm=False,
1742             decode_path=(),
1743             ctx=None,
1744             tag_only=False,
1745             _ctx_immutable=True,
1746     ):
1747         """Decode the data
1748
1749         :param data: either binary or memoryview
1750         :param int offset: initial data's offset
1751         :param bool leavemm: do we need to leave memoryview of remaining
1752                     data as is, or convert it to bytes otherwise
1753         :param decode_path: current decode path (tuples of strings,
1754                             possibly with DecodePathDefBy) with will be
1755                             the root for all underlying objects
1756         :param ctx: optional :ref:`context <ctx>` governing decoding process
1757         :param bool tag_only: decode only the tag, without length and
1758                               contents (used only in Choice and Set
1759                               structures, trying to determine if tag satisfies
1760                               the schema)
1761         :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1762                                     before using it?
1763         :returns: (Obj, remaining data)
1764
1765         .. seealso:: :ref:`decoding`
1766         """
1767         result = next(self.decode_evgen(
1768             data,
1769             offset,
1770             leavemm,
1771             decode_path,
1772             ctx,
1773             tag_only,
1774             _ctx_immutable,
1775             _evgen_mode=False,
1776         ))
1777         if result is None:
1778             return None
1779         _, obj, tail = result
1780         return obj, tail
1781
1782     def decode_evgen(
1783             self,
1784             data,
1785             offset=0,
1786             leavemm=False,
1787             decode_path=(),
1788             ctx=None,
1789             tag_only=False,
1790             _ctx_immutable=True,
1791             _evgen_mode=True,
1792     ):
1793         """Decode with evgen mode on
1794
1795         That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1796         it returns the generator producing ``(decode_path, obj, tail)``
1797         values. See :ref:`evgen mode <evgen_mode>`.
1798         """
1799         if ctx is None:
1800             ctx = {}
1801         elif _ctx_immutable:
1802             ctx = copy(ctx)
1803         tlv = memoryview(data)
1804         if (
1805                 _evgen_mode and
1806                 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1807         ):
1808             _evgen_mode = False
1809         if self._expl is None:
1810             for result in self._decode(
1811                     tlv,
1812                     offset=offset,
1813                     decode_path=decode_path,
1814                     ctx=ctx,
1815                     tag_only=tag_only,
1816                     evgen_mode=_evgen_mode,
1817             ):
1818                 if tag_only:
1819                     yield None
1820                     return
1821                 _decode_path, obj, tail = result
1822                 if not _decode_path is decode_path:
1823                     yield result
1824         else:
1825             try:
1826                 t, tlen, lv = tag_strip(tlv)
1827             except DecodeError as err:
1828                 raise err.__class__(
1829                     msg=err.msg,
1830                     klass=self.__class__,
1831                     decode_path=decode_path,
1832                     offset=offset,
1833                 )
1834             if t != self._expl:
1835                 raise TagMismatch(
1836                     klass=self.__class__,
1837                     decode_path=decode_path,
1838                     offset=offset,
1839                 )
1840             try:
1841                 l, llen, v = len_decode(lv)
1842             except LenIndefForm as err:
1843                 if not ctx.get("bered", False):
1844                     raise err.__class__(
1845                         msg=err.msg,
1846                         klass=self.__class__,
1847                         decode_path=decode_path,
1848                         offset=offset,
1849                     )
1850                 llen, v = 1, lv[1:]
1851                 offset += tlen + llen
1852                 for result in self._decode(
1853                         v,
1854                         offset=offset,
1855                         decode_path=decode_path,
1856                         ctx=ctx,
1857                         tag_only=tag_only,
1858                         evgen_mode=_evgen_mode,
1859                 ):
1860                     if tag_only:  # pragma: no cover
1861                         yield None
1862                         return
1863                     _decode_path, obj, tail = result
1864                     if not _decode_path is decode_path:
1865                         yield result
1866                 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
1867                 if eoc_expected.tobytes() != EOC:
1868                     raise DecodeError(
1869                         "no EOC",
1870                         klass=self.__class__,
1871                         decode_path=decode_path,
1872                         offset=offset,
1873                     )
1874                 obj.vlen += EOC_LEN
1875                 obj.expl_lenindef = True
1876             except DecodeError as err:
1877                 raise err.__class__(
1878                     msg=err.msg,
1879                     klass=self.__class__,
1880                     decode_path=decode_path,
1881                     offset=offset,
1882                 )
1883             else:
1884                 if l > len(v):
1885                     raise NotEnoughData(
1886                         "encoded length is longer than data",
1887                         klass=self.__class__,
1888                         decode_path=decode_path,
1889                         offset=offset,
1890                     )
1891                 for result in self._decode(
1892                         v,
1893                         offset=offset + tlen + llen,
1894                         decode_path=decode_path,
1895                         ctx=ctx,
1896                         tag_only=tag_only,
1897                         evgen_mode=_evgen_mode,
1898                 ):
1899                     if tag_only:  # pragma: no cover
1900                         yield None
1901                         return
1902                     _decode_path, obj, tail = result
1903                     if not _decode_path is decode_path:
1904                         yield result
1905                 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
1906                     raise DecodeError(
1907                         "explicit tag out-of-bound, longer than data",
1908                         klass=self.__class__,
1909                         decode_path=decode_path,
1910                         offset=offset,
1911                     )
1912         yield decode_path, obj, (tail if leavemm else tail.tobytes())
1913
1914     def decod(self, data, offset=0, decode_path=(), ctx=None):
1915         """Decode the data, check that tail is empty
1916
1917         :raises ExceedingData: if tail is not empty
1918
1919         This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
1920         (decode without tail) that also checks that there is no
1921         trailing data left.
1922         """
1923         obj, tail = self.decode(
1924             data,
1925             offset=offset,
1926             decode_path=decode_path,
1927             ctx=ctx,
1928             leavemm=True,
1929         )
1930         if len(tail) > 0:
1931             raise ExceedingData(len(tail))
1932         return obj
1933
1934     def hexdecode(self, data, *args, **kwargs):
1935         """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
1936         """
1937         return self.decode(hexdec(data), *args, **kwargs)
1938
1939     def hexdecod(self, data, *args, **kwargs):
1940         """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
1941         """
1942         return self.decod(hexdec(data), *args, **kwargs)
1943
1944     @property
1945     def expled(self):
1946         """See :ref:`decoding`
1947         """
1948         return self._expl is not None
1949
1950     @property
1951     def expl_tag(self):
1952         """See :ref:`decoding`
1953         """
1954         return self._expl
1955
1956     @property
1957     def expl_tlen(self):
1958         """See :ref:`decoding`
1959         """
1960         return len(self._expl)
1961
1962     @property
1963     def expl_llen(self):
1964         """See :ref:`decoding`
1965         """
1966         if self.expl_lenindef:
1967             return 1
1968         return len(len_encode(self.tlvlen))
1969
1970     @property
1971     def expl_offset(self):
1972         """See :ref:`decoding`
1973         """
1974         return self.offset - self.expl_tlen - self.expl_llen
1975
1976     @property
1977     def expl_vlen(self):
1978         """See :ref:`decoding`
1979         """
1980         return self.tlvlen
1981
1982     @property
1983     def expl_tlvlen(self):
1984         """See :ref:`decoding`
1985         """
1986         return self.expl_tlen + self.expl_llen + self.expl_vlen
1987
1988     @property
1989     def fulloffset(self):
1990         """See :ref:`decoding`
1991         """
1992         return self.expl_offset if self.expled else self.offset
1993
1994     @property
1995     def fulllen(self):
1996         """See :ref:`decoding`
1997         """
1998         return self.expl_tlvlen if self.expled else self.tlvlen
1999
2000     def pps_lenindef(self, decode_path):
2001         if self.lenindef and not (
2002                 getattr(self, "defined", None) is not None and
2003                 self.defined[1].lenindef
2004         ):
2005             yield _pp(
2006                 asn1_type_name="EOC",
2007                 obj_name="",
2008                 decode_path=decode_path,
2009                 offset=(
2010                     self.offset + self.tlvlen -
2011                     (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2012                 ),
2013                 tlen=1,
2014                 llen=1,
2015                 vlen=0,
2016                 ber_encoded=True,
2017                 bered=True,
2018             )
2019         if self.expl_lenindef:
2020             yield _pp(
2021                 asn1_type_name="EOC",
2022                 obj_name="EXPLICIT",
2023                 decode_path=decode_path,
2024                 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2025                 tlen=1,
2026                 llen=1,
2027                 vlen=0,
2028                 ber_encoded=True,
2029                 bered=True,
2030             )
2031
2032
2033 def encode_cer(obj):
2034     """Encode to CER in memory buffer
2035
2036     :returns bytes: memory buffer contents
2037     """
2038     buf = BytesIO()
2039     obj.encode_cer(buf.write)
2040     return buf.getvalue()
2041
2042
2043 class DecodePathDefBy(object):
2044     """DEFINED BY representation inside decode path
2045     """
2046     __slots__ = ("defined_by",)
2047
2048     def __init__(self, defined_by):
2049         self.defined_by = defined_by
2050
2051     def __ne__(self, their):
2052         return not(self == their)
2053
2054     def __eq__(self, their):
2055         if not isinstance(their, self.__class__):
2056             return False
2057         return self.defined_by == their.defined_by
2058
2059     def __str__(self):
2060         return "DEFINED BY " + str(self.defined_by)
2061
2062     def __repr__(self):
2063         return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2064
2065
2066 ########################################################################
2067 # Pretty printing
2068 ########################################################################
2069
2070 PP = namedtuple("PP", (
2071     "obj",
2072     "asn1_type_name",
2073     "obj_name",
2074     "decode_path",
2075     "value",
2076     "blob",
2077     "optional",
2078     "default",
2079     "impl",
2080     "expl",
2081     "offset",
2082     "tlen",
2083     "llen",
2084     "vlen",
2085     "expl_offset",
2086     "expl_tlen",
2087     "expl_llen",
2088     "expl_vlen",
2089     "expl_lenindef",
2090     "lenindef",
2091     "ber_encoded",
2092     "bered",
2093 ), **NAMEDTUPLE_KWARGS)
2094
2095
2096 def _pp(
2097         obj=None,
2098         asn1_type_name="unknown",
2099         obj_name="unknown",
2100         decode_path=(),
2101         value=None,
2102         blob=None,
2103         optional=False,
2104         default=False,
2105         impl=None,
2106         expl=None,
2107         offset=0,
2108         tlen=0,
2109         llen=0,
2110         vlen=0,
2111         expl_offset=None,
2112         expl_tlen=None,
2113         expl_llen=None,
2114         expl_vlen=None,
2115         expl_lenindef=False,
2116         lenindef=False,
2117         ber_encoded=False,
2118         bered=False,
2119 ):
2120     return PP(
2121         obj,
2122         asn1_type_name,
2123         obj_name,
2124         decode_path,
2125         value,
2126         blob,
2127         optional,
2128         default,
2129         impl,
2130         expl,
2131         offset,
2132         tlen,
2133         llen,
2134         vlen,
2135         expl_offset,
2136         expl_tlen,
2137         expl_llen,
2138         expl_vlen,
2139         expl_lenindef,
2140         lenindef,
2141         ber_encoded,
2142         bered,
2143     )
2144
2145
2146 def _colourize(what, colour, with_colours, attrs=("bold",)):
2147     return colored(what, colour, attrs=attrs) if with_colours else what
2148
2149
2150 def colonize_hex(hexed):
2151     """Separate hexadecimal string with colons
2152     """
2153     return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2154
2155
2156 def pp_console_row(
2157         pp,
2158         oid_maps=(),
2159         with_offsets=False,
2160         with_blob=True,
2161         with_colours=False,
2162         with_decode_path=False,
2163         decode_path_len_decrease=0,
2164 ):
2165     cols = []
2166     if with_offsets:
2167         col = "%5d%s%s" % (
2168             pp.offset,
2169             (
2170                 "  " if pp.expl_offset is None else
2171                 ("-%d" % (pp.offset - pp.expl_offset))
2172             ),
2173             LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2174         )
2175         col = _colourize(col, "red", with_colours, ())
2176         col += _colourize("B", "red", with_colours) if pp.bered else " "
2177         cols.append(col)
2178         col = "[%d,%d,%4d]%s" % (
2179             pp.tlen,
2180             pp.llen,
2181             pp.vlen,
2182             LENINDEF_PP_CHAR if pp.lenindef else " "
2183         )
2184         col = _colourize(col, "green", with_colours, ())
2185         cols.append(col)
2186     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2187     if decode_path_len > 0:
2188         cols.append(" ." * decode_path_len)
2189         ent = pp.decode_path[-1]
2190         if isinstance(ent, DecodePathDefBy):
2191             cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2192             value = str(ent.defined_by)
2193             oid_name = None
2194             if (
2195                     len(oid_maps) > 0 and
2196                     ent.defined_by.asn1_type_name ==
2197                     ObjectIdentifier.asn1_type_name
2198             ):
2199                 for oid_map in oid_maps:
2200                     oid_name = oid_map.get(value)
2201                     if oid_name is not None:
2202                         cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2203                         break
2204             if oid_name is None:
2205                 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2206         else:
2207             cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2208     if pp.expl is not None:
2209         klass, _, num = pp.expl
2210         col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2211         cols.append(_colourize(col, "blue", with_colours))
2212     if pp.impl is not None:
2213         klass, _, num = pp.impl
2214         col = "[%s%d]" % (TagClassReprs[klass], num)
2215         cols.append(_colourize(col, "blue", with_colours))
2216     if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2217         cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2218     if pp.ber_encoded:
2219         cols.append(_colourize("BER", "red", with_colours))
2220     cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2221     if pp.value is not None:
2222         value = pp.value
2223         cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2224         if (
2225                 len(oid_maps) > 0 and
2226                 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
2227         ):
2228             for oid_map in oid_maps:
2229                 oid_name = oid_map.get(value)
2230                 if oid_name is not None:
2231                     cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2232                     break
2233         if pp.asn1_type_name == Integer.asn1_type_name:
2234             hex_repr = hex(int(pp.obj._value))[2:].upper()
2235             if len(hex_repr) % 2 != 0:
2236                 hex_repr = "0" + hex_repr
2237             cols.append(_colourize(
2238                 "(%s)" % colonize_hex(hex_repr),
2239                 "green",
2240                 with_colours,
2241             ))
2242     if with_blob:
2243         if pp.blob.__class__ == binary_type:
2244             cols.append(hexenc(pp.blob))
2245         elif pp.blob.__class__ == tuple:
2246             cols.append(", ".join(pp.blob))
2247     if pp.optional:
2248         cols.append(_colourize("OPTIONAL", "red", with_colours))
2249     if pp.default:
2250         cols.append(_colourize("DEFAULT", "red", with_colours))
2251     if with_decode_path:
2252         cols.append(_colourize(
2253             "[%s]" % ":".join(str(p) for p in pp.decode_path),
2254             "grey",
2255             with_colours,
2256         ))
2257     return " ".join(cols)
2258
2259
2260 def pp_console_blob(pp, decode_path_len_decrease=0):
2261     cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2262     decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2263     if decode_path_len > 0:
2264         cols.append(" ." * (decode_path_len + 1))
2265     if pp.blob.__class__ == binary_type:
2266         blob = hexenc(pp.blob).upper()
2267         for i in six_xrange(0, len(blob), 32):
2268             chunk = blob[i:i + 32]
2269             yield " ".join(cols + [colonize_hex(chunk)])
2270     elif pp.blob.__class__ == tuple:
2271         yield " ".join(cols + [", ".join(pp.blob)])
2272
2273
2274 def pprint(
2275         obj,
2276         oid_maps=(),
2277         big_blobs=False,
2278         with_colours=False,
2279         with_decode_path=False,
2280         decode_path_only=(),
2281         decode_path=(),
2282 ):
2283     """Pretty print object
2284
2285     :param Obj obj: object you want to pretty print
2286     :param oid_maps: list of ``str(OID) <-> human readable string`` dictionary.
2287                      Its human readable form is printed when OID is met
2288     :param big_blobs: if large binary objects are met (like OctetString
2289                       values), do we need to print them too, on separate
2290                       lines
2291     :param with_colours: colourize output, if ``termcolor`` library
2292                          is available
2293     :param with_decode_path: print decode path
2294     :param decode_path_only: print only that specified decode path
2295     """
2296     def _pprint_pps(pps):
2297         for pp in pps:
2298             if hasattr(pp, "_fields"):
2299                 if (
2300                         decode_path_only != () and
2301                         tuple(
2302                             str(p) for p in pp.decode_path[:len(decode_path_only)]
2303                         ) != decode_path_only
2304                 ):
2305                     continue
2306                 if big_blobs:
2307                     yield pp_console_row(
2308                         pp,
2309                         oid_maps=oid_maps,
2310                         with_offsets=True,
2311                         with_blob=False,
2312                         with_colours=with_colours,
2313                         with_decode_path=with_decode_path,
2314                         decode_path_len_decrease=len(decode_path_only),
2315                     )
2316                     for row in pp_console_blob(
2317                             pp,
2318                             decode_path_len_decrease=len(decode_path_only),
2319                     ):
2320                         yield row
2321                 else:
2322                     yield pp_console_row(
2323                         pp,
2324                         oid_maps=oid_maps,
2325                         with_offsets=True,
2326                         with_blob=True,
2327                         with_colours=with_colours,
2328                         with_decode_path=with_decode_path,
2329                         decode_path_len_decrease=len(decode_path_only),
2330                     )
2331             else:
2332                 for row in _pprint_pps(pp):
2333                     yield row
2334     return "\n".join(_pprint_pps(obj.pps(decode_path)))
2335
2336
2337 ########################################################################
2338 # ASN.1 primitive types
2339 ########################################################################
2340
2341 BooleanState = namedtuple(
2342     "BooleanState",
2343     BasicState._fields + ("value",),
2344     **NAMEDTUPLE_KWARGS
2345 )
2346
2347
2348 class Boolean(Obj):
2349     """``BOOLEAN`` boolean type
2350
2351     >>> b = Boolean(True)
2352     BOOLEAN True
2353     >>> b == Boolean(True)
2354     True
2355     >>> bool(b)
2356     True
2357     """
2358     __slots__ = ()
2359     tag_default = tag_encode(1)
2360     asn1_type_name = "BOOLEAN"
2361
2362     def __init__(
2363             self,
2364             value=None,
2365             impl=None,
2366             expl=None,
2367             default=None,
2368             optional=False,
2369             _decoded=(0, 0, 0),
2370     ):
2371         """
2372         :param value: set the value. Either boolean type, or
2373                       :py:class:`pyderasn.Boolean` object
2374         :param bytes impl: override default tag with ``IMPLICIT`` one
2375         :param bytes expl: override default tag with ``EXPLICIT`` one
2376         :param default: set default value. Type same as in ``value``
2377         :param bool optional: is object ``OPTIONAL`` in sequence
2378         """
2379         super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2380         self._value = None if value is None else self._value_sanitize(value)
2381         if default is not None:
2382             default = self._value_sanitize(default)
2383             self.default = self.__class__(
2384                 value=default,
2385                 impl=self.tag,
2386                 expl=self._expl,
2387             )
2388             if value is None:
2389                 self._value = default
2390
2391     def _value_sanitize(self, value):
2392         if value.__class__ == bool:
2393             return value
2394         if issubclass(value.__class__, Boolean):
2395             return value._value
2396         raise InvalidValueType((self.__class__, bool))
2397
2398     @property
2399     def ready(self):
2400         return self._value is not None
2401
2402     def __getstate__(self):
2403         return BooleanState(
2404             __version__,
2405             self.tag,
2406             self._tag_order,
2407             self._expl,
2408             self.default,
2409             self.optional,
2410             self.offset,
2411             self.llen,
2412             self.vlen,
2413             self.expl_lenindef,
2414             self.lenindef,
2415             self.ber_encoded,
2416             self._value,
2417         )
2418
2419     def __setstate__(self, state):
2420         super(Boolean, self).__setstate__(state)
2421         self._value = state.value
2422
2423     def __nonzero__(self):
2424         self._assert_ready()
2425         return self._value
2426
2427     def __bool__(self):
2428         self._assert_ready()
2429         return self._value
2430
2431     def __eq__(self, their):
2432         if their.__class__ == bool:
2433             return self._value == their
2434         if not issubclass(their.__class__, Boolean):
2435             return False
2436         return (
2437             self._value == their._value and
2438             self.tag == their.tag and
2439             self._expl == their._expl
2440         )
2441
2442     def __call__(
2443             self,
2444             value=None,
2445             impl=None,
2446             expl=None,
2447             default=None,
2448             optional=None,
2449     ):
2450         return self.__class__(
2451             value=value,
2452             impl=self.tag if impl is None else impl,
2453             expl=self._expl if expl is None else expl,
2454             default=self.default if default is None else default,
2455             optional=self.optional if optional is None else optional,
2456         )
2457
2458     def _encode(self):
2459         self._assert_ready()
2460         return b"".join((
2461             self.tag,
2462             len_encode(1),
2463             (b"\xFF" if self._value else b"\x00"),
2464         ))
2465
2466     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2467         try:
2468             t, _, lv = tag_strip(tlv)
2469         except DecodeError as err:
2470             raise err.__class__(
2471                 msg=err.msg,
2472                 klass=self.__class__,
2473                 decode_path=decode_path,
2474                 offset=offset,
2475             )
2476         if t != self.tag:
2477             raise TagMismatch(
2478                 klass=self.__class__,
2479                 decode_path=decode_path,
2480                 offset=offset,
2481             )
2482         if tag_only:
2483             yield None
2484             return
2485         try:
2486             l, _, v = len_decode(lv)
2487         except DecodeError as err:
2488             raise err.__class__(
2489                 msg=err.msg,
2490                 klass=self.__class__,
2491                 decode_path=decode_path,
2492                 offset=offset,
2493             )
2494         if l != 1:
2495             raise InvalidLength(
2496                 "Boolean's length must be equal to 1",
2497                 klass=self.__class__,
2498                 decode_path=decode_path,
2499                 offset=offset,
2500             )
2501         if l > len(v):
2502             raise NotEnoughData(
2503                 "encoded length is longer than data",
2504                 klass=self.__class__,
2505                 decode_path=decode_path,
2506                 offset=offset,
2507             )
2508         first_octet = byte2int(v)
2509         ber_encoded = False
2510         if first_octet == 0:
2511             value = False
2512         elif first_octet == 0xFF:
2513             value = True
2514         elif ctx.get("bered", False):
2515             value = True
2516             ber_encoded = True
2517         else:
2518             raise DecodeError(
2519                 "unacceptable Boolean value",
2520                 klass=self.__class__,
2521                 decode_path=decode_path,
2522                 offset=offset,
2523             )
2524         obj = self.__class__(
2525             value=value,
2526             impl=self.tag,
2527             expl=self._expl,
2528             default=self.default,
2529             optional=self.optional,
2530             _decoded=(offset, 1, 1),
2531         )
2532         obj.ber_encoded = ber_encoded
2533         yield decode_path, obj, v[1:]
2534
2535     def __repr__(self):
2536         return pp_console_row(next(self.pps()))
2537
2538     def pps(self, decode_path=()):
2539         yield _pp(
2540             obj=self,
2541             asn1_type_name=self.asn1_type_name,
2542             obj_name=self.__class__.__name__,
2543             decode_path=decode_path,
2544             value=str(self._value) if self.ready else None,
2545             optional=self.optional,
2546             default=self == self.default,
2547             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2548             expl=None if self._expl is None else tag_decode(self._expl),
2549             offset=self.offset,
2550             tlen=self.tlen,
2551             llen=self.llen,
2552             vlen=self.vlen,
2553             expl_offset=self.expl_offset if self.expled else None,
2554             expl_tlen=self.expl_tlen if self.expled else None,
2555             expl_llen=self.expl_llen if self.expled else None,
2556             expl_vlen=self.expl_vlen if self.expled else None,
2557             expl_lenindef=self.expl_lenindef,
2558             ber_encoded=self.ber_encoded,
2559             bered=self.bered,
2560         )
2561         for pp in self.pps_lenindef(decode_path):
2562             yield pp
2563
2564
2565 IntegerState = namedtuple(
2566     "IntegerState",
2567     BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2568     **NAMEDTUPLE_KWARGS
2569 )
2570
2571
2572 class Integer(Obj):
2573     """``INTEGER`` integer type
2574
2575     >>> b = Integer(-123)
2576     INTEGER -123
2577     >>> b == Integer(-123)
2578     True
2579     >>> int(b)
2580     -123
2581
2582     >>> Integer(2, bounds=(1, 3))
2583     INTEGER 2
2584     >>> Integer(5, bounds=(1, 3))
2585     Traceback (most recent call last):
2586     pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2587
2588     ::
2589
2590         class Version(Integer):
2591             schema = (
2592                 ("v1", 0),
2593                 ("v2", 1),
2594                 ("v3", 2),
2595             )
2596
2597     >>> v = Version("v1")
2598     Version INTEGER v1
2599     >>> int(v)
2600     0
2601     >>> v.named
2602     'v1'
2603     >>> v.specs
2604     {'v3': 2, 'v1': 0, 'v2': 1}
2605     """
2606     __slots__ = ("specs", "_bound_min", "_bound_max")
2607     tag_default = tag_encode(2)
2608     asn1_type_name = "INTEGER"
2609
2610     def __init__(
2611             self,
2612             value=None,
2613             bounds=None,
2614             impl=None,
2615             expl=None,
2616             default=None,
2617             optional=False,
2618             _specs=None,
2619             _decoded=(0, 0, 0),
2620     ):
2621         """
2622         :param value: set the value. Either integer type, named value
2623                       (if ``schema`` is specified in the class), or
2624                       :py:class:`pyderasn.Integer` object
2625         :param bounds: set ``(MIN, MAX)`` value constraint.
2626                        (-inf, +inf) by default
2627         :param bytes impl: override default tag with ``IMPLICIT`` one
2628         :param bytes expl: override default tag with ``EXPLICIT`` one
2629         :param default: set default value. Type same as in ``value``
2630         :param bool optional: is object ``OPTIONAL`` in sequence
2631         """
2632         super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2633         self._value = value
2634         specs = getattr(self, "schema", {}) if _specs is None else _specs
2635         self.specs = specs if specs.__class__ == dict else dict(specs)
2636         self._bound_min, self._bound_max = getattr(
2637             self,
2638             "bounds",
2639             (float("-inf"), float("+inf")),
2640         ) if bounds is None else bounds
2641         if value is not None:
2642             self._value = self._value_sanitize(value)
2643         if default is not None:
2644             default = self._value_sanitize(default)
2645             self.default = self.__class__(
2646                 value=default,
2647                 impl=self.tag,
2648                 expl=self._expl,
2649                 _specs=self.specs,
2650             )
2651             if self._value is None:
2652                 self._value = default
2653
2654     def _value_sanitize(self, value):
2655         if isinstance(value, integer_types):
2656             pass
2657         elif issubclass(value.__class__, Integer):
2658             value = value._value
2659         elif value.__class__ == str:
2660             value = self.specs.get(value)
2661             if value is None:
2662                 raise ObjUnknown("integer value: %s" % value)
2663         else:
2664             raise InvalidValueType((self.__class__, int, str))
2665         if not self._bound_min <= value <= self._bound_max:
2666             raise BoundsError(self._bound_min, value, self._bound_max)
2667         return value
2668
2669     @property
2670     def ready(self):
2671         return self._value is not None
2672
2673     def __getstate__(self):
2674         return IntegerState(
2675             __version__,
2676             self.tag,
2677             self._tag_order,
2678             self._expl,
2679             self.default,
2680             self.optional,
2681             self.offset,
2682             self.llen,
2683             self.vlen,
2684             self.expl_lenindef,
2685             self.lenindef,
2686             self.ber_encoded,
2687             self.specs,
2688             self._value,
2689             self._bound_min,
2690             self._bound_max,
2691         )
2692
2693     def __setstate__(self, state):
2694         super(Integer, self).__setstate__(state)
2695         self.specs = state.specs
2696         self._value = state.value
2697         self._bound_min = state.bound_min
2698         self._bound_max = state.bound_max
2699
2700     def __int__(self):
2701         self._assert_ready()
2702         return int(self._value)
2703
2704     def __hash__(self):
2705         self._assert_ready()
2706         return hash(
2707             self.tag +
2708             bytes(self._expl or b"") +
2709             str(self._value).encode("ascii"),
2710         )
2711
2712     def __eq__(self, their):
2713         if isinstance(their, integer_types):
2714             return self._value == their
2715         if not issubclass(their.__class__, Integer):
2716             return False
2717         return (
2718             self._value == their._value and
2719             self.tag == their.tag and
2720             self._expl == their._expl
2721         )
2722
2723     def __lt__(self, their):
2724         return self._value < their._value
2725
2726     @property
2727     def named(self):
2728         """Return named representation (if exists) of the value
2729         """
2730         for name, value in iteritems(self.specs):
2731             if value == self._value:
2732                 return name
2733         return None
2734
2735     def __call__(
2736             self,
2737             value=None,
2738             bounds=None,
2739             impl=None,
2740             expl=None,
2741             default=None,
2742             optional=None,
2743     ):
2744         return self.__class__(
2745             value=value,
2746             bounds=(
2747                 (self._bound_min, self._bound_max)
2748                 if bounds is None else bounds
2749             ),
2750             impl=self.tag if impl is None else impl,
2751             expl=self._expl if expl is None else expl,
2752             default=self.default if default is None else default,
2753             optional=self.optional if optional is None else optional,
2754             _specs=self.specs,
2755         )
2756
2757     def _encode(self):
2758         self._assert_ready()
2759         value = self._value
2760         if PY2:
2761             if value == 0:
2762                 octets = bytearray([0])
2763             elif value < 0:
2764                 value = -value
2765                 value -= 1
2766                 octets = bytearray()
2767                 while value > 0:
2768                     octets.append((value & 0xFF) ^ 0xFF)
2769                     value >>= 8
2770                 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2771                     octets.append(0xFF)
2772             else:
2773                 octets = bytearray()
2774                 while value > 0:
2775                     octets.append(value & 0xFF)
2776                     value >>= 8
2777                 if octets[-1] & 0x80 > 0:
2778                     octets.append(0x00)
2779             octets.reverse()
2780             octets = bytes(octets)
2781         else:
2782             bytes_len = ceil(value.bit_length() / 8) or 1
2783             while True:
2784                 try:
2785                     octets = value.to_bytes(
2786                         bytes_len,
2787                         byteorder="big",
2788                         signed=True,
2789                     )
2790                 except OverflowError:
2791                     bytes_len += 1
2792                 else:
2793                     break
2794         return b"".join((self.tag, len_encode(len(octets)), octets))
2795
2796     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2797         try:
2798             t, _, lv = tag_strip(tlv)
2799         except DecodeError as err:
2800             raise err.__class__(
2801                 msg=err.msg,
2802                 klass=self.__class__,
2803                 decode_path=decode_path,
2804                 offset=offset,
2805             )
2806         if t != self.tag:
2807             raise TagMismatch(
2808                 klass=self.__class__,
2809                 decode_path=decode_path,
2810                 offset=offset,
2811             )
2812         if tag_only:
2813             yield None
2814             return
2815         try:
2816             l, llen, v = len_decode(lv)
2817         except DecodeError as err:
2818             raise err.__class__(
2819                 msg=err.msg,
2820                 klass=self.__class__,
2821                 decode_path=decode_path,
2822                 offset=offset,
2823             )
2824         if l > len(v):
2825             raise NotEnoughData(
2826                 "encoded length is longer than data",
2827                 klass=self.__class__,
2828                 decode_path=decode_path,
2829                 offset=offset,
2830             )
2831         if l == 0:
2832             raise NotEnoughData(
2833                 "zero length",
2834                 klass=self.__class__,
2835                 decode_path=decode_path,
2836                 offset=offset,
2837             )
2838         v, tail = v[:l], v[l:]
2839         first_octet = byte2int(v)
2840         if l > 1:
2841             second_octet = byte2int(v[1:])
2842             if (
2843                     ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
2844                     ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
2845             ):
2846                 raise DecodeError(
2847                     "non normalized integer",
2848                     klass=self.__class__,
2849                     decode_path=decode_path,
2850                     offset=offset,
2851                 )
2852         if PY2:
2853             value = 0
2854             if first_octet & 0x80 > 0:
2855                 octets = bytearray()
2856                 for octet in bytearray(v):
2857                     octets.append(octet ^ 0xFF)
2858                 for octet in octets:
2859                     value = (value << 8) | octet
2860                 value += 1
2861                 value = -value
2862             else:
2863                 for octet in bytearray(v):
2864                     value = (value << 8) | octet
2865         else:
2866             value = int.from_bytes(v, byteorder="big", signed=True)
2867         try:
2868             obj = self.__class__(
2869                 value=value,
2870                 bounds=(self._bound_min, self._bound_max),
2871                 impl=self.tag,
2872                 expl=self._expl,
2873                 default=self.default,
2874                 optional=self.optional,
2875                 _specs=self.specs,
2876                 _decoded=(offset, llen, l),
2877             )
2878         except BoundsError as err:
2879             raise DecodeError(
2880                 msg=str(err),
2881                 klass=self.__class__,
2882                 decode_path=decode_path,
2883                 offset=offset,
2884             )
2885         yield decode_path, obj, tail
2886
2887     def __repr__(self):
2888         return pp_console_row(next(self.pps()))
2889
2890     def pps(self, decode_path=()):
2891         yield _pp(
2892             obj=self,
2893             asn1_type_name=self.asn1_type_name,
2894             obj_name=self.__class__.__name__,
2895             decode_path=decode_path,
2896             value=(self.named or str(self._value)) if self.ready else None,
2897             optional=self.optional,
2898             default=self == self.default,
2899             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2900             expl=None if self._expl is None else tag_decode(self._expl),
2901             offset=self.offset,
2902             tlen=self.tlen,
2903             llen=self.llen,
2904             vlen=self.vlen,
2905             expl_offset=self.expl_offset if self.expled else None,
2906             expl_tlen=self.expl_tlen if self.expled else None,
2907             expl_llen=self.expl_llen if self.expled else None,
2908             expl_vlen=self.expl_vlen if self.expled else None,
2909             expl_lenindef=self.expl_lenindef,
2910             bered=self.bered,
2911         )
2912         for pp in self.pps_lenindef(decode_path):
2913             yield pp
2914
2915
2916 BitStringState = namedtuple(
2917     "BitStringState",
2918     BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
2919     **NAMEDTUPLE_KWARGS
2920 )
2921
2922
2923 class BitString(Obj):
2924     """``BIT STRING`` bit string type
2925
2926     >>> BitString(b"hello world")
2927     BIT STRING 88 bits 68656c6c6f20776f726c64
2928     >>> bytes(b)
2929     b'hello world'
2930     >>> b == b"hello world"
2931     True
2932     >>> b.bit_len
2933     88
2934
2935     >>> BitString("'0A3B5F291CD'H")
2936     BIT STRING 44 bits 0a3b5f291cd0
2937     >>> b = BitString("'010110000000'B")
2938     BIT STRING 12 bits 5800
2939     >>> b.bit_len
2940     12
2941     >>> b[0], b[1], b[2], b[3]
2942     (False, True, False, True)
2943     >>> b[1000]
2944     False
2945     >>> [v for v in b]
2946     [False, True, False, True, True, False, False, False, False, False, False, False]
2947
2948     ::
2949
2950         class KeyUsage(BitString):
2951             schema = (
2952                 ("digitalSignature", 0),
2953                 ("nonRepudiation", 1),
2954                 ("keyEncipherment", 2),
2955             )
2956
2957     >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
2958     KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
2959     >>> b.named
2960     ['nonRepudiation', 'keyEncipherment']
2961     >>> b.specs
2962     {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
2963
2964     .. note::
2965
2966        Pay attention that BIT STRING can be encoded both in primitive
2967        and constructed forms. Decoder always checks constructed form tag
2968        additionally to specified primitive one. If BER decoding is
2969        :ref:`not enabled <bered_ctx>`, then decoder will fail, because
2970        of DER restrictions.
2971     """
2972     __slots__ = ("tag_constructed", "specs", "defined")
2973     tag_default = tag_encode(3)
2974     asn1_type_name = "BIT STRING"
2975
2976     def __init__(
2977             self,
2978             value=None,
2979             impl=None,
2980             expl=None,
2981             default=None,
2982             optional=False,
2983             _specs=None,
2984             _decoded=(0, 0, 0),
2985     ):
2986         """
2987         :param value: set the value. Either binary type, tuple of named
2988                       values (if ``schema`` is specified in the class),
2989                       string in ``'XXX...'B`` form, or
2990                       :py:class:`pyderasn.BitString` object
2991         :param bytes impl: override default tag with ``IMPLICIT`` one
2992         :param bytes expl: override default tag with ``EXPLICIT`` one
2993         :param default: set default value. Type same as in ``value``
2994         :param bool optional: is object ``OPTIONAL`` in sequence
2995         """
2996         super(BitString, self).__init__(impl, expl, default, optional, _decoded)
2997         specs = getattr(self, "schema", {}) if _specs is None else _specs
2998         self.specs = specs if specs.__class__ == dict else dict(specs)
2999         self._value = None if value is None else self._value_sanitize(value)
3000         if default is not None:
3001             default = self._value_sanitize(default)
3002             self.default = self.__class__(
3003                 value=default,
3004                 impl=self.tag,
3005                 expl=self._expl,
3006             )
3007             if value is None:
3008                 self._value = default
3009         self.defined = None
3010         tag_klass, _, tag_num = tag_decode(self.tag)
3011         self.tag_constructed = tag_encode(
3012             klass=tag_klass,
3013             form=TagFormConstructed,
3014             num=tag_num,
3015         )
3016
3017     def _bits2octets(self, bits):
3018         if len(self.specs) > 0:
3019             bits = bits.rstrip("0")
3020         bit_len = len(bits)
3021         bits += "0" * ((8 - (bit_len % 8)) % 8)
3022         octets = bytearray(len(bits) // 8)
3023         for i in six_xrange(len(octets)):
3024             octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3025         return bit_len, bytes(octets)
3026
3027     def _value_sanitize(self, value):
3028         if isinstance(value, (string_types, binary_type)):
3029             if (
3030                     isinstance(value, string_types) and
3031                     value.startswith("'")
3032             ):
3033                 if value.endswith("'B"):
3034                     value = value[1:-2]
3035                     if not frozenset(value) <= SET01:
3036                         raise ValueError("B's coding contains unacceptable chars")
3037                     return self._bits2octets(value)
3038                 if value.endswith("'H"):
3039                     value = value[1:-2]
3040                     return (
3041                         len(value) * 4,
3042                         hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3043                     )
3044             if value.__class__ == binary_type:
3045                 return (len(value) * 8, value)
3046             raise InvalidValueType((self.__class__, string_types, binary_type))
3047         if value.__class__ == tuple:
3048             if (
3049                     len(value) == 2 and
3050                     isinstance(value[0], integer_types) and
3051                     value[1].__class__ == binary_type
3052             ):
3053                 return value
3054             bits = []
3055             for name in value:
3056                 bit = self.specs.get(name)
3057                 if bit is None:
3058                     raise ObjUnknown("BitString value: %s" % name)
3059                 bits.append(bit)
3060             if len(bits) == 0:
3061                 return self._bits2octets("")
3062             bits = frozenset(bits)
3063             return self._bits2octets("".join(
3064                 ("1" if bit in bits else "0")
3065                 for bit in six_xrange(max(bits) + 1)
3066             ))
3067         if issubclass(value.__class__, BitString):
3068             return value._value
3069         raise InvalidValueType((self.__class__, binary_type, string_types))
3070
3071     @property
3072     def ready(self):
3073         return self._value is not None
3074
3075     def __getstate__(self):
3076         return BitStringState(
3077             __version__,
3078             self.tag,
3079             self._tag_order,
3080             self._expl,
3081             self.default,
3082             self.optional,
3083             self.offset,
3084             self.llen,
3085             self.vlen,
3086             self.expl_lenindef,
3087             self.lenindef,
3088             self.ber_encoded,
3089             self.specs,
3090             self._value,
3091             self.tag_constructed,
3092             self.defined,
3093         )
3094
3095     def __setstate__(self, state):
3096         super(BitString, self).__setstate__(state)
3097         self.specs = state.specs
3098         self._value = state.value
3099         self.tag_constructed = state.tag_constructed
3100         self.defined = state.defined
3101
3102     def __iter__(self):
3103         self._assert_ready()
3104         for i in six_xrange(self._value[0]):
3105             yield self[i]
3106
3107     @property
3108     def bit_len(self):
3109         """Returns number of bits in the string
3110         """
3111         self._assert_ready()
3112         return self._value[0]
3113
3114     def __bytes__(self):
3115         self._assert_ready()
3116         return self._value[1]
3117
3118     def __eq__(self, their):
3119         if their.__class__ == bytes:
3120             return self._value[1] == their
3121         if not issubclass(their.__class__, BitString):
3122             return False
3123         return (
3124             self._value == their._value and
3125             self.tag == their.tag and
3126             self._expl == their._expl
3127         )
3128
3129     @property
3130     def named(self):
3131         """Named representation (if exists) of the bits
3132
3133         :returns: [str(name), ...]
3134         """
3135         return [name for name, bit in iteritems(self.specs) if self[bit]]
3136
3137     def __call__(
3138             self,
3139             value=None,
3140             impl=None,
3141             expl=None,
3142             default=None,
3143             optional=None,
3144     ):
3145         return self.__class__(
3146             value=value,
3147             impl=self.tag if impl is None else impl,
3148             expl=self._expl if expl is None else expl,
3149             default=self.default if default is None else default,
3150             optional=self.optional if optional is None else optional,
3151             _specs=self.specs,
3152         )
3153
3154     def __getitem__(self, key):
3155         if key.__class__ == int:
3156             bit_len, octets = self._value
3157             if key >= bit_len:
3158                 return False
3159             return (
3160                 byte2int(memoryview(octets)[key // 8:]) >>
3161                 (7 - (key % 8))
3162             ) & 1 == 1
3163         if isinstance(key, string_types):
3164             value = self.specs.get(key)
3165             if value is None:
3166                 raise ObjUnknown("BitString value: %s" % key)
3167             return self[value]
3168         raise InvalidValueType((int, str))
3169
3170     def _encode(self):
3171         self._assert_ready()
3172         bit_len, octets = self._value
3173         return b"".join((
3174             self.tag,
3175             len_encode(len(octets) + 1),
3176             int2byte((8 - bit_len % 8) % 8),
3177             octets,
3178         ))
3179
3180     def _encode_cer(self, writer):
3181         bit_len, octets = self._value
3182         if len(octets) + 1 <= 1000:
3183             write_full(writer, self._encode())
3184             return
3185         write_full(writer, self.tag_constructed)
3186         write_full(writer, LENINDEF)
3187         for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3188             write_full(writer, b"".join((
3189                 BitString.tag_default,
3190                 LEN1K,
3191                 int2byte(0),
3192                 octets[offset:offset + 999],
3193             )))
3194         tail = octets[offset+999:]
3195         if len(tail) > 0:
3196             tail = int2byte((8 - bit_len % 8) % 8) + tail
3197             write_full(writer, b"".join((
3198                 BitString.tag_default,
3199                 len_encode(len(tail)),
3200                 tail,
3201             )))
3202         write_full(writer, EOC)
3203
3204     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3205         try:
3206             t, tlen, lv = tag_strip(tlv)
3207         except DecodeError as err:
3208             raise err.__class__(
3209                 msg=err.msg,
3210                 klass=self.__class__,
3211                 decode_path=decode_path,
3212                 offset=offset,
3213             )
3214         if t == self.tag:
3215             if tag_only:  # pragma: no cover
3216                 yield None
3217                 return
3218             try:
3219                 l, llen, v = len_decode(lv)
3220             except DecodeError as err:
3221                 raise err.__class__(
3222                     msg=err.msg,
3223                     klass=self.__class__,
3224                     decode_path=decode_path,
3225                     offset=offset,
3226                 )
3227             if l > len(v):
3228                 raise NotEnoughData(
3229                     "encoded length is longer than data",
3230                     klass=self.__class__,
3231                     decode_path=decode_path,
3232                     offset=offset,
3233                 )
3234             if l == 0:
3235                 raise NotEnoughData(
3236                     "zero length",
3237                     klass=self.__class__,
3238                     decode_path=decode_path,
3239                     offset=offset,
3240                 )
3241             pad_size = byte2int(v)
3242             if l == 1 and pad_size != 0:
3243                 raise DecodeError(
3244                     "invalid empty value",
3245                     klass=self.__class__,
3246                     decode_path=decode_path,
3247                     offset=offset,
3248                 )
3249             if pad_size > 7:
3250                 raise DecodeError(
3251                     "too big pad",
3252                     klass=self.__class__,
3253                     decode_path=decode_path,
3254                     offset=offset,
3255                 )
3256             if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3257                 raise DecodeError(
3258                     "invalid pad",
3259                     klass=self.__class__,
3260                     decode_path=decode_path,
3261                     offset=offset,
3262                 )
3263             v, tail = v[:l], v[l:]
3264             bit_len = (len(v) - 1) * 8 - pad_size
3265             obj = self.__class__(
3266                 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3267                 impl=self.tag,
3268                 expl=self._expl,
3269                 default=self.default,
3270                 optional=self.optional,
3271                 _specs=self.specs,
3272                 _decoded=(offset, llen, l),
3273             )
3274             if evgen_mode:
3275                 obj._value = (bit_len, None)
3276             yield decode_path, obj, tail
3277             return
3278         if t != self.tag_constructed:
3279             raise TagMismatch(
3280                 klass=self.__class__,
3281                 decode_path=decode_path,
3282                 offset=offset,
3283             )
3284         if not ctx.get("bered", False):
3285             raise DecodeError(
3286                 "unallowed BER constructed encoding",
3287                 klass=self.__class__,
3288                 decode_path=decode_path,
3289                 offset=offset,
3290             )
3291         if tag_only:  # pragma: no cover
3292             yield None
3293             return
3294         lenindef = False
3295         try:
3296             l, llen, v = len_decode(lv)
3297         except LenIndefForm:
3298             llen, l, v = 1, 0, lv[1:]
3299             lenindef = True
3300         except DecodeError as err:
3301             raise err.__class__(
3302                 msg=err.msg,
3303                 klass=self.__class__,
3304                 decode_path=decode_path,
3305                 offset=offset,
3306             )
3307         if l > len(v):
3308             raise NotEnoughData(
3309                 "encoded length is longer than data",
3310                 klass=self.__class__,
3311                 decode_path=decode_path,
3312                 offset=offset,
3313             )
3314         if not lenindef and l == 0:
3315             raise NotEnoughData(
3316                 "zero length",
3317                 klass=self.__class__,
3318                 decode_path=decode_path,
3319                 offset=offset,
3320             )
3321         chunks = []
3322         sub_offset = offset + tlen + llen
3323         vlen = 0
3324         while True:
3325             if lenindef:
3326                 if v[:EOC_LEN].tobytes() == EOC:
3327                     break
3328             else:
3329                 if vlen == l:
3330                     break
3331                 if vlen > l:
3332                     raise DecodeError(
3333                         "chunk out of bounds",
3334                         klass=self.__class__,
3335                         decode_path=decode_path + (str(len(chunks) - 1),),
3336                         offset=chunks[-1].offset,
3337                     )
3338             sub_decode_path = decode_path + (str(len(chunks)),)
3339             try:
3340                 if evgen_mode:
3341                     for _decode_path, chunk, v_tail in BitString().decode_evgen(
3342                             v,
3343                             offset=sub_offset,
3344                             decode_path=sub_decode_path,
3345                             leavemm=True,
3346                             ctx=ctx,
3347                             _ctx_immutable=False,
3348                     ):
3349                         yield _decode_path, chunk, v_tail
3350                 else:
3351                     _, chunk, v_tail = next(BitString().decode_evgen(
3352                         v,
3353                         offset=sub_offset,
3354                         decode_path=sub_decode_path,
3355                         leavemm=True,
3356                         ctx=ctx,
3357                         _ctx_immutable=False,
3358                         _evgen_mode=False,
3359                     ))
3360             except TagMismatch:
3361                 raise DecodeError(
3362                     "expected BitString encoded chunk",
3363                     klass=self.__class__,
3364                     decode_path=sub_decode_path,
3365                     offset=sub_offset,
3366                 )
3367             chunks.append(chunk)
3368             sub_offset += chunk.tlvlen
3369             vlen += chunk.tlvlen
3370             v = v_tail
3371         if len(chunks) == 0:
3372             raise DecodeError(
3373                 "no chunks",
3374                 klass=self.__class__,
3375                 decode_path=decode_path,
3376                 offset=offset,
3377             )
3378         values = []
3379         bit_len = 0
3380         for chunk_i, chunk in enumerate(chunks[:-1]):
3381             if chunk.bit_len % 8 != 0:
3382                 raise DecodeError(
3383                     "BitString chunk is not multiple of 8 bits",
3384                     klass=self.__class__,
3385                     decode_path=decode_path + (str(chunk_i),),
3386                     offset=chunk.offset,
3387                 )
3388             if not evgen_mode:
3389                 values.append(bytes(chunk))
3390             bit_len += chunk.bit_len
3391         chunk_last = chunks[-1]
3392         if not evgen_mode:
3393             values.append(bytes(chunk_last))
3394         bit_len += chunk_last.bit_len
3395         obj = self.__class__(
3396             value=None if evgen_mode else (bit_len, b"".join(values)),
3397             impl=self.tag,
3398             expl=self._expl,
3399             default=self.default,
3400             optional=self.optional,
3401             _specs=self.specs,
3402             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3403         )
3404         if evgen_mode:
3405             obj._value = (bit_len, None)
3406         obj.lenindef = lenindef
3407         obj.ber_encoded = True
3408         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3409
3410     def __repr__(self):
3411         return pp_console_row(next(self.pps()))
3412
3413     def pps(self, decode_path=()):
3414         value = None
3415         blob = None
3416         if self.ready:
3417             bit_len, blob = self._value
3418             value = "%d bits" % bit_len
3419             if len(self.specs) > 0 and blob is not None:
3420                 blob = tuple(self.named)
3421         yield _pp(
3422             obj=self,
3423             asn1_type_name=self.asn1_type_name,
3424             obj_name=self.__class__.__name__,
3425             decode_path=decode_path,
3426             value=value,
3427             blob=blob,
3428             optional=self.optional,
3429             default=self == self.default,
3430             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3431             expl=None if self._expl is None else tag_decode(self._expl),
3432             offset=self.offset,
3433             tlen=self.tlen,
3434             llen=self.llen,
3435             vlen=self.vlen,
3436             expl_offset=self.expl_offset if self.expled else None,
3437             expl_tlen=self.expl_tlen if self.expled else None,
3438             expl_llen=self.expl_llen if self.expled else None,
3439             expl_vlen=self.expl_vlen if self.expled else None,
3440             expl_lenindef=self.expl_lenindef,
3441             lenindef=self.lenindef,
3442             ber_encoded=self.ber_encoded,
3443             bered=self.bered,
3444         )
3445         defined_by, defined = self.defined or (None, None)
3446         if defined_by is not None:
3447             yield defined.pps(
3448                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3449             )
3450         for pp in self.pps_lenindef(decode_path):
3451             yield pp
3452
3453
3454 OctetStringState = namedtuple(
3455     "OctetStringState",
3456     BasicState._fields + (
3457         "value",
3458         "bound_min",
3459         "bound_max",
3460         "tag_constructed",
3461         "defined",
3462     ),
3463     **NAMEDTUPLE_KWARGS
3464 )
3465
3466
3467 class OctetString(Obj):
3468     """``OCTET STRING`` binary string type
3469
3470     >>> s = OctetString(b"hello world")
3471     OCTET STRING 11 bytes 68656c6c6f20776f726c64
3472     >>> s == OctetString(b"hello world")
3473     True
3474     >>> bytes(s)
3475     b'hello world'
3476
3477     >>> OctetString(b"hello", bounds=(4, 4))
3478     Traceback (most recent call last):
3479     pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3480     >>> OctetString(b"hell", bounds=(4, 4))
3481     OCTET STRING 4 bytes 68656c6c
3482
3483     Memoryviews can be used as a values. If memoryview is made on
3484     mmap-ed file, then it does not take storage inside OctetString
3485     itself. In CER encoding mode it will be streamed to the specified
3486     writer, copying 1 KB chunks.
3487     """
3488     __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3489     tag_default = tag_encode(4)
3490     asn1_type_name = "OCTET STRING"
3491     evgen_mode_skip_value = True
3492
3493     def __init__(
3494             self,
3495             value=None,
3496             bounds=None,
3497             impl=None,
3498             expl=None,
3499             default=None,
3500             optional=False,
3501             _decoded=(0, 0, 0),
3502             ctx=None,
3503     ):
3504         """
3505         :param value: set the value. Either binary type, or
3506                       :py:class:`pyderasn.OctetString` object
3507         :param bounds: set ``(MIN, MAX)`` value size constraint.
3508                        (-inf, +inf) by default
3509         :param bytes impl: override default tag with ``IMPLICIT`` one
3510         :param bytes expl: override default tag with ``EXPLICIT`` one
3511         :param default: set default value. Type same as in ``value``
3512         :param bool optional: is object ``OPTIONAL`` in sequence
3513         """
3514         super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3515         self._value = value
3516         self._bound_min, self._bound_max = getattr(
3517             self,
3518             "bounds",
3519             (0, float("+inf")),
3520         ) if bounds is None else bounds
3521         if value is not None:
3522             self._value = self._value_sanitize(value)
3523         if default is not None:
3524             default = self._value_sanitize(default)
3525             self.default = self.__class__(
3526                 value=default,
3527                 impl=self.tag,
3528                 expl=self._expl,
3529             )
3530             if self._value is None:
3531                 self._value = default
3532         self.defined = None
3533         tag_klass, _, tag_num = tag_decode(self.tag)
3534         self.tag_constructed = tag_encode(
3535             klass=tag_klass,
3536             form=TagFormConstructed,
3537             num=tag_num,
3538         )
3539
3540     def _value_sanitize(self, value):
3541         if value.__class__ == binary_type or value.__class__ == memoryview:
3542             pass
3543         elif issubclass(value.__class__, OctetString):
3544             value = value._value
3545         else:
3546             raise InvalidValueType((self.__class__, bytes, memoryview))
3547         if not self._bound_min <= len(value) <= self._bound_max:
3548             raise BoundsError(self._bound_min, len(value), self._bound_max)
3549         return value
3550
3551     @property
3552     def ready(self):
3553         return self._value is not None
3554
3555     def __getstate__(self):
3556         return OctetStringState(
3557             __version__,
3558             self.tag,
3559             self._tag_order,
3560             self._expl,
3561             self.default,
3562             self.optional,
3563             self.offset,
3564             self.llen,
3565             self.vlen,
3566             self.expl_lenindef,
3567             self.lenindef,
3568             self.ber_encoded,
3569             self._value,
3570             self._bound_min,
3571             self._bound_max,
3572             self.tag_constructed,
3573             self.defined,
3574         )
3575
3576     def __setstate__(self, state):
3577         super(OctetString, self).__setstate__(state)
3578         self._value = state.value
3579         self._bound_min = state.bound_min
3580         self._bound_max = state.bound_max
3581         self.tag_constructed = state.tag_constructed
3582         self.defined = state.defined
3583
3584     def __bytes__(self):
3585         self._assert_ready()
3586         return bytes(self._value)
3587
3588     def __eq__(self, their):
3589         if their.__class__ == binary_type:
3590             return self._value == their
3591         if not issubclass(their.__class__, OctetString):
3592             return False
3593         return (
3594             self._value == their._value and
3595             self.tag == their.tag and
3596             self._expl == their._expl
3597         )
3598
3599     def __lt__(self, their):
3600         return self._value < their._value
3601
3602     def __call__(
3603             self,
3604             value=None,
3605             bounds=None,
3606             impl=None,
3607             expl=None,
3608             default=None,
3609             optional=None,
3610     ):
3611         return self.__class__(
3612             value=value,
3613             bounds=(
3614                 (self._bound_min, self._bound_max)
3615                 if bounds is None else bounds
3616             ),
3617             impl=self.tag if impl is None else impl,
3618             expl=self._expl if expl is None else expl,
3619             default=self.default if default is None else default,
3620             optional=self.optional if optional is None else optional,
3621         )
3622
3623     def _encode(self):
3624         self._assert_ready()
3625         return b"".join((
3626             self.tag,
3627             len_encode(len(self._value)),
3628             self._value,
3629         ))
3630
3631     def _encode_cer(self, writer):
3632         octets = self._value
3633         if len(octets) <= 1000:
3634             write_full(writer, self._encode())
3635             return
3636         write_full(writer, self.tag_constructed)
3637         write_full(writer, LENINDEF)
3638         for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3639             write_full(writer, b"".join((
3640                 OctetString.tag_default,
3641                 LEN1K,
3642                 octets[offset:offset + 1000],
3643             )))
3644         tail = octets[offset+1000:]
3645         if len(tail) > 0:
3646             write_full(writer, b"".join((
3647                 OctetString.tag_default,
3648                 len_encode(len(tail)),
3649                 tail,
3650             )))
3651         write_full(writer, EOC)
3652
3653     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3654         try:
3655             t, tlen, lv = tag_strip(tlv)
3656         except DecodeError as err:
3657             raise err.__class__(
3658                 msg=err.msg,
3659                 klass=self.__class__,
3660                 decode_path=decode_path,
3661                 offset=offset,
3662             )
3663         if t == self.tag:
3664             if tag_only:
3665                 yield None
3666                 return
3667             try:
3668                 l, llen, v = len_decode(lv)
3669             except DecodeError as err:
3670                 raise err.__class__(
3671                     msg=err.msg,
3672                     klass=self.__class__,
3673                     decode_path=decode_path,
3674                     offset=offset,
3675                 )
3676             if l > len(v):
3677                 raise NotEnoughData(
3678                     "encoded length is longer than data",
3679                     klass=self.__class__,
3680                     decode_path=decode_path,
3681                     offset=offset,
3682                 )
3683             v, tail = v[:l], v[l:]
3684             if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3685                 raise DecodeError(
3686                     msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3687                     klass=self.__class__,
3688                     decode_path=decode_path,
3689                     offset=offset,
3690                 )
3691             try:
3692                 obj = self.__class__(
3693                     value=(
3694                         None if (evgen_mode and self.evgen_mode_skip_value)
3695                         else v.tobytes()
3696                     ),
3697                     bounds=(self._bound_min, self._bound_max),
3698                     impl=self.tag,
3699                     expl=self._expl,
3700                     default=self.default,
3701                     optional=self.optional,
3702                     _decoded=(offset, llen, l),
3703                     ctx=ctx,
3704                 )
3705             except DecodeError as err:
3706                 raise DecodeError(
3707                     msg=err.msg,
3708                     klass=self.__class__,
3709                     decode_path=decode_path,
3710                     offset=offset,
3711                 )
3712             except BoundsError as err:
3713                 raise DecodeError(
3714                     msg=str(err),
3715                     klass=self.__class__,
3716                     decode_path=decode_path,
3717                     offset=offset,
3718                 )
3719             yield decode_path, obj, tail
3720             return
3721         if t != self.tag_constructed:
3722             raise TagMismatch(
3723                 klass=self.__class__,
3724                 decode_path=decode_path,
3725                 offset=offset,
3726             )
3727         if not ctx.get("bered", False):
3728             raise DecodeError(
3729                 "unallowed BER constructed encoding",
3730                 klass=self.__class__,
3731                 decode_path=decode_path,
3732                 offset=offset,
3733             )
3734         if tag_only:
3735             yield None
3736             return
3737         lenindef = False
3738         try:
3739             l, llen, v = len_decode(lv)
3740         except LenIndefForm:
3741             llen, l, v = 1, 0, lv[1:]
3742             lenindef = True
3743         except DecodeError as err:
3744             raise err.__class__(
3745                 msg=err.msg,
3746                 klass=self.__class__,
3747                 decode_path=decode_path,
3748                 offset=offset,
3749             )
3750         if l > len(v):
3751             raise NotEnoughData(
3752                 "encoded length is longer than data",
3753                 klass=self.__class__,
3754                 decode_path=decode_path,
3755                 offset=offset,
3756             )
3757         chunks = []
3758         chunks_count = 0
3759         sub_offset = offset + tlen + llen
3760         vlen = 0
3761         payload_len = 0
3762         while True:
3763             if lenindef:
3764                 if v[:EOC_LEN].tobytes() == EOC:
3765                     break
3766             else:
3767                 if vlen == l:
3768                     break
3769                 if vlen > l:
3770                     raise DecodeError(
3771                         "chunk out of bounds",
3772                         klass=self.__class__,
3773                         decode_path=decode_path + (str(len(chunks) - 1),),
3774                         offset=chunks[-1].offset,
3775                     )
3776             try:
3777                 if evgen_mode:
3778                     sub_decode_path = decode_path + (str(chunks_count),)
3779                     for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3780                             v,
3781                             offset=sub_offset,
3782                             decode_path=sub_decode_path,
3783                             leavemm=True,
3784                             ctx=ctx,
3785                             _ctx_immutable=False,
3786                     ):
3787                         yield _decode_path, chunk, v_tail
3788                         if not chunk.ber_encoded:
3789                             payload_len += chunk.vlen
3790                     chunks_count += 1
3791                 else:
3792                     sub_decode_path = decode_path + (str(len(chunks)),)
3793                     _, chunk, v_tail = next(OctetString().decode_evgen(
3794                         v,
3795                         offset=sub_offset,
3796                         decode_path=sub_decode_path,
3797                         leavemm=True,
3798                         ctx=ctx,
3799                         _ctx_immutable=False,
3800                         _evgen_mode=False,
3801                     ))
3802                     chunks.append(chunk)
3803             except TagMismatch:
3804                 raise DecodeError(
3805                     "expected OctetString encoded chunk",
3806                     klass=self.__class__,
3807                     decode_path=sub_decode_path,
3808                     offset=sub_offset,
3809                 )
3810             sub_offset += chunk.tlvlen
3811             vlen += chunk.tlvlen
3812             v = v_tail
3813         if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
3814             raise DecodeError(
3815                 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
3816                 klass=self.__class__,
3817                 decode_path=decode_path,
3818                 offset=offset,
3819             )
3820         try:
3821             obj = self.__class__(
3822                 value=(
3823                     None if evgen_mode else
3824                     b"".join(bytes(chunk) for chunk in chunks)
3825                 ),
3826                 bounds=(self._bound_min, self._bound_max),
3827                 impl=self.tag,
3828                 expl=self._expl,
3829                 default=self.default,
3830                 optional=self.optional,
3831                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3832                 ctx=ctx,
3833             )
3834         except DecodeError as err:
3835             raise DecodeError(
3836                 msg=err.msg,
3837                 klass=self.__class__,
3838                 decode_path=decode_path,
3839                 offset=offset,
3840             )
3841         except BoundsError as err:
3842             raise DecodeError(
3843                 msg=str(err),
3844                 klass=self.__class__,
3845                 decode_path=decode_path,
3846                 offset=offset,
3847             )
3848         obj.lenindef = lenindef
3849         obj.ber_encoded = True
3850         yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3851
3852     def __repr__(self):
3853         return pp_console_row(next(self.pps()))
3854
3855     def pps(self, decode_path=()):
3856         yield _pp(
3857             obj=self,
3858             asn1_type_name=self.asn1_type_name,
3859             obj_name=self.__class__.__name__,
3860             decode_path=decode_path,
3861             value=("%d bytes" % len(self._value)) if self.ready else None,
3862             blob=self._value if self.ready else None,
3863             optional=self.optional,
3864             default=self == self.default,
3865             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3866             expl=None if self._expl is None else tag_decode(self._expl),
3867             offset=self.offset,
3868             tlen=self.tlen,
3869             llen=self.llen,
3870             vlen=self.vlen,
3871             expl_offset=self.expl_offset if self.expled else None,
3872             expl_tlen=self.expl_tlen if self.expled else None,
3873             expl_llen=self.expl_llen if self.expled else None,
3874             expl_vlen=self.expl_vlen if self.expled else None,
3875             expl_lenindef=self.expl_lenindef,
3876             lenindef=self.lenindef,
3877             ber_encoded=self.ber_encoded,
3878             bered=self.bered,
3879         )
3880         defined_by, defined = self.defined or (None, None)
3881         if defined_by is not None:
3882             yield defined.pps(
3883                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3884             )
3885         for pp in self.pps_lenindef(decode_path):
3886             yield pp
3887
3888
3889 def agg_octet_string(evgens, decode_path, raw, writer):
3890     """Aggregate constructed string (OctetString and its derivatives)
3891
3892     :param evgens: iterator of generated events
3893     :param decode_path: points to the string we want to decode
3894     :param raw: slicebable (memoryview, bytearray, etc) with
3895                 the data evgens are generated on
3896     :param writer: buffer.write where string is going to be saved
3897     :param writer: where string is going to be saved. Must comply
3898                    with ``io.RawIOBase.write`` behaviour
3899     """
3900     decode_path_len = len(decode_path)
3901     for dp, obj, _ in evgens:
3902         if dp[:decode_path_len] != decode_path:
3903             continue
3904         if not obj.ber_encoded:
3905             write_full(writer, raw[
3906                 obj.offset + obj.tlen + obj.llen:
3907                 obj.offset + obj.tlen + obj.llen + obj.vlen -
3908                 (EOC_LEN if obj.expl_lenindef else 0)
3909             ])
3910         if len(dp) == decode_path_len:
3911             break
3912
3913
3914 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
3915
3916
3917 class Null(Obj):
3918     """``NULL`` null object
3919
3920     >>> n = Null()
3921     NULL
3922     >>> n.ready
3923     True
3924     """
3925     __slots__ = ()
3926     tag_default = tag_encode(5)
3927     asn1_type_name = "NULL"
3928
3929     def __init__(
3930             self,
3931             value=None,  # unused, but Sequence passes it
3932             impl=None,
3933             expl=None,
3934             optional=False,
3935             _decoded=(0, 0, 0),
3936     ):
3937         """
3938         :param bytes impl: override default tag with ``IMPLICIT`` one
3939         :param bytes expl: override default tag with ``EXPLICIT`` one
3940         :param bool optional: is object ``OPTIONAL`` in sequence
3941         """
3942         super(Null, self).__init__(impl, expl, None, optional, _decoded)
3943         self.default = None
3944
3945     @property
3946     def ready(self):
3947         return True
3948
3949     def __getstate__(self):
3950         return NullState(
3951             __version__,
3952             self.tag,
3953             self._tag_order,
3954             self._expl,
3955             self.default,
3956             self.optional,
3957             self.offset,
3958             self.llen,
3959             self.vlen,
3960             self.expl_lenindef,
3961             self.lenindef,
3962             self.ber_encoded,
3963         )
3964
3965     def __eq__(self, their):
3966         if not issubclass(their.__class__, Null):
3967             return False
3968         return (
3969             self.tag == their.tag and
3970             self._expl == their._expl
3971         )
3972
3973     def __call__(
3974             self,
3975             value=None,
3976             impl=None,
3977             expl=None,
3978             optional=None,
3979     ):
3980         return self.__class__(
3981             impl=self.tag if impl is None else impl,
3982             expl=self._expl if expl is None else expl,
3983             optional=self.optional if optional is None else optional,
3984         )
3985
3986     def _encode(self):
3987         return self.tag + len_encode(0)
3988
3989     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3990         try:
3991             t, _, lv = tag_strip(tlv)
3992         except DecodeError as err:
3993             raise err.__class__(
3994                 msg=err.msg,
3995                 klass=self.__class__,
3996                 decode_path=decode_path,
3997                 offset=offset,
3998             )
3999         if t != self.tag:
4000             raise TagMismatch(
4001                 klass=self.__class__,
4002                 decode_path=decode_path,
4003                 offset=offset,
4004             )
4005         if tag_only:  # pragma: no cover
4006             yield None
4007             return
4008         try:
4009             l, _, v = len_decode(lv)
4010         except DecodeError as err:
4011             raise err.__class__(
4012                 msg=err.msg,
4013                 klass=self.__class__,
4014                 decode_path=decode_path,
4015                 offset=offset,
4016             )
4017         if l != 0:
4018             raise InvalidLength(
4019                 "Null must have zero length",
4020                 klass=self.__class__,
4021                 decode_path=decode_path,
4022                 offset=offset,
4023             )
4024         obj = self.__class__(
4025             impl=self.tag,
4026             expl=self._expl,
4027             optional=self.optional,
4028             _decoded=(offset, 1, 0),
4029         )
4030         yield decode_path, obj, v
4031
4032     def __repr__(self):
4033         return pp_console_row(next(self.pps()))
4034
4035     def pps(self, decode_path=()):
4036         yield _pp(
4037             obj=self,
4038             asn1_type_name=self.asn1_type_name,
4039             obj_name=self.__class__.__name__,
4040             decode_path=decode_path,
4041             optional=self.optional,
4042             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4043             expl=None if self._expl is None else tag_decode(self._expl),
4044             offset=self.offset,
4045             tlen=self.tlen,
4046             llen=self.llen,
4047             vlen=self.vlen,
4048             expl_offset=self.expl_offset if self.expled else None,
4049             expl_tlen=self.expl_tlen if self.expled else None,
4050             expl_llen=self.expl_llen if self.expled else None,
4051             expl_vlen=self.expl_vlen if self.expled else None,
4052             expl_lenindef=self.expl_lenindef,
4053             bered=self.bered,
4054         )
4055         for pp in self.pps_lenindef(decode_path):
4056             yield pp
4057
4058
4059 ObjectIdentifierState = namedtuple(
4060     "ObjectIdentifierState",
4061     BasicState._fields + ("value", "defines"),
4062     **NAMEDTUPLE_KWARGS
4063 )
4064
4065
4066 class ObjectIdentifier(Obj):
4067     """``OBJECT IDENTIFIER`` OID type
4068
4069     >>> oid = ObjectIdentifier((1, 2, 3))
4070     OBJECT IDENTIFIER 1.2.3
4071     >>> oid == ObjectIdentifier("1.2.3")
4072     True
4073     >>> tuple(oid)
4074     (1, 2, 3)
4075     >>> str(oid)
4076     '1.2.3'
4077     >>> oid + (4, 5) + ObjectIdentifier("1.7")
4078     OBJECT IDENTIFIER 1.2.3.4.5.1.7
4079
4080     >>> str(ObjectIdentifier((3, 1)))
4081     Traceback (most recent call last):
4082     pyderasn.InvalidOID: unacceptable first arc value
4083     """
4084     __slots__ = ("defines",)
4085     tag_default = tag_encode(6)
4086     asn1_type_name = "OBJECT IDENTIFIER"
4087
4088     def __init__(
4089             self,
4090             value=None,
4091             defines=(),
4092             impl=None,
4093             expl=None,
4094             default=None,
4095             optional=False,
4096             _decoded=(0, 0, 0),
4097     ):
4098         """
4099         :param value: set the value. Either tuples of integers,
4100                       string of "."-concatenated integers, or
4101                       :py:class:`pyderasn.ObjectIdentifier` object
4102         :param defines: sequence of tuples. Each tuple has two elements.
4103                         First one is relative to current one decode
4104                         path, aiming to the field defined by that OID.
4105                         Read about relative path in
4106                         :py:func:`pyderasn.abs_decode_path`. Second
4107                         tuple element is ``{OID: pyderasn.Obj()}``
4108                         dictionary, mapping between current OID value
4109                         and structure applied to defined field.
4110                         :ref:`Read about DEFINED BY <definedby>`
4111         :param bytes impl: override default tag with ``IMPLICIT`` one
4112         :param bytes expl: override default tag with ``EXPLICIT`` one
4113         :param default: set default value. Type same as in ``value``
4114         :param bool optional: is object ``OPTIONAL`` in sequence
4115         """
4116         super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4117         self._value = value
4118         if value is not None:
4119             self._value = self._value_sanitize(value)
4120         if default is not None:
4121             default = self._value_sanitize(default)
4122             self.default = self.__class__(
4123                 value=default,
4124                 impl=self.tag,
4125                 expl=self._expl,
4126             )
4127             if self._value is None:
4128                 self._value = default
4129         self.defines = defines
4130
4131     def __add__(self, their):
4132         if their.__class__ == tuple:
4133             return self.__class__(self._value + their)
4134         if isinstance(their, self.__class__):
4135             return self.__class__(self._value + their._value)
4136         raise InvalidValueType((self.__class__, tuple))
4137
4138     def _value_sanitize(self, value):
4139         if issubclass(value.__class__, ObjectIdentifier):
4140             return value._value
4141         if isinstance(value, string_types):
4142             try:
4143                 value = tuple(pureint(arc) for arc in value.split("."))
4144             except ValueError:
4145                 raise InvalidOID("unacceptable arcs values")
4146         if value.__class__ == tuple:
4147             if len(value) < 2:
4148                 raise InvalidOID("less than 2 arcs")
4149             first_arc = value[0]
4150             if first_arc in (0, 1):
4151                 if not (0 <= value[1] <= 39):
4152                     raise InvalidOID("second arc is too wide")
4153             elif first_arc == 2:
4154                 pass
4155             else:
4156                 raise InvalidOID("unacceptable first arc value")
4157             if not all(arc >= 0 for arc in value):
4158                 raise InvalidOID("negative arc value")
4159             return value
4160         raise InvalidValueType((self.__class__, str, tuple))
4161
4162     @property
4163     def ready(self):
4164         return self._value is not None
4165
4166     def __getstate__(self):
4167         return ObjectIdentifierState(
4168             __version__,
4169             self.tag,
4170             self._tag_order,
4171             self._expl,
4172             self.default,
4173             self.optional,
4174             self.offset,
4175             self.llen,
4176             self.vlen,
4177             self.expl_lenindef,
4178             self.lenindef,
4179             self.ber_encoded,
4180             self._value,
4181             self.defines,
4182         )
4183
4184     def __setstate__(self, state):
4185         super(ObjectIdentifier, self).__setstate__(state)
4186         self._value = state.value
4187         self.defines = state.defines
4188
4189     def __iter__(self):
4190         self._assert_ready()
4191         return iter(self._value)
4192
4193     def __str__(self):
4194         return ".".join(str(arc) for arc in self._value or ())
4195
4196     def __hash__(self):
4197         self._assert_ready()
4198         return hash(
4199             self.tag +
4200             bytes(self._expl or b"") +
4201             str(self._value).encode("ascii"),
4202         )
4203
4204     def __eq__(self, their):
4205         if their.__class__ == tuple:
4206             return self._value == their
4207         if not issubclass(their.__class__, ObjectIdentifier):
4208             return False
4209         return (
4210             self.tag == their.tag and
4211             self._expl == their._expl and
4212             self._value == their._value
4213         )
4214
4215     def __lt__(self, their):
4216         return self._value < their._value
4217
4218     def __call__(
4219             self,
4220             value=None,
4221             defines=None,
4222             impl=None,
4223             expl=None,
4224             default=None,
4225             optional=None,
4226     ):
4227         return self.__class__(
4228             value=value,
4229             defines=self.defines if defines is None else defines,
4230             impl=self.tag if impl is None else impl,
4231             expl=self._expl if expl is None else expl,
4232             default=self.default if default is None else default,
4233             optional=self.optional if optional is None else optional,
4234         )
4235
4236     def _encode(self):
4237         self._assert_ready()
4238         value = self._value
4239         first_value = value[1]
4240         first_arc = value[0]
4241         if first_arc == 0:
4242             pass
4243         elif first_arc == 1:
4244             first_value += 40
4245         elif first_arc == 2:
4246             first_value += 80
4247         else:  # pragma: no cover
4248             raise RuntimeError("invalid arc is stored")
4249         octets = [zero_ended_encode(first_value)]
4250         for arc in value[2:]:
4251             octets.append(zero_ended_encode(arc))
4252         v = b"".join(octets)
4253         return b"".join((self.tag, len_encode(len(v)), v))
4254
4255     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4256         try:
4257             t, _, lv = tag_strip(tlv)
4258         except DecodeError as err:
4259             raise err.__class__(
4260                 msg=err.msg,
4261                 klass=self.__class__,
4262                 decode_path=decode_path,
4263                 offset=offset,
4264             )
4265         if t != self.tag:
4266             raise TagMismatch(
4267                 klass=self.__class__,
4268                 decode_path=decode_path,
4269                 offset=offset,
4270             )
4271         if tag_only:  # pragma: no cover
4272             yield None
4273             return
4274         try:
4275             l, llen, v = len_decode(lv)
4276         except DecodeError as err:
4277             raise err.__class__(
4278                 msg=err.msg,
4279                 klass=self.__class__,
4280                 decode_path=decode_path,
4281                 offset=offset,
4282             )
4283         if l > len(v):
4284             raise NotEnoughData(
4285                 "encoded length is longer than data",
4286                 klass=self.__class__,
4287                 decode_path=decode_path,
4288                 offset=offset,
4289             )
4290         if l == 0:
4291             raise NotEnoughData(
4292                 "zero length",
4293                 klass=self.__class__,
4294                 decode_path=decode_path,
4295                 offset=offset,
4296             )
4297         v, tail = v[:l], v[l:]
4298         arcs = []
4299         ber_encoded = False
4300         while len(v) > 0:
4301             i = 0
4302             arc = 0
4303             while True:
4304                 octet = indexbytes(v, i)
4305                 if i == 0 and octet == 0x80:
4306                     if ctx.get("bered", False):
4307                         ber_encoded = True
4308                     else:
4309                         raise DecodeError("non normalized arc encoding")
4310                 arc = (arc << 7) | (octet & 0x7F)
4311                 if octet & 0x80 == 0:
4312                     arcs.append(arc)
4313                     v = v[i + 1:]
4314                     break
4315                 i += 1
4316                 if i == len(v):
4317                     raise DecodeError(
4318                         "unfinished OID",
4319                         klass=self.__class__,
4320                         decode_path=decode_path,
4321                         offset=offset,
4322                     )
4323         first_arc = 0
4324         second_arc = arcs[0]
4325         if 0 <= second_arc <= 39:
4326             first_arc = 0
4327         elif 40 <= second_arc <= 79:
4328             first_arc = 1
4329             second_arc -= 40
4330         else:
4331             first_arc = 2
4332             second_arc -= 80
4333         obj = self.__class__(
4334             value=tuple([first_arc, second_arc] + arcs[1:]),
4335             impl=self.tag,
4336             expl=self._expl,
4337             default=self.default,
4338             optional=self.optional,
4339             _decoded=(offset, llen, l),
4340         )
4341         if ber_encoded:
4342             obj.ber_encoded = True
4343         yield decode_path, obj, tail
4344
4345     def __repr__(self):
4346         return pp_console_row(next(self.pps()))
4347
4348     def pps(self, decode_path=()):
4349         yield _pp(
4350             obj=self,
4351             asn1_type_name=self.asn1_type_name,
4352             obj_name=self.__class__.__name__,
4353             decode_path=decode_path,
4354             value=str(self) if self.ready else None,
4355             optional=self.optional,
4356             default=self == self.default,
4357             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4358             expl=None if self._expl is None else tag_decode(self._expl),
4359             offset=self.offset,
4360             tlen=self.tlen,
4361             llen=self.llen,
4362             vlen=self.vlen,
4363             expl_offset=self.expl_offset if self.expled else None,
4364             expl_tlen=self.expl_tlen if self.expled else None,
4365             expl_llen=self.expl_llen if self.expled else None,
4366             expl_vlen=self.expl_vlen if self.expled else None,
4367             expl_lenindef=self.expl_lenindef,
4368             ber_encoded=self.ber_encoded,
4369             bered=self.bered,
4370         )
4371         for pp in self.pps_lenindef(decode_path):
4372             yield pp
4373
4374
4375 class Enumerated(Integer):
4376     """``ENUMERATED`` integer type
4377
4378     This type is identical to :py:class:`pyderasn.Integer`, but requires
4379     schema to be specified and does not accept values missing from it.
4380     """
4381     __slots__ = ()
4382     tag_default = tag_encode(10)
4383     asn1_type_name = "ENUMERATED"
4384
4385     def __init__(
4386             self,
4387             value=None,
4388             impl=None,
4389             expl=None,
4390             default=None,
4391             optional=False,
4392             _specs=None,
4393             _decoded=(0, 0, 0),
4394             bounds=None,  # dummy argument, workability for Integer.decode
4395     ):
4396         super(Enumerated, self).__init__(
4397             value, bounds, impl, expl, default, optional, _specs, _decoded,
4398         )
4399         if len(self.specs) == 0:
4400             raise ValueError("schema must be specified")
4401
4402     def _value_sanitize(self, value):
4403         if isinstance(value, self.__class__):
4404             value = value._value
4405         elif isinstance(value, integer_types):
4406             for _value in itervalues(self.specs):
4407                 if _value == value:
4408                     break
4409             else:
4410                 raise DecodeError(
4411                     "unknown integer value: %s" % value,
4412                     klass=self.__class__,
4413                 )
4414         elif isinstance(value, string_types):
4415             value = self.specs.get(value)
4416             if value is None:
4417                 raise ObjUnknown("integer value: %s" % value)
4418         else:
4419             raise InvalidValueType((self.__class__, int, str))
4420         return value
4421
4422     def __call__(
4423             self,
4424             value=None,
4425             impl=None,
4426             expl=None,
4427             default=None,
4428             optional=None,
4429             _specs=None,
4430     ):
4431         return self.__class__(
4432             value=value,
4433             impl=self.tag if impl is None else impl,
4434             expl=self._expl if expl is None else expl,
4435             default=self.default if default is None else default,
4436             optional=self.optional if optional is None else optional,
4437             _specs=self.specs,
4438         )
4439
4440
4441 def escape_control_unicode(c):
4442     if unicat(c)[0] == "C":
4443         c = repr(c).lstrip("u").strip("'")
4444     return c
4445
4446
4447 class CommonString(OctetString):
4448     """Common class for all strings
4449
4450     Everything resembles :py:class:`pyderasn.OctetString`, except
4451     ability to deal with unicode text strings.
4452
4453     >>> hexenc("привет Ð¼Ð¸Ñ€".encode("utf-8"))
4454     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4455     >>> UTF8String("привет Ð¼Ð¸Ñ€") == UTF8String(hexdec("d0...80"))
4456     True
4457     >>> s = UTF8String("привет Ð¼Ð¸Ñ€")
4458     UTF8String UTF8String Ð¿Ñ€Ð¸Ð²ÐµÑ‚ Ð¼Ð¸Ñ€
4459     >>> str(s)
4460     'привет Ð¼Ð¸Ñ€'
4461     >>> hexenc(bytes(s))
4462     'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4463
4464     >>> PrintableString("привет Ð¼Ð¸Ñ€")
4465     Traceback (most recent call last):
4466     pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4467
4468     >>> BMPString("ада", bounds=(2, 2))
4469     Traceback (most recent call last):
4470     pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4471     >>> s = BMPString("ад", bounds=(2, 2))
4472     >>> s.encoding
4473     'utf-16-be'
4474     >>> hexenc(bytes(s))
4475     '04300434'
4476
4477     .. list-table::
4478        :header-rows: 1
4479
4480        * - Class
4481          - Text Encoding
4482        * - :py:class:`pyderasn.UTF8String`
4483          - utf-8
4484        * - :py:class:`pyderasn.NumericString`
4485          - ascii
4486        * - :py:class:`pyderasn.PrintableString`
4487          - ascii
4488        * - :py:class:`pyderasn.TeletexString`
4489          - ascii
4490        * - :py:class:`pyderasn.T61String`
4491          - ascii
4492        * - :py:class:`pyderasn.VideotexString`
4493          - iso-8859-1
4494        * - :py:class:`pyderasn.IA5String`
4495          - ascii
4496        * - :py:class:`pyderasn.GraphicString`
4497          - iso-8859-1
4498        * - :py:class:`pyderasn.VisibleString`
4499          - ascii
4500        * - :py:class:`pyderasn.ISO646String`
4501          - ascii
4502        * - :py:class:`pyderasn.GeneralString`
4503          - iso-8859-1
4504        * - :py:class:`pyderasn.UniversalString`
4505          - utf-32-be
4506        * - :py:class:`pyderasn.BMPString`
4507          - utf-16-be
4508     """
4509     __slots__ = ()
4510
4511     def _value_sanitize(self, value):
4512         value_raw = None
4513         value_decoded = None
4514         if isinstance(value, self.__class__):
4515             value_raw = value._value
4516         elif value.__class__ == text_type:
4517             value_decoded = value
4518         elif value.__class__ == binary_type:
4519             value_raw = value
4520         else:
4521             raise InvalidValueType((self.__class__, text_type, binary_type))
4522         try:
4523             value_raw = (
4524                 value_decoded.encode(self.encoding)
4525                 if value_raw is None else value_raw
4526             )
4527             value_decoded = (
4528                 value_raw.decode(self.encoding)
4529                 if value_decoded is None else value_decoded
4530             )
4531         except (UnicodeEncodeError, UnicodeDecodeError) as err:
4532             raise DecodeError(str(err))
4533         if not self._bound_min <= len(value_decoded) <= self._bound_max:
4534             raise BoundsError(
4535                 self._bound_min,
4536                 len(value_decoded),
4537                 self._bound_max,
4538             )
4539         return value_raw
4540
4541     def __eq__(self, their):
4542         if their.__class__ == binary_type:
4543             return self._value == their
4544         if their.__class__ == text_type:
4545             return self._value == their.encode(self.encoding)
4546         if not isinstance(their, self.__class__):
4547             return False
4548         return (
4549             self._value == their._value and
4550             self.tag == their.tag and
4551             self._expl == their._expl
4552         )
4553
4554     def __unicode__(self):
4555         if self.ready:
4556             return self._value.decode(self.encoding)
4557         return text_type(self._value)
4558
4559     def __repr__(self):
4560         return pp_console_row(next(self.pps(no_unicode=PY2)))
4561
4562     def pps(self, decode_path=(), no_unicode=False):
4563         value = None
4564         if self.ready:
4565             value = (
4566                 hexenc(bytes(self)) if no_unicode else
4567                 "".join(escape_control_unicode(c) for c in self.__unicode__())
4568             )
4569         yield _pp(
4570             obj=self,
4571             asn1_type_name=self.asn1_type_name,
4572             obj_name=self.__class__.__name__,
4573             decode_path=decode_path,
4574             value=value,
4575             optional=self.optional,
4576             default=self == self.default,
4577             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4578             expl=None if self._expl is None else tag_decode(self._expl),
4579             offset=self.offset,
4580             tlen=self.tlen,
4581             llen=self.llen,
4582             vlen=self.vlen,
4583             expl_offset=self.expl_offset if self.expled else None,
4584             expl_tlen=self.expl_tlen if self.expled else None,
4585             expl_llen=self.expl_llen if self.expled else None,
4586             expl_vlen=self.expl_vlen if self.expled else None,
4587             expl_lenindef=self.expl_lenindef,
4588             ber_encoded=self.ber_encoded,
4589             bered=self.bered,
4590         )
4591         for pp in self.pps_lenindef(decode_path):
4592             yield pp
4593
4594
4595 class UTF8String(CommonString):
4596     __slots__ = ()
4597     tag_default = tag_encode(12)
4598     encoding = "utf-8"
4599     asn1_type_name = "UTF8String"
4600
4601
4602 class AllowableCharsMixin(object):
4603     @property
4604     def allowable_chars(self):
4605         if PY2:
4606             return self._allowable_chars
4607         return frozenset(six_unichr(c) for c in self._allowable_chars)
4608
4609
4610 class NumericString(AllowableCharsMixin, CommonString):
4611     """Numeric string
4612
4613     Its value is properly sanitized: only ASCII digits with spaces can
4614     be stored.
4615
4616     >>> NumericString().allowable_chars
4617     frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4618     """
4619     __slots__ = ()
4620     tag_default = tag_encode(18)
4621     encoding = "ascii"
4622     asn1_type_name = "NumericString"
4623     _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4624
4625     def _value_sanitize(self, value):
4626         value = super(NumericString, self)._value_sanitize(value)
4627         if not frozenset(value) <= self._allowable_chars:
4628             raise DecodeError("non-numeric value")
4629         return value
4630
4631
4632 PrintableStringState = namedtuple(
4633     "PrintableStringState",
4634     OctetStringState._fields + ("allowable_chars",),
4635     **NAMEDTUPLE_KWARGS
4636 )
4637
4638
4639 class PrintableString(AllowableCharsMixin, CommonString):
4640     """Printable string
4641
4642     Its value is properly sanitized: see X.680 41.4 table 10.
4643
4644     >>> PrintableString().allowable_chars
4645     frozenset([' ', "'", ..., 'z'])
4646     >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4647     PrintableString PrintableString foo*bar
4648     >>> obj.allow_asterisk, obj.allow_ampersand
4649     (True, False)
4650     """
4651     __slots__ = ()
4652     tag_default = tag_encode(19)
4653     encoding = "ascii"
4654     asn1_type_name = "PrintableString"
4655     _allowable_chars = frozenset(
4656         (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4657     )
4658     _asterisk = frozenset("*".encode("ascii"))
4659     _ampersand = frozenset("&".encode("ascii"))
4660
4661     def __init__(
4662             self,
4663             value=None,
4664             bounds=None,
4665             impl=None,
4666             expl=None,
4667             default=None,
4668             optional=False,
4669             _decoded=(0, 0, 0),
4670             ctx=None,
4671             allow_asterisk=False,
4672             allow_ampersand=False,
4673     ):
4674         """
4675         :param allow_asterisk: allow asterisk character
4676         :param allow_ampersand: allow ampersand character
4677         """
4678         if allow_asterisk:
4679             self._allowable_chars |= self._asterisk
4680         if allow_ampersand:
4681             self._allowable_chars |= self._ampersand
4682         super(PrintableString, self).__init__(
4683             value, bounds, impl, expl, default, optional, _decoded, ctx,
4684         )
4685
4686     @property
4687     def allow_asterisk(self):
4688         """Is asterisk character allowed?
4689         """
4690         return self._asterisk <= self._allowable_chars
4691
4692     @property
4693     def allow_ampersand(self):
4694         """Is ampersand character allowed?
4695         """
4696         return self._ampersand <= self._allowable_chars
4697
4698     def _value_sanitize(self, value):
4699         value = super(PrintableString, self)._value_sanitize(value)
4700         if not frozenset(value) <= self._allowable_chars:
4701             raise DecodeError("non-printable value")
4702         return value
4703
4704     def __getstate__(self):
4705         return PrintableStringState(
4706             *super(PrintableString, self).__getstate__(),
4707             **{"allowable_chars": self._allowable_chars}
4708         )
4709
4710     def __setstate__(self, state):
4711         super(PrintableString, self).__setstate__(state)
4712         self._allowable_chars = state.allowable_chars
4713
4714     def __call__(
4715             self,
4716             value=None,
4717             bounds=None,
4718             impl=None,
4719             expl=None,
4720             default=None,
4721             optional=None,
4722     ):
4723         return self.__class__(
4724             value=value,
4725             bounds=(
4726                 (self._bound_min, self._bound_max)
4727                 if bounds is None else bounds
4728             ),
4729             impl=self.tag if impl is None else impl,
4730             expl=self._expl if expl is None else expl,
4731             default=self.default if default is None else default,
4732             optional=self.optional if optional is None else optional,
4733             allow_asterisk=self.allow_asterisk,
4734             allow_ampersand=self.allow_ampersand,
4735         )
4736
4737
4738 class TeletexString(CommonString):
4739     __slots__ = ()
4740     tag_default = tag_encode(20)
4741     encoding = "ascii"
4742     asn1_type_name = "TeletexString"
4743
4744
4745 class T61String(TeletexString):
4746     __slots__ = ()
4747     asn1_type_name = "T61String"
4748
4749
4750 class VideotexString(CommonString):
4751     __slots__ = ()
4752     tag_default = tag_encode(21)
4753     encoding = "iso-8859-1"
4754     asn1_type_name = "VideotexString"
4755
4756
4757 class IA5String(CommonString):
4758     __slots__ = ()
4759     tag_default = tag_encode(22)
4760     encoding = "ascii"
4761     asn1_type_name = "IA5"
4762
4763
4764 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
4765 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
4766 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
4767
4768
4769 class VisibleString(CommonString):
4770     __slots__ = ()
4771     tag_default = tag_encode(26)
4772     encoding = "ascii"
4773     asn1_type_name = "VisibleString"
4774
4775
4776 UTCTimeState = namedtuple(
4777     "UTCTimeState",
4778     OctetStringState._fields + ("ber_raw",),
4779     **NAMEDTUPLE_KWARGS
4780 )
4781
4782
4783 def str_to_time_fractions(value):
4784     v = pureint(value)
4785     year, v = (v // 10**10), (v % 10**10)
4786     month, v = (v // 10**8), (v % 10**8)
4787     day, v = (v // 10**6), (v % 10**6)
4788     hour, v = (v // 10**4), (v % 10**4)
4789     minute, second = (v // 100), (v % 100)
4790     return year, month, day, hour, minute, second
4791
4792
4793 class UTCTime(VisibleString):
4794     """``UTCTime`` datetime type
4795
4796     >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
4797     UTCTime UTCTime 2017-09-30T22:07:50
4798     >>> str(t)
4799     '170930220750Z'
4800     >>> bytes(t)
4801     b'170930220750Z'
4802     >>> t.todatetime()
4803     datetime.datetime(2017, 9, 30, 22, 7, 50)
4804     >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
4805     datetime.datetime(1957, 9, 30, 22, 7, 50)
4806
4807     If BER encoded value was met, then ``ber_raw`` attribute will hold
4808     its raw representation.
4809
4810     .. warning::
4811
4812        Pay attention that UTCTime can not hold full year, so all years
4813        having < 50 years are treated as 20xx, 19xx otherwise, according
4814        to X.509 recommendation.
4815
4816     .. warning::
4817
4818        No strict validation of UTC offsets are made, but very crude:
4819
4820        * minutes are not exceeding 60
4821        * offset value is not exceeding 14 hours
4822     """
4823     __slots__ = ("ber_raw",)
4824     tag_default = tag_encode(23)
4825     encoding = "ascii"
4826     asn1_type_name = "UTCTime"
4827     evgen_mode_skip_value = False
4828
4829     def __init__(
4830             self,
4831             value=None,
4832             impl=None,
4833             expl=None,
4834             default=None,
4835             optional=False,
4836             _decoded=(0, 0, 0),
4837             bounds=None,  # dummy argument, workability for OctetString.decode
4838             ctx=None,
4839     ):
4840         """
4841         :param value: set the value. Either datetime type, or
4842                       :py:class:`pyderasn.UTCTime` object
4843         :param bytes impl: override default tag with ``IMPLICIT`` one
4844         :param bytes expl: override default tag with ``EXPLICIT`` one
4845         :param default: set default value. Type same as in ``value``
4846         :param bool optional: is object ``OPTIONAL`` in sequence
4847         """
4848         super(UTCTime, self).__init__(
4849             None, None, impl, expl, None, optional, _decoded, ctx,
4850         )
4851         self._value = value
4852         self.ber_raw = None
4853         if value is not None:
4854             self._value, self.ber_raw = self._value_sanitize(value, ctx)
4855             self.ber_encoded = self.ber_raw is not None
4856         if default is not None:
4857             default, _ = self._value_sanitize(default)
4858             self.default = self.__class__(
4859                 value=default,
4860                 impl=self.tag,
4861                 expl=self._expl,
4862             )
4863             if self._value is None:
4864                 self._value = default
4865             optional = True
4866         self.optional = optional
4867
4868     def _strptime_bered(self, value):
4869         year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
4870         value = value[10:]
4871         if len(value) == 0:
4872             raise ValueError("no timezone")
4873         year += 2000 if year < 50 else 1900
4874         decoded = datetime(year, month, day, hour, minute)
4875         offset = 0
4876         if value[-1] == "Z":
4877             value = value[:-1]
4878         else:
4879             if len(value) < 5:
4880                 raise ValueError("invalid UTC offset")
4881             if value[-5] == "-":
4882                 sign = -1
4883             elif value[-5] == "+":
4884                 sign = 1
4885             else:
4886                 raise ValueError("invalid UTC offset")
4887             v = pureint(value[-4:])
4888             offset, v = (60 * (v % 100)), v // 100
4889             if offset >= 3600:
4890                 raise ValueError("invalid UTC offset minutes")
4891             offset += 3600 * v
4892             if offset > 14 * 3600:
4893                 raise ValueError("too big UTC offset")
4894             offset *= sign
4895             value = value[:-5]
4896         if len(value) == 0:
4897             return offset, decoded
4898         if len(value) != 2:
4899             raise ValueError("invalid UTC offset seconds")
4900         seconds = pureint(value)
4901         if seconds >= 60:
4902             raise ValueError("invalid seconds value")
4903         return offset, decoded + timedelta(seconds=seconds)
4904
4905     def _strptime(self, value):
4906         # datetime.strptime's format: %y%m%d%H%M%SZ
4907         if len(value) != LEN_YYMMDDHHMMSSZ:
4908             raise ValueError("invalid UTCTime length")
4909         if value[-1] != "Z":
4910             raise ValueError("non UTC timezone")
4911         year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
4912         year += 2000 if year < 50 else 1900
4913         return datetime(year, month, day, hour, minute, second)
4914
4915     def _dt_sanitize(self, value):
4916         if value.year < 1950 or value.year > 2049:
4917             raise ValueError("UTCTime can hold only 1950-2049 years")
4918         return value.replace(microsecond=0)
4919
4920     def _value_sanitize(self, value, ctx=None):
4921         if value.__class__ == binary_type:
4922             try:
4923                 value_decoded = value.decode("ascii")
4924             except (UnicodeEncodeError, UnicodeDecodeError) as err:
4925                 raise DecodeError("invalid UTCTime encoding: %r" % err)
4926             err = None
4927             try:
4928                 return self._strptime(value_decoded), None
4929             except (TypeError, ValueError) as _err:
4930                 err = _err
4931                 if (ctx is not None) and ctx.get("bered", False):
4932                     try:
4933                         offset, _value = self._strptime_bered(value_decoded)
4934                         _value = _value - timedelta(seconds=offset)
4935                         return self._dt_sanitize(_value), value
4936                     except (TypeError, ValueError, OverflowError) as _err:
4937                         err = _err
4938             raise DecodeError(
4939                 "invalid %s format: %r" % (self.asn1_type_name, err),
4940                 klass=self.__class__,
4941             )
4942         if isinstance(value, self.__class__):
4943             return value._value, None
4944         if value.__class__ == datetime:
4945             return self._dt_sanitize(value), None
4946         raise InvalidValueType((self.__class__, datetime))
4947
4948     def _pp_value(self):
4949         if self.ready:
4950             value = self._value.isoformat()
4951             if self.ber_encoded:
4952                 value += " (%s)" % self.ber_raw
4953             return value
4954
4955     def __unicode__(self):
4956         if self.ready:
4957             value = self._value.isoformat()
4958             if self.ber_encoded:
4959                 value += " (%s)" % self.ber_raw
4960             return value
4961         return text_type(self._pp_value())
4962
4963     def __getstate__(self):
4964         return UTCTimeState(
4965             *super(UTCTime, self).__getstate__(),
4966             **{"ber_raw": self.ber_raw}
4967         )
4968
4969     def __setstate__(self, state):
4970         super(UTCTime, self).__setstate__(state)
4971         self.ber_raw = state.ber_raw
4972
4973     def __bytes__(self):
4974         self._assert_ready()
4975         return self._encode_time()
4976
4977     def __eq__(self, their):
4978         if their.__class__ == binary_type:
4979             return self._encode_time() == their
4980         if their.__class__ == datetime:
4981             return self.todatetime() == their
4982         if not isinstance(their, self.__class__):
4983             return False
4984         return (
4985             self._value == their._value and
4986             self.tag == their.tag and
4987             self._expl == their._expl
4988         )
4989
4990     def _encode_time(self):
4991         return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
4992
4993     def _encode(self):
4994         self._assert_ready()
4995         value = self._encode_time()
4996         return b"".join((self.tag, len_encode(len(value)), value))
4997
4998     def _encode_cer(self, writer):
4999         write_full(writer, self._encode())
5000
5001     def todatetime(self):
5002         return self._value
5003
5004     def __repr__(self):
5005         return pp_console_row(next(self.pps()))
5006
5007     def pps(self, decode_path=()):
5008         yield _pp(
5009             obj=self,
5010             asn1_type_name=self.asn1_type_name,
5011             obj_name=self.__class__.__name__,
5012             decode_path=decode_path,
5013             value=self._pp_value(),
5014             optional=self.optional,
5015             default=self == self.default,
5016             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5017             expl=None if self._expl is None else tag_decode(self._expl),
5018             offset=self.offset,
5019             tlen=self.tlen,
5020             llen=self.llen,
5021             vlen=self.vlen,
5022             expl_offset=self.expl_offset if self.expled else None,
5023             expl_tlen=self.expl_tlen if self.expled else None,
5024             expl_llen=self.expl_llen if self.expled else None,
5025             expl_vlen=self.expl_vlen if self.expled else None,
5026             expl_lenindef=self.expl_lenindef,
5027             ber_encoded=self.ber_encoded,
5028             bered=self.bered,
5029         )
5030         for pp in self.pps_lenindef(decode_path):
5031             yield pp
5032
5033
5034 class GeneralizedTime(UTCTime):
5035     """``GeneralizedTime`` datetime type
5036
5037     This type is similar to :py:class:`pyderasn.UTCTime`.
5038
5039     >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5040     GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5041     >>> str(t)
5042     '20170930220750.000123Z'
5043     >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5044     GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5045
5046     .. warning::
5047
5048        Only microsecond fractions are supported in DER encoding.
5049        :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5050        higher precision values.
5051
5052     .. warning::
5053
5054        BER encoded data can loss information (accuracy) during decoding
5055        because of float transformations.
5056
5057     .. warning::
5058
5059        Local times (without explicit timezone specification) are treated
5060        as UTC one, no transformations are made.
5061
5062     .. warning::
5063
5064        Zero year is unsupported.
5065     """
5066     __slots__ = ()
5067     tag_default = tag_encode(24)
5068     asn1_type_name = "GeneralizedTime"
5069
5070     def _dt_sanitize(self, value):
5071         return value
5072
5073     def _strptime_bered(self, value):
5074         if len(value) < 4 + 3 * 2:
5075             raise ValueError("invalid GeneralizedTime")
5076         year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5077         decoded = datetime(year, month, day, hour)
5078         offset, value = 0, value[10:]
5079         if len(value) == 0:
5080             return offset, decoded
5081         if value[-1] == "Z":
5082             value = value[:-1]
5083         else:
5084             for char, sign in (("-", -1), ("+", 1)):
5085                 idx = value.rfind(char)
5086                 if idx == -1:
5087                     continue
5088                 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5089                 v = pureint(offset_raw)
5090                 if len(offset_raw) == 4:
5091                     offset, v = (60 * (v % 100)), v // 100
5092                     if offset >= 3600:
5093                         raise ValueError("invalid UTC offset minutes")
5094                 elif len(offset_raw) == 2:
5095                     pass
5096                 else:
5097                     raise ValueError("invalid UTC offset")
5098                 offset += 3600 * v
5099                 if offset > 14 * 3600:
5100                     raise ValueError("too big UTC offset")
5101                 offset *= sign
5102                 break
5103         if len(value) == 0:
5104             return offset, decoded
5105         if value[0] in DECIMAL_SIGNS:
5106             return offset, (
5107                 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5108             )
5109         if len(value) < 2:
5110             raise ValueError("stripped minutes")
5111         decoded += timedelta(seconds=60 * pureint(value[:2]))
5112         value = value[2:]
5113         if len(value) == 0:
5114             return offset, decoded
5115         if value[0] in DECIMAL_SIGNS:
5116             return offset, (
5117                 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5118             )
5119         if len(value) < 2:
5120             raise ValueError("stripped seconds")
5121         decoded += timedelta(seconds=pureint(value[:2]))
5122         value = value[2:]
5123         if len(value) == 0:
5124             return offset, decoded
5125         if value[0] not in DECIMAL_SIGNS:
5126             raise ValueError("invalid format after seconds")
5127         return offset, (
5128             decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5129         )
5130
5131     def _strptime(self, value):
5132         l = len(value)
5133         if l == LEN_YYYYMMDDHHMMSSZ:
5134             # datetime.strptime's format: %Y%m%d%H%M%SZ
5135             if value[-1] != "Z":
5136                 raise ValueError("non UTC timezone")
5137             return datetime(*str_to_time_fractions(value[:-1]))
5138         if l >= LEN_YYYYMMDDHHMMSSDMZ:
5139             # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5140             if value[-1] != "Z":
5141                 raise ValueError("non UTC timezone")
5142             if value[14] != ".":
5143                 raise ValueError("no fractions separator")
5144             us = value[15:-1]
5145             if us[-1] == "0":
5146                 raise ValueError("trailing zero")
5147             us_len = len(us)
5148             if us_len > 6:
5149                 raise ValueError("only microsecond fractions are supported")
5150             us = pureint(us + ("0" * (6 - us_len)))
5151             year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5152             return datetime(year, month, day, hour, minute, second, us)
5153         raise ValueError("invalid GeneralizedTime length")
5154
5155     def _encode_time(self):
5156         value = self._value
5157         encoded = value.strftime("%Y%m%d%H%M%S")
5158         if value.microsecond > 0:
5159             encoded += (".%06d" % value.microsecond).rstrip("0")
5160         return (encoded + "Z").encode("ascii")
5161
5162
5163 class GraphicString(CommonString):
5164     __slots__ = ()
5165     tag_default = tag_encode(25)
5166     encoding = "iso-8859-1"
5167     asn1_type_name = "GraphicString"
5168
5169
5170 class ISO646String(VisibleString):
5171     __slots__ = ()
5172     asn1_type_name = "ISO646String"
5173
5174
5175 class GeneralString(CommonString):
5176     __slots__ = ()
5177     tag_default = tag_encode(27)
5178     encoding = "iso-8859-1"
5179     asn1_type_name = "GeneralString"
5180
5181
5182 class UniversalString(CommonString):
5183     __slots__ = ()
5184     tag_default = tag_encode(28)
5185     encoding = "utf-32-be"
5186     asn1_type_name = "UniversalString"
5187
5188
5189 class BMPString(CommonString):
5190     __slots__ = ()
5191     tag_default = tag_encode(30)
5192     encoding = "utf-16-be"
5193     asn1_type_name = "BMPString"
5194
5195
5196 ChoiceState = namedtuple(
5197     "ChoiceState",
5198     BasicState._fields + ("specs", "value",),
5199     **NAMEDTUPLE_KWARGS
5200 )
5201
5202
5203 class Choice(Obj):
5204     """``CHOICE`` special type
5205
5206     ::
5207
5208         class GeneralName(Choice):
5209             schema = (
5210                 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5211                 ("dNSName", IA5String(impl=tag_ctxp(2))),
5212             )
5213
5214     >>> gn = GeneralName()
5215     GeneralName CHOICE
5216     >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5217     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5218     >>> gn["dNSName"] = IA5String("bar.baz")
5219     GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5220     >>> gn["rfc822Name"]
5221     None
5222     >>> gn["dNSName"]
5223     [2] IA5String IA5 bar.baz
5224     >>> gn.choice
5225     'dNSName'
5226     >>> gn.value == gn["dNSName"]
5227     True
5228     >>> gn.specs
5229     OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5230
5231     >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5232     GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5233     """
5234     __slots__ = ("specs",)
5235     tag_default = None
5236     asn1_type_name = "CHOICE"
5237
5238     def __init__(
5239             self,
5240             value=None,
5241             schema=None,
5242             impl=None,
5243             expl=None,
5244             default=None,
5245             optional=False,
5246             _decoded=(0, 0, 0),
5247     ):
5248         """
5249         :param value: set the value. Either ``(choice, value)`` tuple, or
5250                       :py:class:`pyderasn.Choice` object
5251         :param bytes impl: can not be set, do **not** use it
5252         :param bytes expl: override default tag with ``EXPLICIT`` one
5253         :param default: set default value. Type same as in ``value``
5254         :param bool optional: is object ``OPTIONAL`` in sequence
5255         """
5256         if impl is not None:
5257             raise ValueError("no implicit tag allowed for CHOICE")
5258         super(Choice, self).__init__(None, expl, default, optional, _decoded)
5259         if schema is None:
5260             schema = getattr(self, "schema", ())
5261         if len(schema) == 0:
5262             raise ValueError("schema must be specified")
5263         self.specs = (
5264             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5265         )
5266         self._value = None
5267         if value is not None:
5268             self._value = self._value_sanitize(value)
5269         if default is not None:
5270             default_value = self._value_sanitize(default)
5271             default_obj = self.__class__(impl=self.tag, expl=self._expl)
5272             default_obj.specs = self.specs
5273             default_obj._value = default_value
5274             self.default = default_obj
5275             if value is None:
5276                 self._value = copy(default_obj._value)
5277         if self._expl is not None:
5278             tag_class, _, tag_num = tag_decode(self._expl)
5279             self._tag_order = (tag_class, tag_num)
5280
5281     def _value_sanitize(self, value):
5282         if (value.__class__ == tuple) and len(value) == 2:
5283             choice, obj = value
5284             spec = self.specs.get(choice)
5285             if spec is None:
5286                 raise ObjUnknown(choice)
5287             if not isinstance(obj, spec.__class__):
5288                 raise InvalidValueType((spec,))
5289             return (choice, spec(obj))
5290         if isinstance(value, self.__class__):
5291             return value._value
5292         raise InvalidValueType((self.__class__, tuple))
5293
5294     @property
5295     def ready(self):
5296         return self._value is not None and self._value[1].ready
5297
5298     @property
5299     def bered(self):
5300         return self.expl_lenindef or (
5301             (self._value is not None) and
5302             self._value[1].bered
5303         )
5304
5305     def __getstate__(self):
5306         return ChoiceState(
5307             __version__,
5308             self.tag,
5309             self._tag_order,
5310             self._expl,
5311             self.default,
5312             self.optional,
5313             self.offset,
5314             self.llen,
5315             self.vlen,
5316             self.expl_lenindef,
5317             self.lenindef,
5318             self.ber_encoded,
5319             self.specs,
5320             copy(self._value),
5321         )
5322
5323     def __setstate__(self, state):
5324         super(Choice, self).__setstate__(state)
5325         self.specs = state.specs
5326         self._value = state.value
5327
5328     def __eq__(self, their):
5329         if (their.__class__ == tuple) and len(their) == 2:
5330             return self._value == their
5331         if not isinstance(their, self.__class__):
5332             return False
5333         return (
5334             self.specs == their.specs and
5335             self._value == their._value
5336         )
5337
5338     def __call__(
5339             self,
5340             value=None,
5341             expl=None,
5342             default=None,
5343             optional=None,
5344     ):
5345         return self.__class__(
5346             value=value,
5347             schema=self.specs,
5348             expl=self._expl if expl is None else expl,
5349             default=self.default if default is None else default,
5350             optional=self.optional if optional is None else optional,
5351         )
5352
5353     @property
5354     def choice(self):
5355         """Name of the choice
5356         """
5357         self._assert_ready()
5358         return self._value[0]
5359
5360     @property
5361     def value(self):
5362         """Value of underlying choice
5363         """
5364         self._assert_ready()
5365         return self._value[1]
5366
5367     @property
5368     def tag_order(self):
5369         self._assert_ready()
5370         return self._value[1].tag_order if self._tag_order is None else self._tag_order
5371
5372     @property
5373     def tag_order_cer(self):
5374         return min(v.tag_order_cer for v in itervalues(self.specs))
5375
5376     def __getitem__(self, key):
5377         if key not in self.specs:
5378             raise ObjUnknown(key)
5379         if self._value is None:
5380             return None
5381         choice, value = self._value
5382         if choice != key:
5383             return None
5384         return value
5385
5386     def __setitem__(self, key, value):
5387         spec = self.specs.get(key)
5388         if spec is None:
5389             raise ObjUnknown(key)
5390         if not isinstance(value, spec.__class__):
5391             raise InvalidValueType((spec.__class__,))
5392         self._value = (key, spec(value))
5393
5394     @property
5395     def tlen(self):
5396         return 0
5397
5398     @property
5399     def decoded(self):
5400         return self._value[1].decoded if self.ready else False
5401
5402     def _encode(self):
5403         self._assert_ready()
5404         return self._value[1].encode()
5405
5406     def _encode_cer(self, writer):
5407         self._assert_ready()
5408         self._value[1].encode_cer(writer)
5409
5410     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5411         for choice, spec in iteritems(self.specs):
5412             sub_decode_path = decode_path + (choice,)
5413             try:
5414                 spec.decode(
5415                     tlv,
5416                     offset=offset,
5417                     leavemm=True,
5418                     decode_path=sub_decode_path,
5419                     ctx=ctx,
5420                     tag_only=True,
5421                     _ctx_immutable=False,
5422                 )
5423             except TagMismatch:
5424                 continue
5425             break
5426         else:
5427             raise TagMismatch(
5428                 klass=self.__class__,
5429                 decode_path=decode_path,
5430                 offset=offset,
5431             )
5432         if tag_only:  # pragma: no cover
5433             yield None
5434             return
5435         if evgen_mode:
5436             for _decode_path, value, tail in spec.decode_evgen(
5437                     tlv,
5438                     offset=offset,
5439                     leavemm=True,
5440                     decode_path=sub_decode_path,
5441                     ctx=ctx,
5442                     _ctx_immutable=False,
5443             ):
5444                 yield _decode_path, value, tail
5445         else:
5446             _, value, tail = next(spec.decode_evgen(
5447                 tlv,
5448                 offset=offset,
5449                 leavemm=True,
5450                 decode_path=sub_decode_path,
5451                 ctx=ctx,
5452                 _ctx_immutable=False,
5453                 _evgen_mode=False,
5454             ))
5455         obj = self.__class__(
5456             schema=self.specs,
5457             expl=self._expl,
5458             default=self.default,
5459             optional=self.optional,
5460             _decoded=(offset, 0, value.fulllen),
5461         )
5462         obj._value = (choice, value)
5463         yield decode_path, obj, tail
5464
5465     def __repr__(self):
5466         value = pp_console_row(next(self.pps()))
5467         if self.ready:
5468             value = "%s[%r]" % (value, self.value)
5469         return value
5470
5471     def pps(self, decode_path=()):
5472         yield _pp(
5473             obj=self,
5474             asn1_type_name=self.asn1_type_name,
5475             obj_name=self.__class__.__name__,
5476             decode_path=decode_path,
5477             value=self.choice if self.ready else None,
5478             optional=self.optional,
5479             default=self == self.default,
5480             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5481             expl=None if self._expl is None else tag_decode(self._expl),
5482             offset=self.offset,
5483             tlen=self.tlen,
5484             llen=self.llen,
5485             vlen=self.vlen,
5486             expl_lenindef=self.expl_lenindef,
5487             bered=self.bered,
5488         )
5489         if self.ready:
5490             yield self.value.pps(decode_path=decode_path + (self.choice,))
5491         for pp in self.pps_lenindef(decode_path):
5492             yield pp
5493
5494
5495 class PrimitiveTypes(Choice):
5496     """Predefined ``CHOICE`` for all generic primitive types
5497
5498     It could be useful for general decoding of some unspecified values:
5499
5500     >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5501     OCTET STRING 3 bytes 666f6f
5502     >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5503     INTEGER 1193046
5504     """
5505     __slots__ = ()
5506     schema = tuple((klass.__name__, klass()) for klass in (
5507         Boolean,
5508         Integer,
5509         BitString,
5510         OctetString,
5511         Null,
5512         ObjectIdentifier,
5513         UTF8String,
5514         NumericString,
5515         PrintableString,
5516         TeletexString,
5517         VideotexString,
5518         IA5String,
5519         UTCTime,
5520         GeneralizedTime,
5521         GraphicString,
5522         VisibleString,
5523         ISO646String,
5524         GeneralString,
5525         UniversalString,
5526         BMPString,
5527     ))
5528
5529
5530 AnyState = namedtuple(
5531     "AnyState",
5532     BasicState._fields + ("value", "defined"),
5533     **NAMEDTUPLE_KWARGS
5534 )
5535
5536
5537 class Any(Obj):
5538     """``ANY`` special type
5539
5540     >>> Any(Integer(-123))
5541     ANY INTEGER -123 (0X:7B)
5542     >>> a = Any(OctetString(b"hello world").encode())
5543     ANY 040b68656c6c6f20776f726c64
5544     >>> hexenc(bytes(a))
5545     b'0x040x0bhello world'
5546     """
5547     __slots__ = ("defined",)
5548     tag_default = tag_encode(0)
5549     asn1_type_name = "ANY"
5550
5551     def __init__(
5552             self,
5553             value=None,
5554             expl=None,
5555             optional=False,
5556             _decoded=(0, 0, 0),
5557     ):
5558         """
5559         :param value: set the value. Either any kind of pyderasn's
5560                       **ready** object, or bytes. Pay attention that
5561                       **no** validation is performed if raw binary value
5562                       is valid TLV, except just tag decoding
5563         :param bytes expl: override default tag with ``EXPLICIT`` one
5564         :param bool optional: is object ``OPTIONAL`` in sequence
5565         """
5566         super(Any, self).__init__(None, expl, None, optional, _decoded)
5567         if value is None:
5568             self._value = None
5569         else:
5570             value = self._value_sanitize(value)
5571             self._value = value
5572             if self._expl is None:
5573                 if value.__class__ == binary_type:
5574                     tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5575                 else:
5576                     tag_class, tag_num = value.tag_order
5577             else:
5578                 tag_class, _, tag_num = tag_decode(self._expl)
5579             self._tag_order = (tag_class, tag_num)
5580         self.defined = None
5581
5582     def _value_sanitize(self, value):
5583         if value.__class__ == binary_type:
5584             if len(value) == 0:
5585                 raise ValueError("Any value can not be empty")
5586             return value
5587         if isinstance(value, self.__class__):
5588             return value._value
5589         if not isinstance(value, Obj):
5590             raise InvalidValueType((self.__class__, Obj, binary_type))
5591         return value
5592
5593     @property
5594     def ready(self):
5595         return self._value is not None
5596
5597     @property
5598     def tag_order(self):
5599         self._assert_ready()
5600         return self._tag_order
5601
5602     @property
5603     def bered(self):
5604         if self.expl_lenindef or self.lenindef:
5605             return True
5606         if self.defined is None:
5607             return False
5608         return self.defined[1].bered
5609
5610     def __getstate__(self):
5611         return AnyState(
5612             __version__,
5613             self.tag,
5614             self._tag_order,
5615             self._expl,
5616             None,
5617             self.optional,
5618             self.offset,
5619             self.llen,
5620             self.vlen,
5621             self.expl_lenindef,
5622             self.lenindef,
5623             self.ber_encoded,
5624             self._value,
5625             self.defined,
5626         )
5627
5628     def __setstate__(self, state):
5629         super(Any, self).__setstate__(state)
5630         self._value = state.value
5631         self.defined = state.defined
5632
5633     def __eq__(self, their):
5634         if their.__class__ == binary_type:
5635             if self._value.__class__ == binary_type:
5636                 return self._value == their
5637             return self._value.encode() == their
5638         if issubclass(their.__class__, Any):
5639             if self.ready and their.ready:
5640                 return bytes(self) == bytes(their)
5641             return self.ready == their.ready
5642         return False
5643
5644     def __call__(
5645             self,
5646             value=None,
5647             expl=None,
5648             optional=None,
5649     ):
5650         return self.__class__(
5651             value=value,
5652             expl=self._expl if expl is None else expl,
5653             optional=self.optional if optional is None else optional,
5654         )
5655
5656     def __bytes__(self):
5657         self._assert_ready()
5658         value = self._value
5659         if value.__class__ == binary_type:
5660             return value
5661         return self._value.encode()
5662
5663     @property
5664     def tlen(self):
5665         return 0
5666
5667     def _encode(self):
5668         self._assert_ready()
5669         value = self._value
5670         if value.__class__ == binary_type:
5671             return value
5672         return value.encode()
5673
5674     def _encode_cer(self, writer):
5675         self._assert_ready()
5676         value = self._value
5677         if value.__class__ == binary_type:
5678             write_full(writer, value)
5679         else:
5680             value.encode_cer(writer)
5681
5682     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5683         try:
5684             t, tlen, lv = tag_strip(tlv)
5685         except DecodeError as err:
5686             raise err.__class__(
5687                 msg=err.msg,
5688                 klass=self.__class__,
5689                 decode_path=decode_path,
5690                 offset=offset,
5691             )
5692         try:
5693             l, llen, v = len_decode(lv)
5694         except LenIndefForm as err:
5695             if not ctx.get("bered", False):
5696                 raise err.__class__(
5697                     msg=err.msg,
5698                     klass=self.__class__,
5699                     decode_path=decode_path,
5700                     offset=offset,
5701                 )
5702             llen, vlen, v = 1, 0, lv[1:]
5703             sub_offset = offset + tlen + llen
5704             chunk_i = 0
5705             while v[:EOC_LEN].tobytes() != EOC:
5706                 chunk, v = Any().decode(
5707                     v,
5708                     offset=sub_offset,
5709                     decode_path=decode_path + (str(chunk_i),),
5710                     leavemm=True,
5711                     ctx=ctx,
5712                     _ctx_immutable=False,
5713                 )
5714                 vlen += chunk.tlvlen
5715                 sub_offset += chunk.tlvlen
5716                 chunk_i += 1
5717             tlvlen = tlen + llen + vlen + EOC_LEN
5718             obj = self.__class__(
5719                 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
5720                 expl=self._expl,
5721                 optional=self.optional,
5722                 _decoded=(offset, 0, tlvlen),
5723             )
5724             obj.lenindef = True
5725             obj.tag = t.tobytes()
5726             yield decode_path, obj, v[EOC_LEN:]
5727             return
5728         except DecodeError as err:
5729             raise err.__class__(
5730                 msg=err.msg,
5731                 klass=self.__class__,
5732                 decode_path=decode_path,
5733                 offset=offset,
5734             )
5735         if l > len(v):
5736             raise NotEnoughData(
5737                 "encoded length is longer than data",
5738                 klass=self.__class__,
5739                 decode_path=decode_path,
5740                 offset=offset,
5741             )
5742         tlvlen = tlen + llen + l
5743         v, tail = tlv[:tlvlen], v[l:]
5744         obj = self.__class__(
5745             value=None if evgen_mode else v.tobytes(),
5746             expl=self._expl,
5747             optional=self.optional,
5748             _decoded=(offset, 0, tlvlen),
5749         )
5750         obj.tag = t.tobytes()
5751         yield decode_path, obj, tail
5752
5753     def __repr__(self):
5754         return pp_console_row(next(self.pps()))
5755
5756     def pps(self, decode_path=()):
5757         value = self._value
5758         if value is None:
5759             pass
5760         elif value.__class__ == binary_type:
5761             value = None
5762         else:
5763             value = repr(value)
5764         yield _pp(
5765             obj=self,
5766             asn1_type_name=self.asn1_type_name,
5767             obj_name=self.__class__.__name__,
5768             decode_path=decode_path,
5769             value=value,
5770             blob=self._value if self._value.__class__ == binary_type else None,
5771             optional=self.optional,
5772             default=self == self.default,
5773             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5774             expl=None if self._expl is None else tag_decode(self._expl),
5775             offset=self.offset,
5776             tlen=self.tlen,
5777             llen=self.llen,
5778             vlen=self.vlen,
5779             expl_offset=self.expl_offset if self.expled else None,
5780             expl_tlen=self.expl_tlen if self.expled else None,
5781             expl_llen=self.expl_llen if self.expled else None,
5782             expl_vlen=self.expl_vlen if self.expled else None,
5783             expl_lenindef=self.expl_lenindef,
5784             lenindef=self.lenindef,
5785             bered=self.bered,
5786         )
5787         defined_by, defined = self.defined or (None, None)
5788         if defined_by is not None:
5789             yield defined.pps(
5790                 decode_path=decode_path + (DecodePathDefBy(defined_by),)
5791             )
5792         for pp in self.pps_lenindef(decode_path):
5793             yield pp
5794
5795
5796 ########################################################################
5797 # ASN.1 constructed types
5798 ########################################################################
5799
5800 def get_def_by_path(defines_by_path, sub_decode_path):
5801     """Get define by decode path
5802     """
5803     for path, define in defines_by_path:
5804         if len(path) != len(sub_decode_path):
5805             continue
5806         for p1, p2 in zip(path, sub_decode_path):
5807             if (not p1 is any) and (p1 != p2):
5808                 break
5809         else:
5810             return define
5811
5812
5813 def abs_decode_path(decode_path, rel_path):
5814     """Create an absolute decode path from current and relative ones
5815
5816     :param decode_path: current decode path, starting point. Tuple of strings
5817     :param rel_path: relative path to ``decode_path``. Tuple of strings.
5818                      If first tuple's element is "/", then treat it as
5819                      an absolute path, ignoring ``decode_path`` as
5820                      starting point. Also this tuple can contain ".."
5821                      elements, stripping the leading element from
5822                      ``decode_path``
5823
5824     >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
5825     ("foo", "bar", "baz", "whatever")
5826     >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
5827     ("foo", "whatever")
5828     >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
5829     ("baz", "whatever")
5830     """
5831     if rel_path[0] == "/":
5832         return rel_path[1:]
5833     if rel_path[0] == "..":
5834         return abs_decode_path(decode_path[:-1], rel_path[1:])
5835     return decode_path + rel_path
5836
5837
5838 SequenceState = namedtuple(
5839     "SequenceState",
5840     BasicState._fields + ("specs", "value",),
5841     **NAMEDTUPLE_KWARGS
5842 )
5843
5844
5845 class Sequence(Obj):
5846     """``SEQUENCE`` structure type
5847
5848     You have to make specification of sequence::
5849
5850         class Extension(Sequence):
5851             schema = (
5852                 ("extnID", ObjectIdentifier()),
5853                 ("critical", Boolean(default=False)),
5854                 ("extnValue", OctetString()),
5855             )
5856
5857     Then, you can work with it as with dictionary.
5858
5859     >>> ext = Extension()
5860     >>> Extension().specs
5861     OrderedDict([
5862         ('extnID', OBJECT IDENTIFIER),
5863         ('critical', BOOLEAN False OPTIONAL DEFAULT),
5864         ('extnValue', OCTET STRING),
5865     ])
5866     >>> ext["extnID"] = "1.2.3"
5867     Traceback (most recent call last):
5868     pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
5869     >>> ext["extnID"] = ObjectIdentifier("1.2.3")
5870
5871     You can determine if sequence is ready to be encoded:
5872
5873     >>> ext.ready
5874     False
5875     >>> ext.encode()
5876     Traceback (most recent call last):
5877     pyderasn.ObjNotReady: object is not ready: extnValue
5878     >>> ext["extnValue"] = OctetString(b"foobar")
5879     >>> ext.ready
5880     True
5881
5882     Value you want to assign, must have the same **type** as in
5883     corresponding specification, but it can have different tags,
5884     optional/default attributes -- they will be taken from specification
5885     automatically::
5886
5887         class TBSCertificate(Sequence):
5888             schema = (
5889                 ("version", Version(expl=tag_ctxc(0), default="v1")),
5890             [...]
5891
5892     >>> tbs = TBSCertificate()
5893     >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
5894
5895     Assign ``None`` to remove value from sequence.
5896
5897     You can set values in Sequence during its initialization:
5898
5899     >>> AlgorithmIdentifier((
5900         ("algorithm", ObjectIdentifier("1.2.3")),
5901         ("parameters", Any(Null()))
5902     ))
5903     AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
5904
5905     You can determine if value exists/set in the sequence and take its value:
5906
5907     >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
5908     (True, True, False)
5909     >>> ext["extnID"]
5910     OBJECT IDENTIFIER 1.2.3
5911
5912     But pay attention that if value has default, then it won't be (not
5913     in) in the sequence (because ``DEFAULT`` must not be encoded in
5914     DER), but you can read its value:
5915
5916     >>> "critical" in ext, ext["critical"]
5917     (False, BOOLEAN False)
5918     >>> ext["critical"] = Boolean(True)
5919     >>> "critical" in ext, ext["critical"]
5920     (True, BOOLEAN True)
5921
5922     All defaulted values are always optional.
5923
5924     .. _allow_default_values_ctx:
5925
5926     DER prohibits default value encoding and will raise an error if
5927     default value is unexpectedly met during decode.
5928     If :ref:`bered <bered_ctx>` context option is set, then no error
5929     will be raised, but ``bered`` attribute set. You can disable strict
5930     defaulted values existence validation by setting
5931     ``"allow_default_values": True`` :ref:`context <ctx>` option.
5932
5933     .. warning::
5934
5935        Check for default value existence is not performed in
5936        ``evgen_mode``, because previously decoded values are not stored
5937        in memory, to be able to compare them.
5938
5939     Two sequences are equal if they have equal specification (schema),
5940     implicit/explicit tagging and the same values.
5941     """
5942     __slots__ = ("specs",)
5943     tag_default = tag_encode(form=TagFormConstructed, num=16)
5944     asn1_type_name = "SEQUENCE"
5945
5946     def __init__(
5947             self,
5948             value=None,
5949             schema=None,
5950             impl=None,
5951             expl=None,
5952             default=None,
5953             optional=False,
5954             _decoded=(0, 0, 0),
5955     ):
5956         super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
5957         if schema is None:
5958             schema = getattr(self, "schema", ())
5959         self.specs = (
5960             schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5961         )
5962         self._value = {}
5963         if value is not None:
5964             if issubclass(value.__class__, Sequence):
5965                 self._value = value._value
5966             elif hasattr(value, "__iter__"):
5967                 for seq_key, seq_value in value:
5968                     self[seq_key] = seq_value
5969             else:
5970                 raise InvalidValueType((Sequence,))
5971         if default is not None:
5972             if not issubclass(default.__class__, Sequence):
5973                 raise InvalidValueType((Sequence,))
5974             default_value = default._value
5975             default_obj = self.__class__(impl=self.tag, expl=self._expl)
5976             default_obj.specs = self.specs
5977             default_obj._value = default_value
5978             self.default = default_obj
5979             if value is None:
5980                 self._value = copy(default_obj._value)
5981
5982     @property
5983     def ready(self):
5984         for name, spec in iteritems(self.specs):
5985             value = self._value.get(name)
5986             if value is None:
5987                 if spec.optional:
5988                     continue
5989                 return False
5990             if not value.ready:
5991                 return False
5992         return True
5993
5994     @property
5995     def bered(self):
5996         if self.expl_lenindef or self.lenindef or self.ber_encoded:
5997             return True
5998         return any(value.bered for value in itervalues(self._value))
5999
6000     def __getstate__(self):
6001         return SequenceState(
6002             __version__,
6003             self.tag,
6004             self._tag_order,
6005             self._expl,
6006             self.default,
6007             self.optional,
6008             self.offset,
6009             self.llen,
6010             self.vlen,
6011             self.expl_lenindef,
6012             self.lenindef,
6013             self.ber_encoded,
6014             self.specs,
6015             {k: copy(v) for k, v in iteritems(self._value)},
6016         )
6017
6018     def __setstate__(self, state):
6019         super(Sequence, self).__setstate__(state)
6020         self.specs = state.specs
6021         self._value = state.value
6022
6023     def __eq__(self, their):
6024         if not isinstance(their, self.__class__):
6025             return False
6026         return (
6027             self.specs == their.specs and
6028             self.tag == their.tag and
6029             self._expl == their._expl and
6030             self._value == their._value
6031         )
6032
6033     def __call__(
6034             self,
6035             value=None,
6036             impl=None,
6037             expl=None,
6038             default=None,
6039             optional=None,
6040     ):
6041         return self.__class__(
6042             value=value,
6043             schema=self.specs,
6044             impl=self.tag if impl is None else impl,
6045             expl=self._expl if expl is None else expl,
6046             default=self.default if default is None else default,
6047             optional=self.optional if optional is None else optional,
6048         )
6049
6050     def __contains__(self, key):
6051         return key in self._value
6052
6053     def __setitem__(self, key, value):
6054         spec = self.specs.get(key)
6055         if spec is None:
6056             raise ObjUnknown(key)
6057         if value is None:
6058             self._value.pop(key, None)
6059             return
6060         if not isinstance(value, spec.__class__):
6061             raise InvalidValueType((spec.__class__,))
6062         value = spec(value=value)
6063         if spec.default is not None and value == spec.default:
6064             self._value.pop(key, None)
6065             return
6066         self._value[key] = value
6067
6068     def __getitem__(self, key):
6069         value = self._value.get(key)
6070         if value is not None:
6071             return value
6072         spec = self.specs.get(key)
6073         if spec is None:
6074             raise ObjUnknown(key)
6075         if spec.default is not None:
6076             return spec.default
6077         return None
6078
6079     def _values_for_encoding(self):
6080         for name, spec in iteritems(self.specs):
6081             value = self._value.get(name)
6082             if value is None:
6083                 if spec.optional:
6084                     continue
6085                 raise ObjNotReady(name)
6086             yield value
6087
6088     def _encode(self):
6089         v = b"".join(v.encode() for v in self._values_for_encoding())
6090         return b"".join((self.tag, len_encode(len(v)), v))
6091
6092     def _encode_cer(self, writer):
6093         write_full(writer, self.tag + LENINDEF)
6094         for v in self._values_for_encoding():
6095             v.encode_cer(writer)
6096         write_full(writer, EOC)
6097
6098     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6099         try:
6100             t, tlen, lv = tag_strip(tlv)
6101         except DecodeError as err:
6102             raise err.__class__(
6103                 msg=err.msg,
6104                 klass=self.__class__,
6105                 decode_path=decode_path,
6106                 offset=offset,
6107             )
6108         if t != self.tag:
6109             raise TagMismatch(
6110                 klass=self.__class__,
6111                 decode_path=decode_path,
6112                 offset=offset,
6113             )
6114         if tag_only:  # pragma: no cover
6115             yield None
6116             return
6117         lenindef = False
6118         ctx_bered = ctx.get("bered", False)
6119         try:
6120             l, llen, v = len_decode(lv)
6121         except LenIndefForm as err:
6122             if not ctx_bered:
6123                 raise err.__class__(
6124                     msg=err.msg,
6125                     klass=self.__class__,
6126                     decode_path=decode_path,
6127                     offset=offset,
6128                 )
6129             l, llen, v = 0, 1, lv[1:]
6130             lenindef = True
6131         except DecodeError as err:
6132             raise err.__class__(
6133                 msg=err.msg,
6134                 klass=self.__class__,
6135                 decode_path=decode_path,
6136                 offset=offset,
6137             )
6138         if l > len(v):
6139             raise NotEnoughData(
6140                 "encoded length is longer than data",
6141                 klass=self.__class__,
6142                 decode_path=decode_path,
6143                 offset=offset,
6144             )
6145         if not lenindef:
6146             v, tail = v[:l], v[l:]
6147         vlen = 0
6148         sub_offset = offset + tlen + llen
6149         values = {}
6150         ber_encoded = False
6151         ctx_allow_default_values = ctx.get("allow_default_values", False)
6152         for name, spec in iteritems(self.specs):
6153             if spec.optional and (
6154                     (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6155                     len(v) == 0
6156             ):
6157                 continue
6158             sub_decode_path = decode_path + (name,)
6159             try:
6160                 if evgen_mode:
6161                     for _decode_path, value, v_tail in spec.decode_evgen(
6162                             v,
6163                             sub_offset,
6164                             leavemm=True,
6165                             decode_path=sub_decode_path,
6166                             ctx=ctx,
6167                             _ctx_immutable=False,
6168                     ):
6169                         yield _decode_path, value, v_tail
6170                 else:
6171                     _, value, v_tail = next(spec.decode_evgen(
6172                         v,
6173                         sub_offset,
6174                         leavemm=True,
6175                         decode_path=sub_decode_path,
6176                         ctx=ctx,
6177                         _ctx_immutable=False,
6178                         _evgen_mode=False,
6179                     ))
6180             except TagMismatch as err:
6181                 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6182                     continue
6183                 raise
6184
6185             defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6186             if not evgen_mode and defined is not None:
6187                 defined_by, defined_spec = defined
6188                 if issubclass(value.__class__, SequenceOf):
6189                     for i, _value in enumerate(value):
6190                         sub_sub_decode_path = sub_decode_path + (
6191                             str(i),
6192                             DecodePathDefBy(defined_by),
6193                         )
6194                         defined_value, defined_tail = defined_spec.decode(
6195                             memoryview(bytes(_value)),
6196                             sub_offset + (
6197                                 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6198                                 if value.expled else (value.tlen + value.llen)
6199                             ),
6200                             leavemm=True,
6201                             decode_path=sub_sub_decode_path,
6202                             ctx=ctx,
6203                             _ctx_immutable=False,
6204                         )
6205                         if len(defined_tail) > 0:
6206                             raise DecodeError(
6207                                 "remaining data",
6208                                 klass=self.__class__,
6209                                 decode_path=sub_sub_decode_path,
6210                                 offset=offset,
6211                             )
6212                         _value.defined = (defined_by, defined_value)
6213                 else:
6214                     defined_value, defined_tail = defined_spec.decode(
6215                         memoryview(bytes(value)),
6216                         sub_offset + (
6217                             (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6218                             if value.expled else (value.tlen + value.llen)
6219                         ),
6220                         leavemm=True,
6221                         decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6222                         ctx=ctx,
6223                         _ctx_immutable=False,
6224                     )
6225                     if len(defined_tail) > 0:
6226                         raise DecodeError(
6227                             "remaining data",
6228                             klass=self.__class__,
6229                             decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6230                             offset=offset,
6231                         )
6232                     value.defined = (defined_by, defined_value)
6233
6234             value_len = value.fulllen
6235             vlen += value_len
6236             sub_offset += value_len
6237             v = v_tail
6238             if not evgen_mode:
6239                 if spec.default is not None and value == spec.default:
6240                     # This will not work in evgen_mode
6241                     if ctx_bered or ctx_allow_default_values:
6242                         ber_encoded = True
6243                     else:
6244                         raise DecodeError(
6245                             "DEFAULT value met",
6246                             klass=self.__class__,
6247                             decode_path=sub_decode_path,
6248                             offset=sub_offset,
6249                         )
6250                 values[name] = value
6251                 spec_defines = getattr(spec, "defines", ())
6252                 if len(spec_defines) == 0:
6253                     defines_by_path = ctx.get("defines_by_path", ())
6254                     if len(defines_by_path) > 0:
6255                         spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6256                 if spec_defines is not None and len(spec_defines) > 0:
6257                     for rel_path, schema in spec_defines:
6258                         defined = schema.get(value, None)
6259                         if defined is not None:
6260                             ctx.setdefault("_defines", []).append((
6261                                 abs_decode_path(sub_decode_path[:-1], rel_path),
6262                                 (value, defined),
6263                             ))
6264         if lenindef:
6265             if v[:EOC_LEN].tobytes() != EOC:
6266                 raise DecodeError(
6267                     "no EOC",
6268                     klass=self.__class__,
6269                     decode_path=decode_path,
6270                     offset=offset,
6271                 )
6272             tail = v[EOC_LEN:]
6273             vlen += EOC_LEN
6274         elif len(v) > 0:
6275             raise DecodeError(
6276                 "remaining data",
6277                 klass=self.__class__,
6278                 decode_path=decode_path,
6279                 offset=offset,
6280             )
6281         obj = self.__class__(
6282             schema=self.specs,
6283             impl=self.tag,
6284             expl=self._expl,
6285             default=self.default,
6286             optional=self.optional,
6287             _decoded=(offset, llen, vlen),
6288         )
6289         obj._value = values
6290         obj.lenindef = lenindef
6291         obj.ber_encoded = ber_encoded
6292         yield decode_path, obj, tail
6293
6294     def __repr__(self):
6295         value = pp_console_row(next(self.pps()))
6296         cols = []
6297         for name in self.specs:
6298             _value = self._value.get(name)
6299             if _value is None:
6300                 continue
6301             cols.append("%s: %s" % (name, repr(_value)))
6302         return "%s[%s]" % (value, "; ".join(cols))
6303
6304     def pps(self, decode_path=()):
6305         yield _pp(
6306             obj=self,
6307             asn1_type_name=self.asn1_type_name,
6308             obj_name=self.__class__.__name__,
6309             decode_path=decode_path,
6310             optional=self.optional,
6311             default=self == self.default,
6312             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6313             expl=None if self._expl is None else tag_decode(self._expl),
6314             offset=self.offset,
6315             tlen=self.tlen,
6316             llen=self.llen,
6317             vlen=self.vlen,
6318             expl_offset=self.expl_offset if self.expled else None,
6319             expl_tlen=self.expl_tlen if self.expled else None,
6320             expl_llen=self.expl_llen if self.expled else None,
6321             expl_vlen=self.expl_vlen if self.expled else None,
6322             expl_lenindef=self.expl_lenindef,
6323             lenindef=self.lenindef,
6324             ber_encoded=self.ber_encoded,
6325             bered=self.bered,
6326         )
6327         for name in self.specs:
6328             value = self._value.get(name)
6329             if value is None:
6330                 continue
6331             yield value.pps(decode_path=decode_path + (name,))
6332         for pp in self.pps_lenindef(decode_path):
6333             yield pp
6334
6335
6336 class Set(Sequence):
6337     """``SET`` structure type
6338
6339     Its usage is identical to :py:class:`pyderasn.Sequence`.
6340
6341     .. _allow_unordered_set_ctx:
6342
6343     DER prohibits unordered values encoding and will raise an error
6344     during decode. If :ref:`bered <bered_ctx>` context option is set,
6345     then no error will occur. Also you can disable strict values
6346     ordering check by setting ``"allow_unordered_set": True``
6347     :ref:`context <ctx>` option.
6348     """
6349     __slots__ = ()
6350     tag_default = tag_encode(form=TagFormConstructed, num=17)
6351     asn1_type_name = "SET"
6352
6353     def _encode(self):
6354         v = b"".join(value.encode() for value in sorted(
6355             self._values_for_encoding(),
6356             key=attrgetter("tag_order"),
6357         ))
6358         return b"".join((self.tag, len_encode(len(v)), v))
6359
6360     def _encode_cer(self, writer):
6361         write_full(writer, self.tag + LENINDEF)
6362         for v in sorted(
6363                 self._values_for_encoding(),
6364                 key=attrgetter("tag_order_cer"),
6365         ):
6366             v.encode_cer(writer)
6367         write_full(writer, EOC)
6368
6369     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6370         try:
6371             t, tlen, lv = tag_strip(tlv)
6372         except DecodeError as err:
6373             raise err.__class__(
6374                 msg=err.msg,
6375                 klass=self.__class__,
6376                 decode_path=decode_path,
6377                 offset=offset,
6378             )
6379         if t != self.tag:
6380             raise TagMismatch(
6381                 klass=self.__class__,
6382                 decode_path=decode_path,
6383                 offset=offset,
6384             )
6385         if tag_only:
6386             yield None
6387             return
6388         lenindef = False
6389         ctx_bered = ctx.get("bered", False)
6390         try:
6391             l, llen, v = len_decode(lv)
6392         except LenIndefForm as err:
6393             if not ctx_bered:
6394                 raise err.__class__(
6395                     msg=err.msg,
6396                     klass=self.__class__,
6397                     decode_path=decode_path,
6398                     offset=offset,
6399                 )
6400             l, llen, v = 0, 1, lv[1:]
6401             lenindef = True
6402         except DecodeError as err:
6403             raise err.__class__(
6404                 msg=err.msg,
6405                 klass=self.__class__,
6406                 decode_path=decode_path,
6407                 offset=offset,
6408             )
6409         if l > len(v):
6410             raise NotEnoughData(
6411                 "encoded length is longer than data",
6412                 klass=self.__class__,
6413                 offset=offset,
6414             )
6415         if not lenindef:
6416             v, tail = v[:l], v[l:]
6417         vlen = 0
6418         sub_offset = offset + tlen + llen
6419         values = {}
6420         ber_encoded = False
6421         ctx_allow_default_values = ctx.get("allow_default_values", False)
6422         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6423         tag_order_prev = (0, 0)
6424         _specs_items = copy(self.specs)
6425
6426         while len(v) > 0:
6427             if lenindef and v[:EOC_LEN].tobytes() == EOC:
6428                 break
6429             for name, spec in iteritems(_specs_items):
6430                 sub_decode_path = decode_path + (name,)
6431                 try:
6432                     spec.decode(
6433                         v,
6434                         sub_offset,
6435                         leavemm=True,
6436                         decode_path=sub_decode_path,
6437                         ctx=ctx,
6438                         tag_only=True,
6439                         _ctx_immutable=False,
6440                     )
6441                 except TagMismatch:
6442                     continue
6443                 break
6444             else:
6445                 raise TagMismatch(
6446                     klass=self.__class__,
6447                     decode_path=decode_path,
6448                     offset=offset,
6449                 )
6450             if evgen_mode:
6451                 for _decode_path, value, v_tail in spec.decode_evgen(
6452                         v,
6453                         sub_offset,
6454                         leavemm=True,
6455                         decode_path=sub_decode_path,
6456                         ctx=ctx,
6457                         _ctx_immutable=False,
6458                 ):
6459                     yield _decode_path, value, v_tail
6460             else:
6461                 _, value, v_tail = next(spec.decode_evgen(
6462                     v,
6463                     sub_offset,
6464                     leavemm=True,
6465                     decode_path=sub_decode_path,
6466                     ctx=ctx,
6467                     _ctx_immutable=False,
6468                     _evgen_mode=False,
6469                 ))
6470             value_tag_order = value.tag_order
6471             value_len = value.fulllen
6472             if tag_order_prev >= value_tag_order:
6473                 if ctx_bered or ctx_allow_unordered_set:
6474                     ber_encoded = True
6475                 else:
6476                     raise DecodeError(
6477                         "unordered " + self.asn1_type_name,
6478                         klass=self.__class__,
6479                         decode_path=sub_decode_path,
6480                         offset=sub_offset,
6481                     )
6482             if spec.default is None or value != spec.default:
6483                 pass
6484             elif ctx_bered or ctx_allow_default_values:
6485                 ber_encoded = True
6486             else:
6487                 raise DecodeError(
6488                     "DEFAULT value met",
6489                     klass=self.__class__,
6490                     decode_path=sub_decode_path,
6491                     offset=sub_offset,
6492                 )
6493             values[name] = value
6494             del _specs_items[name]
6495             tag_order_prev = value_tag_order
6496             sub_offset += value_len
6497             vlen += value_len
6498             v = v_tail
6499
6500         obj = self.__class__(
6501             schema=self.specs,
6502             impl=self.tag,
6503             expl=self._expl,
6504             default=self.default,
6505             optional=self.optional,
6506             _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6507         )
6508         if lenindef:
6509             if v[:EOC_LEN].tobytes() != EOC:
6510                 raise DecodeError(
6511                     "no EOC",
6512                     klass=self.__class__,
6513                     decode_path=decode_path,
6514                     offset=offset,
6515                 )
6516             tail = v[EOC_LEN:]
6517             obj.lenindef = True
6518         for name, spec in iteritems(self.specs):
6519             if name not in values and not spec.optional:
6520                 raise DecodeError(
6521                     "%s value is not ready" % name,
6522                     klass=self.__class__,
6523                     decode_path=decode_path,
6524                     offset=offset,
6525                 )
6526         if not evgen_mode:
6527             obj._value = values
6528         obj.ber_encoded = ber_encoded
6529         yield decode_path, obj, tail
6530
6531
6532 SequenceOfState = namedtuple(
6533     "SequenceOfState",
6534     BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6535     **NAMEDTUPLE_KWARGS
6536 )
6537
6538
6539 class SequenceOf(Obj):
6540     """``SEQUENCE OF`` sequence type
6541
6542     For that kind of type you must specify the object it will carry on
6543     (bounds are for example here, not required)::
6544
6545         class Ints(SequenceOf):
6546             schema = Integer()
6547             bounds = (0, 2)
6548
6549     >>> ints = Ints()
6550     >>> ints.append(Integer(123))
6551     >>> ints.append(Integer(234))
6552     >>> ints
6553     Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6554     >>> [int(i) for i in ints]
6555     [123, 234]
6556     >>> ints.append(Integer(345))
6557     Traceback (most recent call last):
6558     pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6559     >>> ints[1]
6560     INTEGER 234
6561     >>> ints[1] = Integer(345)
6562     >>> ints
6563     Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6564
6565     You can initialize sequence with preinitialized values:
6566
6567     >>> ints = Ints([Integer(123), Integer(234)])
6568
6569     Also you can use iterator as a value:
6570
6571     >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6572
6573     And it won't be iterated until encoding process. Pay attention that
6574     bounds and required schema checks are done only during the encoding
6575     process in that case! After encode was called, then value is zeroed
6576     back to empty list and you have to set it again. That mode is useful
6577     mainly with CER encoding mode, where all objects from the iterable
6578     will be streamed to the buffer, without copying all of them to
6579     memory first.
6580     """
6581     __slots__ = ("spec", "_bound_min", "_bound_max")
6582     tag_default = tag_encode(form=TagFormConstructed, num=16)
6583     asn1_type_name = "SEQUENCE OF"
6584
6585     def __init__(
6586             self,
6587             value=None,
6588             schema=None,
6589             bounds=None,
6590             impl=None,
6591             expl=None,
6592             default=None,
6593             optional=False,
6594             _decoded=(0, 0, 0),
6595     ):
6596         super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6597         if schema is None:
6598             schema = getattr(self, "schema", None)
6599         if schema is None:
6600             raise ValueError("schema must be specified")
6601         self.spec = schema
6602         self._bound_min, self._bound_max = getattr(
6603             self,
6604             "bounds",
6605             (0, float("+inf")),
6606         ) if bounds is None else bounds
6607         self._value = []
6608         if value is not None:
6609             self._value = self._value_sanitize(value)
6610         if default is not None:
6611             default_value = self._value_sanitize(default)
6612             default_obj = self.__class__(
6613                 schema=schema,
6614                 impl=self.tag,
6615                 expl=self._expl,
6616             )
6617             default_obj._value = default_value
6618             self.default = default_obj
6619             if value is None:
6620                 self._value = copy(default_obj._value)
6621
6622     def _value_sanitize(self, value):
6623         iterator = False
6624         if issubclass(value.__class__, SequenceOf):
6625             value = value._value
6626         elif hasattr(value, NEXT_ATTR_NAME):
6627             iterator = True
6628             value = value
6629         elif hasattr(value, "__iter__"):
6630             value = list(value)
6631         else:
6632             raise InvalidValueType((self.__class__, iter, "iterator"))
6633         if not iterator:
6634             if not self._bound_min <= len(value) <= self._bound_max:
6635                 raise BoundsError(self._bound_min, len(value), self._bound_max)
6636             class_expected = self.spec.__class__
6637             for v in value:
6638                 if not isinstance(v, class_expected):
6639                     raise InvalidValueType((class_expected,))
6640         return value
6641
6642     @property
6643     def ready(self):
6644         if hasattr(self._value, NEXT_ATTR_NAME):
6645             return True
6646         if self._bound_min > 0 and len(self._value) == 0:
6647             return False
6648         return all(v.ready for v in self._value)
6649
6650     @property
6651     def bered(self):
6652         if self.expl_lenindef or self.lenindef or self.ber_encoded:
6653             return True
6654         return any(v.bered for v in self._value)
6655
6656     def __getstate__(self):
6657         if hasattr(self._value, NEXT_ATTR_NAME):
6658             raise ValueError("can not pickle SequenceOf with iterator")
6659         return SequenceOfState(
6660             __version__,
6661             self.tag,
6662             self._tag_order,
6663             self._expl,
6664             self.default,
6665             self.optional,
6666             self.offset,
6667             self.llen,
6668             self.vlen,
6669             self.expl_lenindef,
6670             self.lenindef,
6671             self.ber_encoded,
6672             self.spec,
6673             [copy(v) for v in self._value],
6674             self._bound_min,
6675             self._bound_max,
6676         )
6677
6678     def __setstate__(self, state):
6679         super(SequenceOf, self).__setstate__(state)
6680         self.spec = state.spec
6681         self._value = state.value
6682         self._bound_min = state.bound_min
6683         self._bound_max = state.bound_max
6684
6685     def __eq__(self, their):
6686         if isinstance(their, self.__class__):
6687             return (
6688                 self.spec == their.spec and
6689                 self.tag == their.tag and
6690                 self._expl == their._expl and
6691                 self._value == their._value
6692             )
6693         if hasattr(their, "__iter__"):
6694             return self._value == list(their)
6695         return False
6696
6697     def __call__(
6698             self,
6699             value=None,
6700             bounds=None,
6701             impl=None,
6702             expl=None,
6703             default=None,
6704             optional=None,
6705     ):
6706         return self.__class__(
6707             value=value,
6708             schema=self.spec,
6709             bounds=(
6710                 (self._bound_min, self._bound_max)
6711                 if bounds is None else bounds
6712             ),
6713             impl=self.tag if impl is None else impl,
6714             expl=self._expl if expl is None else expl,
6715             default=self.default if default is None else default,
6716             optional=self.optional if optional is None else optional,
6717         )
6718
6719     def __contains__(self, key):
6720         return key in self._value
6721
6722     def append(self, value):
6723         if not isinstance(value, self.spec.__class__):
6724             raise InvalidValueType((self.spec.__class__,))
6725         if len(self._value) + 1 > self._bound_max:
6726             raise BoundsError(
6727                 self._bound_min,
6728                 len(self._value) + 1,
6729                 self._bound_max,
6730             )
6731         self._value.append(value)
6732
6733     def __iter__(self):
6734         return iter(self._value)
6735
6736     def __len__(self):
6737         return len(self._value)
6738
6739     def __setitem__(self, key, value):
6740         if not isinstance(value, self.spec.__class__):
6741             raise InvalidValueType((self.spec.__class__,))
6742         self._value[key] = self.spec(value=value)
6743
6744     def __getitem__(self, key):
6745         return self._value[key]
6746
6747     def _values_for_encoding(self):
6748         return iter(self._value)
6749
6750     def _encode(self):
6751         iterator = hasattr(self._value, NEXT_ATTR_NAME)
6752         if iterator:
6753             values = []
6754             values_append = values.append
6755             class_expected = self.spec.__class__
6756             values_for_encoding = self._values_for_encoding()
6757             self._value = []
6758             for v in values_for_encoding:
6759                 if not isinstance(v, class_expected):
6760                     raise InvalidValueType((class_expected,))
6761                 values_append(v.encode())
6762             if not self._bound_min <= len(values) <= self._bound_max:
6763                 raise BoundsError(self._bound_min, len(values), self._bound_max)
6764             value = b"".join(values)
6765         else:
6766             value = b"".join(v.encode() for v in self._values_for_encoding())
6767         return b"".join((self.tag, len_encode(len(value)), value))
6768
6769     def _encode_cer(self, writer):
6770         write_full(writer, self.tag + LENINDEF)
6771         iterator = hasattr(self._value, NEXT_ATTR_NAME)
6772         if iterator:
6773             class_expected = self.spec.__class__
6774             values_count = 0
6775             values_for_encoding = self._values_for_encoding()
6776             self._value = []
6777             for v in values_for_encoding:
6778                 if not isinstance(v, class_expected):
6779                     raise InvalidValueType((class_expected,))
6780                 v.encode_cer(writer)
6781                 values_count += 1
6782             if not self._bound_min <= values_count <= self._bound_max:
6783                 raise BoundsError(self._bound_min, values_count, self._bound_max)
6784         else:
6785             for v in self._values_for_encoding():
6786                 v.encode_cer(writer)
6787         write_full(writer, EOC)
6788
6789     def _decode(
6790             self,
6791             tlv,
6792             offset,
6793             decode_path,
6794             ctx,
6795             tag_only,
6796             evgen_mode,
6797             ordering_check=False,
6798     ):
6799         try:
6800             t, tlen, lv = tag_strip(tlv)
6801         except DecodeError as err:
6802             raise err.__class__(
6803                 msg=err.msg,
6804                 klass=self.__class__,
6805                 decode_path=decode_path,
6806                 offset=offset,
6807             )
6808         if t != self.tag:
6809             raise TagMismatch(
6810                 klass=self.__class__,
6811                 decode_path=decode_path,
6812                 offset=offset,
6813             )
6814         if tag_only:
6815             yield None
6816             return
6817         lenindef = False
6818         ctx_bered = ctx.get("bered", False)
6819         try:
6820             l, llen, v = len_decode(lv)
6821         except LenIndefForm as err:
6822             if not ctx_bered:
6823                 raise err.__class__(
6824                     msg=err.msg,
6825                     klass=self.__class__,
6826                     decode_path=decode_path,
6827                     offset=offset,
6828                 )
6829             l, llen, v = 0, 1, lv[1:]
6830             lenindef = True
6831         except DecodeError as err:
6832             raise err.__class__(
6833                 msg=err.msg,
6834                 klass=self.__class__,
6835                 decode_path=decode_path,
6836                 offset=offset,
6837             )
6838         if l > len(v):
6839             raise NotEnoughData(
6840                 "encoded length is longer than data",
6841                 klass=self.__class__,
6842                 decode_path=decode_path,
6843                 offset=offset,
6844             )
6845         if not lenindef:
6846             v, tail = v[:l], v[l:]
6847         vlen = 0
6848         sub_offset = offset + tlen + llen
6849         _value = []
6850         _value_count = 0
6851         ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6852         value_prev = memoryview(v[:0])
6853         ber_encoded = False
6854         spec = self.spec
6855         while len(v) > 0:
6856             if lenindef and v[:EOC_LEN].tobytes() == EOC:
6857                 break
6858             sub_decode_path = decode_path + (str(_value_count),)
6859             if evgen_mode:
6860                 for _decode_path, value, v_tail in spec.decode_evgen(
6861                         v,
6862                         sub_offset,
6863                         leavemm=True,
6864                         decode_path=sub_decode_path,
6865                         ctx=ctx,
6866                         _ctx_immutable=False,
6867                 ):
6868                     yield _decode_path, value, v_tail
6869             else:
6870                 _, value, v_tail = next(spec.decode_evgen(
6871                     v,
6872                     sub_offset,
6873                     leavemm=True,
6874                     decode_path=sub_decode_path,
6875                     ctx=ctx,
6876                     _ctx_immutable=False,
6877                     _evgen_mode=False,
6878                 ))
6879             value_len = value.fulllen
6880             if ordering_check:
6881                 if value_prev.tobytes() > v[:value_len].tobytes():
6882                     if ctx_bered or ctx_allow_unordered_set:
6883                         ber_encoded = True
6884                     else:
6885                         raise DecodeError(
6886                             "unordered " + self.asn1_type_name,
6887                             klass=self.__class__,
6888                             decode_path=sub_decode_path,
6889                             offset=sub_offset,
6890                         )
6891                 value_prev = v[:value_len]
6892             _value_count += 1
6893             if not evgen_mode:
6894                 _value.append(value)
6895             sub_offset += value_len
6896             vlen += value_len
6897             v = v_tail
6898         if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
6899             raise DecodeError(
6900                 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
6901                 klass=self.__class__,
6902                 decode_path=decode_path,
6903                 offset=offset,
6904             )
6905         try:
6906             obj = self.__class__(
6907                 value=None if evgen_mode else _value,
6908                 schema=spec,
6909                 bounds=(self._bound_min, self._bound_max),
6910                 impl=self.tag,
6911                 expl=self._expl,
6912                 default=self.default,
6913                 optional=self.optional,
6914                 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6915             )
6916         except BoundsError as err:
6917             raise DecodeError(
6918                 msg=str(err),
6919                 klass=self.__class__,
6920                 decode_path=decode_path,
6921                 offset=offset,
6922             )
6923         if lenindef:
6924             if v[:EOC_LEN].tobytes() != EOC:
6925                 raise DecodeError(
6926                     "no EOC",
6927                     klass=self.__class__,
6928                     decode_path=decode_path,
6929                     offset=offset,
6930                 )
6931             obj.lenindef = True
6932             tail = v[EOC_LEN:]
6933         obj.ber_encoded = ber_encoded
6934         yield decode_path, obj, tail
6935
6936     def __repr__(self):
6937         return "%s[%s]" % (
6938             pp_console_row(next(self.pps())),
6939             ", ".join(repr(v) for v in self._value),
6940         )
6941
6942     def pps(self, decode_path=()):
6943         yield _pp(
6944             obj=self,
6945             asn1_type_name=self.asn1_type_name,
6946             obj_name=self.__class__.__name__,
6947             decode_path=decode_path,
6948             optional=self.optional,
6949             default=self == self.default,
6950             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6951             expl=None if self._expl is None else tag_decode(self._expl),
6952             offset=self.offset,
6953             tlen=self.tlen,
6954             llen=self.llen,
6955             vlen=self.vlen,
6956             expl_offset=self.expl_offset if self.expled else None,
6957             expl_tlen=self.expl_tlen if self.expled else None,
6958             expl_llen=self.expl_llen if self.expled else None,
6959             expl_vlen=self.expl_vlen if self.expled else None,
6960             expl_lenindef=self.expl_lenindef,
6961             lenindef=self.lenindef,
6962             ber_encoded=self.ber_encoded,
6963             bered=self.bered,
6964         )
6965         for i, value in enumerate(self._value):
6966             yield value.pps(decode_path=decode_path + (str(i),))
6967         for pp in self.pps_lenindef(decode_path):
6968             yield pp
6969
6970
6971 class SetOf(SequenceOf):
6972     """``SET OF`` sequence type
6973
6974     Its usage is identical to :py:class:`pyderasn.SequenceOf`.
6975     """
6976     __slots__ = ()
6977     tag_default = tag_encode(form=TagFormConstructed, num=17)
6978     asn1_type_name = "SET OF"
6979
6980     def _value_sanitize(self, value):
6981         value = super(SetOf, self)._value_sanitize(value)
6982         if hasattr(value, NEXT_ATTR_NAME):
6983             raise ValueError(
6984                 "SetOf does not support iterator values, as no sense in them"
6985             )
6986         return value
6987
6988     def _encode(self):
6989         v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
6990         return b"".join((self.tag, len_encode(len(v)), v))
6991
6992     def _encode_cer(self, writer):
6993         write_full(writer, self.tag + LENINDEF)
6994         for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
6995             write_full(writer, v)
6996         write_full(writer, EOC)
6997
6998     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6999         return super(SetOf, self)._decode(
7000             tlv,
7001             offset,
7002             decode_path,
7003             ctx,
7004             tag_only,
7005             evgen_mode,
7006             ordering_check=True,
7007         )
7008
7009
7010 def obj_by_path(pypath):  # pragma: no cover
7011     """Import object specified as string Python path
7012
7013     Modules must be separated from classes/functions with ``:``.
7014
7015     >>> obj_by_path("foo.bar:Baz")
7016     <class 'foo.bar.Baz'>
7017     >>> obj_by_path("foo.bar:Baz.boo")
7018     <classmethod 'foo.bar.Baz.boo'>
7019     """
7020     mod, objs = pypath.rsplit(":", 1)
7021     from importlib import import_module
7022     obj = import_module(mod)
7023     for obj_name in objs.split("."):
7024         obj = getattr(obj, obj_name)
7025     return obj
7026
7027
7028 def generic_decoder():  # pragma: no cover
7029     # All of this below is a big hack with self references
7030     choice = PrimitiveTypes()
7031     choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7032     choice.specs["SetOf"] = SetOf(schema=choice)
7033     for i in six_xrange(31):
7034         choice.specs["SequenceOf%d" % i] = SequenceOf(
7035             schema=choice,
7036             expl=tag_ctxc(i),
7037         )
7038     choice.specs["Any"] = Any()
7039
7040     # Class name equals to type name, to omit it from output
7041     class SEQUENCEOF(SequenceOf):
7042         __slots__ = ()
7043         schema = choice
7044
7045     def pprint_any(
7046             obj,
7047             oid_maps=(),
7048             with_colours=False,
7049             with_decode_path=False,
7050             decode_path_only=(),
7051     ):
7052         def _pprint_pps(pps):
7053             for pp in pps:
7054                 if hasattr(pp, "_fields"):
7055                     if (
7056                             decode_path_only != () and
7057                             pp.decode_path[:len(decode_path_only)] != decode_path_only
7058                     ):
7059                         continue
7060                     if pp.asn1_type_name == Choice.asn1_type_name:
7061                         continue
7062                     pp_kwargs = pp._asdict()
7063                     pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7064                     pp = _pp(**pp_kwargs)
7065                     yield pp_console_row(
7066                         pp,
7067                         oid_maps=oid_maps,
7068                         with_offsets=True,
7069                         with_blob=False,
7070                         with_colours=with_colours,
7071                         with_decode_path=with_decode_path,
7072                         decode_path_len_decrease=len(decode_path_only),
7073                     )
7074                     for row in pp_console_blob(
7075                             pp,
7076                             decode_path_len_decrease=len(decode_path_only),
7077                     ):
7078                         yield row
7079                 else:
7080                     for row in _pprint_pps(pp):
7081                         yield row
7082         return "\n".join(_pprint_pps(obj.pps()))
7083     return SEQUENCEOF(), pprint_any
7084
7085
7086 def main():  # pragma: no cover
7087     import argparse
7088     parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/DER decoder")
7089     parser.add_argument(
7090         "--skip",
7091         type=int,
7092         default=0,
7093         help="Skip that number of bytes from the beginning",
7094     )
7095     parser.add_argument(
7096         "--oids",
7097         help="Python paths to dictionary with OIDs, comma separated",
7098     )
7099     parser.add_argument(
7100         "--schema",
7101         help="Python path to schema definition to use",
7102     )
7103     parser.add_argument(
7104         "--defines-by-path",
7105         help="Python path to decoder's defines_by_path",
7106     )
7107     parser.add_argument(
7108         "--nobered",
7109         action="store_true",
7110         help="Disallow BER encoding",
7111     )
7112     parser.add_argument(
7113         "--print-decode-path",
7114         action="store_true",
7115         help="Print decode paths",
7116     )
7117     parser.add_argument(
7118         "--decode-path-only",
7119         help="Print only specified decode path",
7120     )
7121     parser.add_argument(
7122         "--allow-expl-oob",
7123         action="store_true",
7124         help="Allow explicit tag out-of-bound",
7125     )
7126     parser.add_argument(
7127         "--evgen",
7128         action="store_true",
7129         help="Turn on event generation mode",
7130     )
7131     parser.add_argument(
7132         "RAWFile",
7133         type=argparse.FileType("rb"),
7134         help="Path to BER/CER/DER file you want to decode",
7135     )
7136     args = parser.parse_args()
7137     if PY2:
7138         args.RAWFile.seek(args.skip)
7139         raw = memoryview(args.RAWFile.read())
7140         args.RAWFile.close()
7141     else:
7142         raw = file_mmaped(args.RAWFile)[args.skip:]
7143     oid_maps = (
7144         [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7145         if args.oids else ()
7146     )
7147     if args.schema:
7148         schema = obj_by_path(args.schema)
7149         from functools import partial
7150         pprinter = partial(pprint, big_blobs=True)
7151     else:
7152         schema, pprinter = generic_decoder()
7153     ctx = {
7154         "bered": not args.nobered,
7155         "allow_expl_oob": args.allow_expl_oob,
7156     }
7157     if args.defines_by_path is not None:
7158         ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7159     from os import environ
7160     pprinter = partial(
7161         pprinter,
7162         oid_maps=oid_maps,
7163         with_colours=environ.get("NO_COLOR") is None,
7164         with_decode_path=args.print_decode_path,
7165         decode_path_only=(
7166             () if args.decode_path_only is None else
7167             tuple(args.decode_path_only.split(":"))
7168         ),
7169     )
7170     if args.evgen:
7171         for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7172             print(pprinter(obj, decode_path=decode_path))
7173     else:
7174         obj, tail = schema().decode(raw, ctx=ctx)
7175         print(pprinter(obj))
7176     if tail != b"":
7177         print("\nTrailing data: %s" % hexenc(tail))
7178
7179
7180 if __name__ == "__main__":
7181     main()