this repo has no description
1#!/usr/bin/env python3
2# Copyright (c) Facebook, Inc. and its affiliates. (http://www.facebook.com)
3
4from builtins import _index, _str_array
5
6from _builtins import (
7 _builtin,
8 _bytes_check,
9 _bytes_decode,
10 _bytes_decode_ascii,
11 _bytes_decode_utf_8,
12 _bytes_len,
13 _byteslike_guard,
14 _int_check,
15 _object_type_hasattr,
16 _str_array_iadd,
17 _str_check,
18 _str_encode,
19 _str_encode_ascii,
20 _str_guard,
21 _str_len,
22 _tuple_check,
23 _tuple_len,
24 _type,
25 _Unbound,
26 _unimplemented,
27 maxunicode as _maxunicode,
28)
29
30
# Search functions added through register(); lookup() tries them in order.
codec_search_path = []


# Results of earlier successful lookup() calls, keyed by the encoding name
# exactly as it was passed to lookup().
codec_search_cache = {}
35
36
def register(search_func):
    """Append `search_func` to the codec search path.

    Raises TypeError when `search_func` is not callable.
    """
    if callable(search_func):
        codec_search_path.append(search_func)
        return
    raise TypeError("argument must be callable")
41
42
def lookup(encoding):
    """Return the codec 4-tuple registered for `encoding`.

    A cached result is returned when one exists. Otherwise every registered
    search function is called with the lower-cased name (spaces replaced by
    hyphens) until one returns something other than None; that result is
    cached under the original `encoding` and returned.

    Raises TypeError when a search function returns something that is not a
    4-tuple, and LookupError when no search function knows the encoding.
    """
    hit = codec_search_cache.get(encoding)
    if hit is not None:
        return hit
    # Make sure that we loaded the standard codecs.
    if not codec_search_path:
        import encodings  # noqa: F401

    search_name = encoding.lower().replace(" ", "-")
    for finder in codec_search_path:
        found = finder(search_name)
        if found is None:
            continue
        if not (_tuple_check(found) and _tuple_len(found) == 4):
            raise TypeError("codec search functions must return 4-tuples")
        codec_search_cache[encoding] = found
        return found
    raise LookupError(f"unknown encoding: {encoding}")
65
66
def _lookup_text(encoding, alternate_command):
    """Look up `encoding` and reject codecs flagged as non-text.

    A plain tuple result is returned as is. For any other result type the
    `_is_text_encoding` attribute is consulted: a falsy value raises
    LookupError mentioning `alternate_command`, and a missing attribute is
    treated as acceptable.
    """
    codec = lookup(encoding)
    if _type(codec) == tuple:
        return codec
    try:
        not_text = not codec._is_text_encoding
    except AttributeError:
        not_text = False
    if not_text:
        raise LookupError(
            f"{encoding} is not a text encoding; "
            f"use {alternate_command} to handle arbitrary codecs"
        )
    return codec
79
80
def decode(data, encoding: str = "utf-8", errors: str = _Unbound) -> str:
    """Decode `data` with the codec named `encoding`.

    The native `_bytes_decode` fast path is tried first. If it declines
    (returns `_Unbound`), the built-in decoder table is consulted, and
    finally the codec registry via lookup().

    errors: name of the error handler; when omitted, table decoders get
        "strict" and registry decoders are called without the argument.

    Returns the decoded object (first element of the decoder's result).
    Raises LookupError for an unknown encoding and TypeError when a registry
    decoder does not return a 2-tuple.
    """
    result = _bytes_decode(data, encoding)
    if result is not _Unbound:
        return result
    # Fetch the table entry before calling it, so that a KeyError raised
    # *while decoding* (for instance by a user-registered error handler) is
    # propagated instead of being mistaken for a missing table entry.
    table_decoder = _codec_decode_table.get(encoding.lower())
    if table_decoder is not None:
        return table_decoder(data, "strict" if errors is _Unbound else errors)[0]
    try:
        decoder = lookup(encoding)[1]
    except LookupError:
        raise LookupError(f"unknown encoding: {encoding}")
    if errors is _Unbound:
        result = decoder(data)
    else:
        result = decoder(data, errors)
    if _tuple_check(result) and _tuple_len(result) == 2:
        return result[0]
    # CPython does not check to make sure that the second element is an int
    raise TypeError("decoder must return a tuple (object,integer)")
102
103
def encode(data, encoding: str = "utf-8", errors: str = _Unbound) -> bytes:
    """Encode `data` with the codec named `encoding`.

    The native `_str_encode` fast path is tried first. If it declines
    (returns `_Unbound`), the built-in encoder table is consulted, and
    finally the codec registry via lookup().

    errors: name of the error handler; when omitted, table encoders get
        "strict" and registry encoders are called without the argument.

    Returns the encoded object (first element of the encoder's result).
    Raises LookupError for an unknown encoding and TypeError when a registry
    encoder does not return a 2-tuple.
    """
    result = _str_encode(data, encoding)
    if result is not _Unbound:
        return result
    # Fetch the table entry before calling it, so that a KeyError raised
    # *while encoding* (for instance by a user-registered error handler) is
    # propagated instead of being mistaken for a missing table entry.
    table_encoder = _codec_encode_table.get(encoding.lower())
    if table_encoder is not None:
        return table_encoder(data, "strict" if errors is _Unbound else errors)[0]
    try:
        encoder = lookup(encoding)[0]
    except LookupError:
        raise LookupError(f"unknown encoding: {encoding}")
    if errors is _Unbound:
        result = encoder(data)
    else:
        result = encoder(data, errors)
    if _tuple_check(result) and _tuple_len(result) == 2:
        return result[0]
    # CPython does not check to make sure that the second element is an int
    raise TypeError("encoder must return a tuple (object, integer)")
125
126
def _ascii_decode(data: bytes, errors: str, index: int, out: _str_array):
    """Native ASCII decoding step driven by ascii_decode().

    Judging by how ascii_decode() consumes the result, this returns a
    2-tuple: (error_start, error_end) with an int first element when it
    stops at a byte it cannot decode, or (decoded_str, length) once it has
    reached the end of `data`. The implementation is not visible here.
    """
    _builtin()
129
130
def ascii_decode(data: bytes, errors: str = "strict"):
    """Decode the bytes-like `data` as ASCII.

    errors: name of the error handler invoked for bytes that cannot be
        decoded; must be a str.

    Returns a `(decoded, position)` tuple.
    Raises TypeError when `errors` is not a str; anything the selected error
    handler raises is propagated.
    """
    _byteslike_guard(data)
    if not _str_check(errors):
        raise TypeError(
            "ascii_decode() argument 2 must be str or None, not "
            f"'{_type(errors).__name__}'"
        )
    result = _bytes_decode_ascii(data)
    if result is not _Unbound:
        return result, _bytes_len(data)
    result = _str_array()
    i = 0
    encoded = ""
    length = len(data)
    while i < length:
        encoded, i = _ascii_decode(data, errors, i, result)
        if _int_check(encoded):
            data, i = _call_decode_errorhandler(
                errors, data, result, "ordinal not in range(128)", "ascii", encoded, i
            )
            # The handler may have replaced the object being decoded, so the
            # loop bound has to follow the (possibly different) new input.
            length = len(data)
    if _str_check(encoded):
        return encoded, i
    # The error handler was the last to write to the result
    return str(result), i
155
156
def _ascii_encode(data: str, errors: str, index: int, out: bytearray):
    """Tries to encode `data`, starting from `index`, into the `out` bytearray.
    If it encounters any codepoints above 127, it tries using the `errors`
    error handler to fix it internally, but returns a tuple of the first
    and last index of the error on failure.
    If it finishes encoding, it returns a tuple of the final bytes and length.
    """
    _builtin()
165
166
def ascii_encode(data: str, errors: str = "strict"):
    """Encode the str `data` as ASCII.

    Returns a `(encoded_bytes, position)` tuple. Characters the native
    encoder cannot handle are passed to the `errors` handler; a bytes
    replacement is appended verbatim, a str replacement must consist of
    characters no greater than "\\x7F".

    Raises TypeError when `data` or `errors` is not a str, and
    UnicodeEncodeError when a str replacement contains a larger character.
    """
    if not _str_check(data):
        raise TypeError(
            f"ascii_encode() argument 1 must be str, not {_type(data).__name__}"
        )
    if not _str_check(errors):
        raise TypeError(
            "ascii_encode() argument 2 must be str or None, not "
            f"{_type(errors).__name__}"
        )
    fast = _str_encode_ascii(data)
    if fast is not _Unbound:
        return fast, _str_len(data)

    out = bytearray()
    outcome = b""
    pos = 0
    end = _str_len(data)
    while pos < end:
        outcome, pos = _ascii_encode(data, errors, pos, out)
        if not _int_check(outcome):
            continue
        # `outcome` and `pos` delimit the span the native encoder gave up on.
        replacement, resume = _call_encode_errorhandler(
            errors, data, "ordinal not in range(128)", "ascii", outcome, pos
        )
        if _bytes_check(replacement):
            out += replacement
        else:
            if any(ch > "\x7F" for ch in replacement):
                raise UnicodeEncodeError(
                    "ascii", data, outcome, pos, "ordinal not in range(128)"
                )
            _bytearray_string_append(out, replacement)
        pos = resume
    if _bytes_check(outcome):
        return outcome, pos
    # The last step was an error-handler replacement, so the accumulated
    # bytearray holds the final result.
    return bytes(out), pos
206
207
def charmap_decode(data, errors="strict", mapping=None):
    """Decode the bytes-like `data` through the character `mapping`.

    errors: only "strict" is handled here; any other value goes to
        `_unimplemented()`.
    mapping: object indexed by byte value that yields an int code point, a
        str, or None (undefined). When omitted or None, `data` is decoded as
        Latin-1, which is what CPython does.

    Returns a `(decoded, length_of_data)` tuple.
    Raises UnicodeDecodeError for a byte with no mapping (missing entry,
    None, or "\\ufffe") and TypeError for a mapping value of the wrong type
    or out of range.
    """
    _byteslike_guard(data)
    _str_guard(errors)
    if errors != "strict":
        _unimplemented()
    if mapping is None:
        return latin_1_decode(data, errors)

    result = _str_array()
    data_len = _bytes_len(data)
    i = 0
    while i < data_len:
        # Only the lookup itself may signal "undefined" through
        # IndexError/KeyError, so only the lookup is guarded.
        try:
            mapped = mapping[data[i]]
        except (IndexError, KeyError):
            mapped = None
        if mapped is None or mapped == "\ufffe":
            # start/end are positions in `data` (i .. i + 1), not the byte
            # value found at that position.
            raise UnicodeDecodeError(
                "charmap", data, i, i + 1, "character maps to <undefined>"
            )
        if _int_check(mapped):
            if mapped < 0 or mapped > _maxunicode:
                raise TypeError(
                    f"character mapping must be in range ({_maxunicode + 1:#x})"
                )
            mapped = chr(mapped)
        elif not _str_check(mapped):
            raise TypeError("character mapping must return integer, None or str")
        _str_array_iadd(result, mapped)
        i += 1

    return str(result), data_len
240
241
def _escape_decode(data: bytes, errors: str, recode_encoding: str):
    """Tries to decode `data`.
    If it runs into an error, it returns the error message as a str; the
    caller (_escape_decode_stateful) raises ValueError with that message.
    If it finishes decoding, it returns a tuple of
    (decoded, length, first_invalid_escape)
    where the first_invalid_escape is either the index into the data of the first
    invalid escape sequence, or -1 if none occur.
    Will eventually have to handle the recode_encoding argument.
    """
    _builtin()
252
253
def _escape_decode_stateful(
    data: bytes, errors: str = "strict", recode_encoding: str = ""
):
    """Validate the arguments and run the native escape decoder.

    `data` may be a str or a bytes-like object. Returns whatever
    `_escape_decode` returns on success.

    Raises TypeError when `errors` is not a str, and ValueError carrying the
    message when `_escape_decode` reports a failure by returning a str.
    """
    if not _str_check(data):
        _byteslike_guard(data)
    if not _str_check(errors):
        raise TypeError(
            "escape_decode() argument 2 must be str or None, not "
            f"{type(errors).__name__}"
        )
    outcome = _escape_decode(data, errors, recode_encoding)
    if not _str_check(outcome):
        return outcome
    # A str result is the native decoder's way of handing back an error text.
    raise ValueError(outcome)
268
269
def escape_decode(data, errors: str = "strict"):
    """Decode escape sequences in `data`; return `(decoded, length)`.

    The position of the first invalid escape reported by
    `_escape_decode_stateful` is dropped.
    """
    decoded, consumed, _first_invalid = _escape_decode_stateful(data, errors)
    return (decoded, consumed)
273
274
def _latin_1_decode(data: bytes):
    """Native Latin-1 decoder behind latin_1_decode().

    latin_1_decode() returns this function's result unchanged and decode()
    takes element 0 of it, so the result is presumably a
    (decoded_str, length) tuple -- the implementation is not visible here.
    """
    _builtin()
277
278
def latin_1_decode(data: bytes, errors: str = "strict"):
    """Decode the bytes-like `data` as Latin-1 via the native decoder.

    `errors` is type-checked but otherwise not passed on.
    Raises TypeError when `errors` is not a str.
    """
    _byteslike_guard(data)
    if _str_check(errors):
        return _latin_1_decode(data)
    raise TypeError(
        "latin_1_decode() argument 2 must be str or None, not "
        f"'{_type(errors).__name__}'"
    )
287
288
def _latin_1_encode(data: str, errors: str, index: int, out: bytearray):
    """Tries to encode `data`, starting from `index`, into the `out` bytearray.
    If it encounters any codepoints above 255, it tries using the `errors`
    error handler to fix it internally, but returns a tuple of the first
    and last index of the error on failure.
    If it finishes encoding, it returns a tuple of the final bytes and length.
    """
    _builtin()
297
298
def latin_1_encode(data: str, errors: str = "strict"):
    """Encode the str `data` as Latin-1.

    Returns a `(encoded_bytes, position)` tuple. Characters the native
    encoder cannot handle are passed to the `errors` handler; a bytes
    replacement is appended verbatim, a str replacement must consist of
    characters no greater than "\\xFF" and is itself Latin-1 encoded.

    Raises TypeError when `data` or `errors` is not a str, and
    UnicodeEncodeError when a str replacement contains a larger character.
    """
    if not _str_check(data):
        raise TypeError(
            f"latin_1_encode() argument 1 must be str, not {_type(data).__name__}"
        )
    if not _str_check(errors):
        raise TypeError(
            "latin_1_encode() argument 2 must be str or None, not "
            f"{_type(errors).__name__}"
        )

    out = bytearray()
    outcome = b""
    pos = 0
    end = _str_len(data)
    while pos < end:
        outcome, pos = _latin_1_encode(data, errors, pos, out)
        if not _int_check(outcome):
            continue
        # `outcome` and `pos` delimit the span the native encoder gave up on.
        replacement, resume = _call_encode_errorhandler(
            errors, data, "ordinal not in range(256)", "latin-1", outcome, pos
        )
        if _bytes_check(replacement):
            out += replacement
        else:
            if any(ch > "\xFF" for ch in replacement):
                raise UnicodeEncodeError(
                    "latin-1", data, outcome, pos, "ordinal not in range(256)"
                )
            out += latin_1_encode(replacement, errors)[0]
        pos = resume
    if _bytes_check(outcome):
        return outcome, pos
    # The last step was an error-handler replacement, so the accumulated
    # bytearray holds the final result.
    return bytes(out), pos
335
336
def _raw_unicode_escape_decode(data: bytes, errors: str, index: int, out: _str_array):
    """Tries to decode `data`, starting from `index`, into the `out` _str_array.
    Only decodes raw unicode uXXXX or UXXXXXXXX.
    If it runs into any errors, it returns a tuple of
    (error_start, error_end, error_message),
    If it finishes decoding, it returns a tuple whose first two items are
    (decoded, length); the caller always unpacks three items and treats a
    falsy third item as "no error".
    """
    _builtin()
346
347
def raw_unicode_escape_decode(data, errors: str = "strict"):
    """Decode `data` using the raw-unicode-escape codec.

    data: a str or a bytes-like object.
    errors: name of the error handler invoked for malformed escapes; must be
        a str.

    Returns a `(decoded, position)` tuple.
    Raises TypeError when `errors` is not a str; anything the selected error
    handler raises is propagated.
    """
    if not _str_check(data):
        _byteslike_guard(data)
    if not _str_check(errors):
        raise TypeError(
            "raw_unicode_escape_decode() argument 2 must be str, not "
            f"{type(errors).__name__}"
        )
    result = _str_array()
    i = 0
    decoded = ""
    length = len(data)
    while i < length:
        decoded, i, error_msg = _raw_unicode_escape_decode(data, errors, i, result)
        if error_msg:
            data, i = _call_decode_errorhandler(
                errors, data, result, error_msg, "rawunicodeescape", decoded, i
            )
            # The handler may have replaced the object being decoded, so the
            # loop bound has to follow the (possibly different) new input.
            length = len(data)
    if _str_check(decoded):
        return decoded, i
    # The error handler was the last to write to the result
    return str(result), i
370
371
def _raw_unicode_escape_encode(data):
    # Native raw-unicode-escape encoder; raw_unicode_escape_encode() returns
    # its result unchanged after validating the argument types.
    _builtin()
374
375
def raw_unicode_escape_encode(data, errors: str = "strict"):
    """Encode the str `data` with the native raw-unicode-escape encoder.

    `errors` is type-checked but otherwise not passed on.
    Raises TypeError when `data` or `errors` is not a str.
    """
    if not _str_check(data):
        raise TypeError(
            f"raw_unicode_escape_encode() argument 1 must be str, not {_type(data).__name__}"
        )
    if _str_check(errors):
        return _raw_unicode_escape_encode(data)
    raise TypeError(
        "raw_unicode_escape_encode() argument 2 must be str, not "
        f"{type(errors).__name__}"
    )
387
388
def _unicode_escape_decode(data: bytes, errors: str, index: int, out: _str_array):
    """Tries to decode `data`, starting from `index`, into the `out` _str_array.
    If it runs into any errors, it returns a tuple of
    (error_start, error_end, error_message, first_invalid_escape),
    where the first_invalid_escape is either the index into the data of the first
    invalid escape sequence, or -1 if none occur.
    If it finishes decoding, it returns a tuple of
    (decoded, length, "", first_invalid_escape)
    """
    _builtin()
399
400
def _unicode_escape_decode_stateful(data: bytes, errors: str = "strict"):
    """Decode `data` using the unicode-escape codec.

    data: a str or a bytes-like object.
    errors: name of the error handler invoked for malformed escapes; must be
        a str.

    Returns `(decoded, position, first_invalid)`, where `first_invalid` is
    the value last reported by the native decoder (-1 if it never ran).
    Raises TypeError when `errors` is not a str; anything the selected error
    handler raises is propagated.
    """
    if not _str_check(data):
        _byteslike_guard(data)
    if not _str_check(errors):
        raise TypeError(
            "unicode_escape_decode() argument 2 must be str or None, not "
            f"{type(errors).__name__}"
        )
    result = _str_array()
    i = 0
    decoded = ""
    length = len(data)
    first_invalid = -1
    while i < length:
        decoded, i, error_msg, first_invalid = _unicode_escape_decode(
            data, errors, i, result
        )
        if error_msg:
            data, i = _call_decode_errorhandler(
                errors, data, result, error_msg, "unicodeescape", decoded, i
            )
            # The handler may have replaced the object being decoded, so the
            # loop bound has to follow the (possibly different) new input.
            length = len(data)
    if _str_check(decoded):
        return decoded, i, first_invalid
    # The error handler was the last to write to the result
    return str(result), i, first_invalid
426
427
def unicode_escape_decode(data, errors: str = "strict"):
    """Decode unicode escapes in `data`; return `(decoded, length)`.

    The first-invalid-escape position reported by
    `_unicode_escape_decode_stateful` is dropped.
    """
    decoded, consumed, _first_invalid = _unicode_escape_decode_stateful(
        data, errors
    )
    return (decoded, consumed)
431
432
def unicode_escape_encode(data, errors: str = "strict"):
    # Placeholder: no encoder yet, every call goes straight to
    # _unimplemented().
    _unimplemented()
435
436
def _utf_8_decode(
    data: bytes, errors: str, index: int, out: _str_array, is_final: bool
):
    """Tries to decode `data`, starting from `index`, into the `out` _str_array.
    If it runs into any errors, it returns a tuple of
    (error_start, error_end, error_message),
    If it finishes decoding, it returns a tuple of
    (decoded, length, "")
    """
    _builtin()
447
448
def utf_8_decode(data: bytes, errors: str = "strict", is_final: bool = False):
    """Decode the bytes-like `data` as UTF-8.

    errors: name of the error handler invoked for invalid input; a str, or
        None which is treated as "strict".
    is_final: forwarded unchanged to the native decoder.

    Returns a `(decoded, position)` tuple.
    Raises TypeError when `errors` is neither a str nor None; anything the
    selected error handler raises is propagated.
    """
    _byteslike_guard(data)
    # The original test read `not _str_check(errors) and not None`, where
    # `not None` is always true; the intended exemption for None is
    # implemented here by mapping it to the default handler.
    if errors is None:
        errors = "strict"
    if not _str_check(errors):
        raise TypeError(
            "utf_8_decode() argument 2 must be str or None, not "
            f"'{_type(errors).__name__}'"
        )
    result = _bytes_decode_utf_8(data)
    if result is not _Unbound:
        return result, _bytes_len(data)
    result = _str_array()
    i = 0
    encoded = ""
    length = len(data)
    while i < length:
        encoded, i, errmsg = _utf_8_decode(data, errors, i, result, is_final)
        if _int_check(encoded):
            data, i = _call_decode_errorhandler(
                errors, data, result, errmsg, "utf-8", encoded, i
            )
            # The handler may have replaced the object being decoded, so the
            # loop bound has to follow the (possibly different) new input.
            length = len(data)
            continue
        # A non-int first element means the native decoder did not report an
        # error: it either finished or stopped early on purpose (presumably
        # at an incomplete trailing sequence, depending on is_final -- TODO
        # confirm against the native implementation). Stop decoding.
        break
    if _str_check(encoded):
        return encoded, i
    # The error handler was the last to write to the result
    return str(result), i
478
479
def _utf_8_encode(data: str, errors: str, index: int, out: bytearray):
    """Tries to encode `data`, starting from `index`, into the `out` bytearray.
    If it encounters an error, it tries using the `errors` error handler to
    fix it internally, but returns a tuple of the first and last index of
    the error.
    If it finishes encoding, it returns a tuple of the final bytes and length.
    """
    _builtin()
488
489
def utf_8_encode(data: str, errors: str = "strict"):
    """Encode the str `data` as UTF-8.

    Returns a `(encoded_bytes, position)` tuple. Spans the native encoder
    cannot handle are passed to the `errors` handler; a bytes replacement is
    appended verbatim, a str replacement must consist of characters no
    greater than "\\x7F".

    Raises TypeError when `data` or `errors` is not a str, and
    UnicodeEncodeError when a str replacement contains a larger character.
    """
    if not _str_check(data):
        raise TypeError(
            f"utf_8_encode() argument 1 must be str, not {_type(data).__name__}"
        )
    if not _str_check(errors):
        raise TypeError(
            "utf_8_encode() argument 2 must be str or None, not "
            f"{_type(errors).__name__}"
        )

    out = bytearray()
    outcome = bytes()
    pos = 0
    end = _str_len(data)
    while pos < end:
        outcome, pos = _utf_8_encode(data, errors, pos, out)
        if not _int_check(outcome):
            continue
        # `outcome` and `pos` delimit the span the native encoder gave up on.
        replacement, resume = _call_encode_errorhandler(
            errors, data, "surrogates not allowed", "utf-8", outcome, pos
        )
        if _bytes_check(replacement):
            out += replacement
        else:
            if any(ch > "\x7F" for ch in replacement):
                raise UnicodeEncodeError(
                    "utf-8", data, outcome, pos, "surrogates not allowed"
                )
            _bytearray_string_append(out, replacement)
        pos = resume
    if _bytes_check(outcome):
        return outcome, pos
    # The last step was an error-handler replacement, so the accumulated
    # bytearray holds the final result.
    return bytes(out), pos
526
527
def _utf_16_encode(data: str, errors: str, index: int, out: bytearray, byteorder: int):
    # Native UTF-16 encoding step driven by utf_16_encode(). The caller
    # unpacks a 2-tuple and treats an int first element as the start of a
    # span to hand to the error handler, and a bytes first element as the
    # finished result.
    _builtin()
530
531
def utf_16_encode(data: str, errors: str = "strict", byteorder: int = 0):  # noqa: C901
    """Encode the str `data` as UTF-16.

    errors: name of the error handler invoked for spans the native encoder
        rejects; must be a str.
    byteorder: negative for little-endian, positive for big-endian, 0 to
        emit the b"\\xFF\\xFE" byte-order mark first and let the native
        encoder pick the order.

    Returns a `(encoded_bytes, position)` tuple.
    Raises TypeError when `data` or `errors` is not a str, and
    UnicodeEncodeError when the handler's replacement is bytes of odd length
    or a str containing a character greater than "\\x7F".
    """
    # Names used in error reporting: hyphenated for UnicodeEncodeError,
    # underscored for the "<name>_encode()" TypeError messages.
    if byteorder < 0:
        h_encoding = "utf-16-le"
        u_encoding = "utf_16_le"
    elif byteorder > 0:
        # This branch used to repeat `byteorder < 0` and could never run, so
        # big-endian errors were reported under the generic "utf-16" name.
        h_encoding = "utf-16-be"
        u_encoding = "utf_16_be"
    else:
        h_encoding = "utf-16"
        u_encoding = "utf_16"
    if not _str_check(data):
        raise TypeError(
            f"{u_encoding}_encode() argument 1 must be str, not {_type(data).__name__}"
        )
    if not _str_check(errors):
        raise TypeError(
            f"{u_encoding}_encode() argument 2 must be str or None, not "
            f"{_type(errors).__name__}"
        )
    result = bytearray()
    if byteorder == 0:
        result += b"\xFF"
        result += b"\xFE"
    i = 0
    length = _str_len(data)
    encoded = bytes(result)
    while i < length:
        encoded, i = _utf_16_encode(data, errors, i, result, byteorder)
        if _int_check(encoded):
            unicode, pos = _call_encode_errorhandler(
                errors, data, "surrogates not allowed", h_encoding, encoded, i
            )
            if _bytes_check(unicode):
                # A bytes replacement must be a whole number of 2-byte units.
                if _bytes_len(unicode) & 1:
                    raise UnicodeEncodeError(
                        h_encoding, data, encoded, i, "surrogates not allowed"
                    )
                result += unicode
                i = pos
                continue
            for char in unicode:
                if char > "\x7F":
                    raise UnicodeEncodeError(
                        h_encoding, data, encoded, i, "surrogates not allowed"
                    )
            # Encode the replacement with an explicit byte order so that no
            # second byte-order mark ends up in the middle of the output.
            result += utf_16_encode(
                unicode, errors, -1 if byteorder == 0 else byteorder
            )[0]
            i = pos
    if _bytes_check(encoded):
        return encoded, i
    # _utf_16_encode encountered an error and _call_encode_errorhandler was the
    # last function to write to `result`.
    return bytes(result), i
586
587
def utf_16_le_decode(data: str, errors: str = "strict"):
    # Placeholder: no decoder yet, every call goes straight to
    # _unimplemented().
    _unimplemented()
590
591
def utf_16_le_encode(data: str, errors: str = "strict"):
    """Encode `data` as UTF-16 by calling utf_16_encode() with byteorder -1."""
    return utf_16_encode(data, errors, byteorder=-1)
594
595
def utf_16_be_decode(data: str, errors: str = "strict"):
    # Placeholder: no decoder yet, every call goes straight to
    # _unimplemented().
    _unimplemented()
598
599
def utf_16_be_encode(data: str, errors: str = "strict"):
    """Encode `data` as UTF-16 by calling utf_16_encode() with byteorder 1."""
    return utf_16_encode(data, errors, byteorder=1)
602
603
def _utf_32_encode(data: str, errors: str, index: int, out: bytearray, byteorder: int):
    # Native UTF-32 encoding step driven by utf_32_encode(). The caller
    # unpacks a 2-tuple and treats an int first element as the start of a
    # span to hand to the error handler, and a bytes first element as the
    # finished result.
    _builtin()
606
607
def utf_32_encode(data: str, errors: str = "strict", byteorder: int = 0):  # noqa: C901
    """Encode the str `data` as UTF-32.

    errors: name of the error handler invoked for spans the native encoder
        rejects; must be a str.
    byteorder: negative for little-endian, positive for big-endian, 0 to
        emit the b"\\xFF\\xFE\\x00\\x00" byte-order mark first and let the
        native encoder pick the order.

    Returns a `(encoded_bytes, position)` tuple.
    Raises TypeError when `data` or `errors` is not a str, and
    UnicodeEncodeError when the handler's replacement is bytes whose length
    is not a multiple of 4 or a str containing a character greater than
    "\\x7f".
    """
    # Names used in error reporting: hyphenated for UnicodeEncodeError,
    # underscored for the "<name>_encode()" TypeError messages.
    if byteorder < 0:
        h_encoding = "utf-32-le"
        u_encoding = "utf_32_le"
    elif byteorder > 0:
        # This branch used to repeat `byteorder < 0` and could never run, so
        # big-endian errors were reported under the generic "utf-32" name.
        h_encoding = "utf-32-be"
        u_encoding = "utf_32_be"
    else:
        h_encoding = "utf-32"
        u_encoding = "utf_32"
    if not _str_check(data):
        raise TypeError(
            f"{u_encoding}_encode() argument 1 must be str, not {_type(data).__name__}"
        )
    if not _str_check(errors):
        raise TypeError(
            f"{u_encoding}_encode() argument 2 must be str or None, not "
            f"{_type(errors).__name__}"
        )
    result = bytearray()
    if byteorder == 0:
        result += b"\xFF\xFE\x00\x00"
    i = 0
    length = _str_len(data)
    encoded = bytes(result)
    while i < length:
        encoded, i = _utf_32_encode(data, errors, i, result, byteorder)
        if _int_check(encoded):
            unicode, pos = _call_encode_errorhandler(
                errors, data, "surrogates not allowed", h_encoding, encoded, i
            )
            if _bytes_check(unicode):
                # A bytes replacement must be a whole number of 4-byte units.
                if _bytes_len(unicode) & 3:
                    raise UnicodeEncodeError(
                        h_encoding, data, encoded, i, "surrogates not allowed"
                    )
                result += unicode
                i = pos
                continue
            for char in unicode:
                if char > "\x7f":
                    raise UnicodeEncodeError(
                        h_encoding, data, encoded, i, "surrogates not allowed"
                    )
            # Encode the replacement with an explicit byte order so that no
            # second byte-order mark ends up in the middle of the output.
            result += utf_32_encode(
                unicode, errors, -1 if byteorder == 0 else byteorder
            )[0]
            i = pos
    if _bytes_check(encoded):
        return encoded, i
    # _utf_32_encode encountered an error and _call_encode_errorhandler was the
    # last function to write to `result`.
    return bytes(result), i
661
662
def utf_32_le_encode(data: str, errors: str = "strict"):
    """Encode `data` as UTF-32 by calling utf_32_encode() with byteorder -1."""
    return utf_32_encode(data, errors, byteorder=-1)
665
666
def utf_32_be_encode(data: str, errors: str = "strict"):
    """Encode `data` as UTF-32 by calling utf_32_encode() with byteorder 1."""
    return utf_32_encode(data, errors, byteorder=1)
669
670
# Decoders that decode() can call directly, keyed by lower-cased alias.
# Aliases are grouped per decoder; key order matches the flat listing.
_codec_decode_table = {
    **dict.fromkeys(("ascii", "us_ascii"), ascii_decode),
    **dict.fromkeys(("latin1", "latin 1", "latin-1", "latin_1"), latin_1_decode),
    **dict.fromkeys(("utf_8", "utf-8", "utf8"), utf_8_decode),
}
682
# Encoders that encode() can call directly, keyed by lower-cased alias.
# Aliases are grouped per encoder; key order matches the flat listing.
_codec_encode_table = {
    **dict.fromkeys(("ascii", "us_ascii"), ascii_encode),
    **dict.fromkeys(
        ("latin_1", "latin-1", "iso-8859-1", "iso_8859_1"), latin_1_encode
    ),
    **dict.fromkeys(("utf_8", "utf-8", "utf8"), utf_8_encode),
    **dict.fromkeys(("utf_16", "utf-16", "utf16"), utf_16_encode),
    **dict.fromkeys(("utf_16_le", "utf-16-le"), utf_16_le_encode),
    **dict.fromkeys(("utf_16_be", "utf-16-be"), utf_16_be_encode),
    **dict.fromkeys(("utf_32", "utf-32", "utf32"), utf_32_encode),
    **dict.fromkeys(("utf_32_le", "utf-32-le"), utf_32_le_encode),
    **dict.fromkeys(("utf_32_be", "utf-32-be"), utf_32_be_encode),
}
708
709
def backslashreplace_errors(error):
    # Native error handler; registered under the name "backslashreplace" in
    # _codec_error_registry.
    _builtin()
712
713
def strict_errors(error):
    """Error handler for "strict": re-raise the exception it is given.

    Raises TypeError when `error` is not an Exception instance.
    """
    if isinstance(error, Exception):
        raise error
    raise TypeError("codec must pass exception instance")
718
719
def ignore_errors(error):
    """Error handler for "ignore": drop the offending span.

    Returns `("", error.end)`, i.e. an empty replacement and the position
    right after the span.
    Raises TypeError when `error` is not a UnicodeError.
    """
    if isinstance(error, UnicodeError):
        return ("", error.end)
    raise TypeError(
        f"don't know how to handle {_type(error).__name__} in error callback"
    )
726
727
def lookup_error(error: str):
    """Return the error handler registered under the name `error`.

    Raises TypeError when `error` is not a str and LookupError when no
    handler has that name.
    """
    if not _str_check(error):
        raise TypeError(
            f"lookup_error() argument must be str, not {_type(error).__name__}"
        )
    try:
        handler = _codec_error_registry[error]
    except KeyError:
        raise LookupError(f"unknown error handler name '{error}'")
    return handler
737
738
def register_error(name: str, error_func):
    """Register `error_func` as the error handler called `name`.

    An existing handler with the same name is replaced.
    Raises TypeError when `name` is not a str or `error_func` is not
    callable.
    """
    if not _str_check(name):
        raise TypeError(
            f"register_error() argument 1 must be str, not {_type(name).__name__}"
        )
    if callable(error_func):
        _codec_error_registry[name] = error_func
        return
    raise TypeError("handler must be callable")
747
748
def _call_decode_errorhandler(
    errors: str,
    input: bytes,
    output: _str_array,
    reason: str,
    encoding: str,
    start: int,
    end: int,
):
    """
    Generic error handling for decoders.
    Builds a UnicodeDecodeError, looks up the handler named `errors`, and
    calls it with that exception. The handler must return a (str, int)
    tuple; the str is appended to `output`.
    Because a handler may swap the object being decoded by assigning to the
    exception's `object` attribute, the return value is that attribute
    together with the handler's position, normalized (negative values count
    from the end) and checked against the length of the object.

    errors: The name of the error handling function to call
    input: The input to be decoded
    output: The string builder that the error handling result should be appended to
    reason: The reason the errorhandler was called
    encoding: The encoding being used
    start: The index of the first erroneous byte
    end: The index of the first non-erroneous byte
    """
    exc = UnicodeDecodeError(encoding, input, start, end, reason)
    handler_result = lookup_error(errors)(exc)
    if not (_tuple_check(handler_result) and _tuple_len(handler_result) == 2):
        raise TypeError("decoding error handler must return (str, int) tuple")

    replacement, resume = handler_result
    if not (_str_check(replacement) and _object_type_hasattr(resume, "__index__")):
        raise TypeError("decoding error handler must return (str, int) tuple")

    resume = _index(resume)
    # Read the object back from the exception: the handler may have changed it.
    current = exc.object
    if not _bytes_check(current):
        raise TypeError("exception attribute object must be bytes")
    if resume < 0:
        resume += _bytes_len(current)
    if resume < 0 or resume > _bytes_len(current):
        raise IndexError(f"position {resume} from error handler out of bounds")
    _str_array_iadd(output, replacement)

    return (current, resume)
797
798
def _call_encode_errorhandler(
    errors: str, input: str, reason: str, encoding: str, start: int, end: int
):
    """
    Generic error handling for encoders.
    Builds a UnicodeEncodeError, looks up the handler named `errors`, and
    calls it with that exception. The handler must return a
    (str/bytes, int) tuple, which is returned with the position normalized
    (negative values count from the end) and checked against len(input).

    errors: The name of the error handling function to call
    input: The input to be encoded
    reason: The reason the errorhandler was called
    encoding: The encoding being used
    start: The index of the first erroneous character
    end: The index of the first non-erroneous character
    """
    exc = UnicodeEncodeError(encoding, input, start, end, reason)
    handler_result = lookup_error(errors)(exc)
    if not (_tuple_check(handler_result) and _tuple_len(handler_result) == 2):
        raise TypeError("encoding error handler must return (str/bytes, int) tuple")

    replacement, resume = handler_result
    if not (
        (_str_check(replacement) or _bytes_check(replacement))
        and _object_type_hasattr(resume, "__index__")
    ):
        raise TypeError("encoding error handler must return (str/bytes, int) tuple")

    resume = _index(resume)
    input_len = len(input)
    if resume < 0:
        resume += input_len
    if resume < 0 or resume > input_len:
        raise IndexError(f"position {resume} from error handler out of bounds")

    return replacement, resume
836
837
# TODO(T61927696): Support surrogatepass errors for utf-8 decode
# Error handlers by name: read by lookup_error(), extended by
# register_error().
_codec_error_registry = {
    "backslashreplace": backslashreplace_errors,
    "strict": strict_errors,
    "ignore": ignore_errors,
}
844
845
def _bytearray_string_append(dst: bytearray, data: str):
    # Native helper used by ascii_encode() and utf_8_encode() to add a str
    # replacement to the output bytearray. Both callers first verify that
    # every character is <= "\x7F"; behavior for other input is not visible
    # from this file.
    _builtin()
848
849
850# NOTE: This should behave the same as codecs.IncrementalEncoder.
851# TODO(T61720167): Should be removed once we can freeze encodings
class IncrementalEncoder:
    """Base class for incremental encoders; subclasses supply encode()."""

    def __init__(self, errors="strict"):
        """Remember the error handler name and start with an empty buffer."""
        self.errors, self.buffer = errors, ""

    def encode(self, input, final=False):
        """Encode `input`; must be overridden."""
        raise NotImplementedError()

    def reset(self):
        """Reset internal state; the base class keeps none."""
        return None

    def getstate(self):
        """Return the current state, which is always 0 in the base class."""
        return 0

    def setstate(self, state):
        """Restore a state from getstate(); ignored by the base class."""
        return None
868
869
870# NOTE: This should behave the same as codecs.IncrementalDecoder.
871# TODO(T61720167): Should be removed once we can freeze encodings
class IncrementalDecoder:
    """Base class for incremental decoders; subclasses supply decode()."""

    def __init__(self, errors="strict"):
        """Remember the error handler name."""
        self.errors = errors

    def decode(self, input, final=False):
        """Decode `input`; must be overridden."""
        raise NotImplementedError()

    def reset(self):
        """Reset internal state; the base class keeps none."""
        return None

    def getstate(self):
        """Return the current state: no buffered bytes and a 0 flag."""
        return (b"", 0)

    def setstate(self, state):
        """Restore a state from getstate(); ignored by the base class."""
        return None
887
888
889# NOTE: This should behave the same as codecs.BufferedIncrementalDecoder.
890# TODO(T61720167): Should be removed once we can freeze encodings
class BufferedIncrementalDecoder(IncrementalDecoder):
    """Incremental decoder that carries undecoded bytes over to the next call.

    Subclasses implement `_buffer_decode(input, errors, final)` returning a
    `(decoded, consumed)` pair.
    """

    def __init__(self, errors="strict"):
        """Set up the base decoder and an empty byte buffer."""
        IncrementalDecoder.__init__(self, errors)
        self.buffer = b""

    def _buffer_decode(self, input, errors, final):
        """Decode `input`; must be overridden."""
        raise NotImplementedError()

    def decode(self, input, final=False):
        """Decode the buffered bytes followed by `input`.

        Whatever `_buffer_decode` did not consume is kept for the next call.
        """
        pending = self.buffer + input
        decoded, used = self._buffer_decode(pending, self.errors, final)
        self.buffer = pending[used:]
        return decoded

    def reset(self):
        """Reset the base decoder and discard buffered bytes."""
        IncrementalDecoder.reset(self)
        self.buffer = b""

    def getstate(self):
        """Return the buffered bytes together with a 0 flag."""
        return (self.buffer, 0)

    def setstate(self, state):
        """Restore the buffer from the first item of `state`."""
        self.buffer = state[0]
914
915
916# TODO(T61720167): Should be removed once we can freeze encodings
class UTF8IncrementalEncoder(IncrementalEncoder):
    """Incremental encoder backed by utf_8_encode()."""

    def encode(self, input, final=False):
        """Return `input` encoded as UTF-8; `final` is not used."""
        encoded_and_length = utf_8_encode(input, self.errors)
        return encoded_and_length[0]
920
921
922# TODO(T61720167): Should be removed once we can freeze encodings
class UTF8IncrementalDecoder(BufferedIncrementalDecoder):
    """Buffered incremental decoder backed by utf_8_decode()."""

    @staticmethod
    def _buffer_decode(input, errors, final):
        """Decode `input` with utf_8_decode(), passing `final` as is_final."""
        decoded_and_consumed = utf_8_decode(input, errors, final)
        return decoded_and_consumed
927
928
929# TODO(T61720167): Should be removed once we can freeze encodings
def getincrementaldecoder(encoding):
    """Return the incremental decoder class for `encoding`.

    "UTF-8" and "utf-8" are answered directly with UTF8IncrementalDecoder;
    everything else goes through lookup().
    Raises LookupError when the codec has no incremental decoder.
    """
    if encoding == "UTF-8" or encoding == "utf-8":
        return UTF8IncrementalDecoder
    factory = lookup(encoding).incrementaldecoder
    if factory is not None:
        return factory
    raise LookupError(encoding)
937
938
939# TODO(T61720167): Should be removed once we can freeze encodings
def getincrementalencoder(encoding):
    """Return the incremental encoder class for `encoding`.

    "UTF-8" and "utf-8" are answered directly with UTF8IncrementalEncoder;
    everything else goes through lookup().
    Raises LookupError when the codec has no incremental encoder.
    """
    if encoding == "UTF-8" or encoding == "utf-8":
        return UTF8IncrementalEncoder
    factory = lookup(encoding).incrementalencoder
    if factory is not None:
        return factory
    raise LookupError(encoding)