this repo has no description
at trunk 946 lines 31 kB view raw
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates. (http://www.facebook.com)

from builtins import _index, _str_array

from _builtins import (
    _builtin,
    _bytes_check,
    _bytes_decode,
    _bytes_decode_ascii,
    _bytes_decode_utf_8,
    _bytes_len,
    _byteslike_guard,
    _int_check,
    _object_type_hasattr,
    _str_array_iadd,
    _str_check,
    _str_encode,
    _str_encode_ascii,
    _str_guard,
    _str_len,
    _tuple_check,
    _tuple_len,
    _type,
    _Unbound,
    _unimplemented,
    maxunicode as _maxunicode,
)


# Codec search functions, in registration order.
codec_search_path = []


# Maps the encoding name exactly as passed to lookup() to the codec it found.
codec_search_cache = {}


def register(search_func):
    """Append `search_func` to the list of codec search functions.

    Raises TypeError if `search_func` is not callable.
    """
    if not callable(search_func):
        raise TypeError("argument must be callable")
    codec_search_path.append(search_func)


def lookup(encoding):
    """Return the codec 4-tuple registered for `encoding`.

    The cache is consulted first; otherwise every registered search function
    is tried with the lower-cased name (spaces replaced by hyphens) until one
    returns a non-None result.

    Raises TypeError if a search function returns something other than a
    4-tuple, and LookupError if no search function knows the encoding.
    """
    cached = codec_search_cache.get(encoding)
    if cached is not None:
        return cached
    # Make sure that we loaded the standard codecs.
    if not codec_search_path:
        import encodings  # noqa: F401

    normalized_encoding = encoding.lower().replace(" ", "-")
    result = None
    for search_func in codec_search_path:
        result = search_func(normalized_encoding)
        if result is None:
            continue
        if not _tuple_check(result) or _tuple_len(result) != 4:
            raise TypeError("codec search functions must return 4-tuples")
        break
    if result is None:
        raise LookupError(f"unknown encoding: {encoding}")

    codec_search_cache[encoding] = result
    return result


def _lookup_text(encoding, alternate_command):
    """Look up `encoding` and reject codecs flagged as non-text.

    Codecs that are plain tuples, or that lack an `_is_text_encoding`
    attribute, are accepted as-is.
    """
    codec = lookup(encoding)
    if _type(codec) != tuple:
        try:
            if not codec._is_text_encoding:
                raise LookupError(
                    f"{encoding} is not a text encoding; "
                    f"use {alternate_command} to handle arbitrary codecs"
                )
        except AttributeError:
            pass
    return codec


def decode(data, encoding: str = "utf-8", errors: str = _Unbound) -> str:
    """Decode `data` with the codec named `encoding`.

    Tries the native fast path, then the built-in codec table, then the codec
    registry. Raises LookupError for an unknown encoding and TypeError if a
    registry decoder does not return a 2-tuple.
    """
    result = _bytes_decode(data, encoding)
    if result is not _Unbound:
        return result
    # Look the codec up *before* calling it so that a KeyError raised while
    # decoding is not mistaken for "codec missing from the table".
    table_decoder = _codec_decode_table.get(encoding.lower())
    if table_decoder is not None:
        return table_decoder(data, "strict" if errors is _Unbound else errors)[0]
    try:
        decoder = lookup(encoding)[1]
    except LookupError:
        raise LookupError(f"unknown encoding: {encoding}")
    if errors is _Unbound:
        result = decoder(data)
    else:
        result = decoder(data, errors)
    if _tuple_check(result) and _tuple_len(result) == 2:
        return result[0]
    # CPython does not check to make sure that the second element is an int
    raise TypeError("decoder must return a tuple (object,integer)")


def encode(data, encoding: str = "utf-8", errors: str = _Unbound) -> bytes:
    """Encode `data` with the codec named `encoding`.

    Tries the native fast path, then the built-in codec table, then the codec
    registry. Raises LookupError for an unknown encoding and TypeError if a
    registry encoder does not return a 2-tuple.
    """
    result = _str_encode(data, encoding)
    if result is not _Unbound:
        return result
    # See decode(): keep the table lookup separate from the codec call.
    table_encoder = _codec_encode_table.get(encoding.lower())
    if table_encoder is not None:
        return table_encoder(data, "strict" if errors is _Unbound else errors)[0]
    try:
        encoder = lookup(encoding)[0]
    except LookupError:
        raise LookupError(f"unknown encoding: {encoding}")
    if errors is _Unbound:
        result = encoder(data)
    else:
        result = encoder(data, errors)
    if _tuple_check(result) and _tuple_len(result) == 2:
        return result[0]
    # CPython does not check to make sure that the second element is an int
    raise TypeError("encoder must return a tuple (object, integer)")


def _ascii_decode(data: bytes, errors: str, index: int, out: _str_array):
    _builtin()


def ascii_decode(data: bytes, errors: str = "strict"):
    """Decode ASCII `data`; returns a (str, position) tuple.

    Bytes the native decoder cannot handle are routed through the error
    handler named by `errors`.
    """
    _byteslike_guard(data)
    if not _str_check(errors):
        raise TypeError(
            "ascii_decode() argument 2 must be str or None, not "
            f"'{_type(errors).__name__}'"
        )
    result = _bytes_decode_ascii(data)
    if result is not _Unbound:
        return result, _bytes_len(data)
    result = _str_array()
    i = 0
    encoded = ""
    length = len(data)
    while i < length:
        encoded, i = _ascii_decode(data, errors, i, result)
        if _int_check(encoded):
            # An int means "error starting at `encoded`, ending at `i`".
            data, i = _call_decode_errorhandler(
                errors, data, result, "ordinal not in range(128)", "ascii", encoded, i
            )
    if _str_check(encoded):
        return encoded, i
    # The error handler was the last to write to the result
    return str(result), i


def _ascii_encode(data: str, errors: str, index: int, out: bytearray):
    """Tries to encode `data`, starting from `index`, into the `out` bytearray.
    If it encounters any codepoints above 127, it tries using the `errors`
    error handler to fix it internally, but returns a tuple of the first
    and last index of the error on failure.
    If it finishes encoding, it returns a tuple of the final bytes and length.
    """
    _builtin()


def ascii_encode(data: str, errors: str = "strict"):
    """Encode `data` as ASCII; returns a (bytes, position) tuple.

    Raises TypeError for non-str arguments and UnicodeEncodeError if an error
    handler's str replacement is itself not ASCII.
    """
    if not _str_check(data):
        raise TypeError(
            f"ascii_encode() argument 1 must be str, not {_type(data).__name__}"
        )
    if not _str_check(errors):
        raise TypeError(
            "ascii_encode() argument 2 must be str or None, not "
            f"{_type(errors).__name__}"
        )
    result = _str_encode_ascii(data)
    if result is not _Unbound:
        return result, _str_len(data)
    result = bytearray()
    i = 0
    encoded = b""
    length = _str_len(data)
    while i < length:
        encoded, i = _ascii_encode(data, errors, i, result)
        if _int_check(encoded):
            unicode, pos = _call_encode_errorhandler(
                errors, data, "ordinal not in range(128)", "ascii", encoded, i
            )
            if _bytes_check(unicode):
                # Bytes replacements are copied through verbatim.
                result += unicode
                i = pos
                continue
            for char in unicode:
                if char > "\x7F":
                    raise UnicodeEncodeError(
                        "ascii", data, encoded, i, "ordinal not in range(128)"
                    )
            _bytearray_string_append(result, unicode)
            i = pos
    if _bytes_check(encoded):
        return encoded, i
    # _ascii_encode encountered an error and _call_encode_errorhandler was the
    # last function to write to `result`.
    return bytes(result), i


def charmap_decode(data, errors="strict", mapping=None):
    """Decode `data` by looking each byte up in `mapping`.

    `mapping` is indexed with the byte value and may yield a str, an int code
    point, or None / "\\ufffe" for "undefined". When `mapping` is None the data
    is decoded as Latin-1 (CPython's behavior). Only errors="strict" is
    implemented.

    Returns a (str, length) tuple. Raises UnicodeDecodeError for unmapped
    bytes and TypeError for invalid mapping values.
    """
    _byteslike_guard(data)
    _str_guard(errors)
    if errors != "strict":
        _unimplemented()
    if mapping is None:
        # NOTE(review): relies on latin_1_decode returning (str, length);
        # confirm against the _latin_1_decode builtin.
        return latin_1_decode(data, errors)

    result = _str_array()
    data_len = _bytes_len(data)
    i = 0
    while i < data_len:
        try:
            mapped = mapping[data[i]]
        except (IndexError, KeyError):
            mapped = None
        if mapped is None or mapped == "\ufffe":
            # The offending byte occupies the half-open range [i, i + 1).
            raise UnicodeDecodeError(
                "charmap", data, i, i + 1, "character maps to <undefined>"
            )
        if _int_check(mapped):
            if mapped < 0 or mapped > _maxunicode:
                raise TypeError(
                    f"character mapping must be in range ({_maxunicode + 1:#x})"
                )
            mapped = chr(mapped)
        elif not _str_check(mapped):
            raise TypeError("character mapping must return integer, None or str")
        _str_array_iadd(result, mapped)
        i += 1

    return str(result), data_len


def _escape_decode(data: bytes, errors: str, recode_encoding: str):
    """Tries to decode `data`.
    If it runs into any errors, it returns the message to raise.
    If it finishes decoding, it returns a tuple of
    (decoded, length, first_invalid_escape)
    where the first_invalid_escape is either the index into the data of the first
    invalid escape sequence, or -1 if none occur.
    Will eventually have to handle the recode_encoding argument.
    """
    _builtin()


def _escape_decode_stateful(
    data: bytes, errors: str = "strict", recode_encoding: str = ""
):
    """Decode escape sequences in `data`.

    Returns (decoded, length, first_invalid_escape). Raises ValueError with the
    message produced by `_escape_decode` when decoding fails.
    """
    if not _str_check(data):
        _byteslike_guard(data)
    if not _str_check(errors):
        raise TypeError(
            "escape_decode() argument 2 must be str or None, not "
            f"{_type(errors).__name__}"
        )
    decoded = _escape_decode(data, errors, recode_encoding)
    if _str_check(decoded):
        # A str result is an error message rather than decoded output.
        raise ValueError(decoded)
    return decoded


def escape_decode(data, errors: str = "strict"):
    """Decode escape sequences in `data`; returns (decoded, length)."""
    escaped, length, _ = _escape_decode_stateful(data, errors)
    return escaped, length


def _latin_1_decode(data: bytes):
    _builtin()


def latin_1_decode(data: bytes, errors: str = "strict"):
    """Decode Latin-1 `data` via the `_latin_1_decode` builtin."""
    _byteslike_guard(data)
    if not _str_check(errors):
        raise TypeError(
            "latin_1_decode() argument 2 must be str or None, not "
            f"'{_type(errors).__name__}'"
        )
    return _latin_1_decode(data)


def _latin_1_encode(data: str, errors: str, index: int, out: bytearray):
    """Tries to encode `data`, starting from `index`, into the `out` bytearray.
    If it encounters any codepoints above 255, it tries using the `errors`
    error handler to fix it internally, but returns a tuple of the first
    and last index of the error on failure.
    If it finishes encoding, it returns a tuple of the final bytes and length.
    """
    _builtin()


def latin_1_encode(data: str, errors: str = "strict"):
    """Encode `data` as Latin-1; returns a (bytes, position) tuple.

    Raises TypeError for non-str arguments and UnicodeEncodeError if an error
    handler's str replacement is itself outside Latin-1.
    """
    if not _str_check(data):
        raise TypeError(
            f"latin_1_encode() argument 1 must be str, not {_type(data).__name__}"
        )
    if not _str_check(errors):
        raise TypeError(
            "latin_1_encode() argument 2 must be str or None, not "
            f"{_type(errors).__name__}"
        )
    result = bytearray()
    i = 0
    encoded = b""
    length = _str_len(data)
    while i < length:
        encoded, i = _latin_1_encode(data, errors, i, result)
        if _int_check(encoded):
            unicode, pos = _call_encode_errorhandler(
                errors, data, "ordinal not in range(256)", "latin-1", encoded, i
            )
            if _bytes_check(unicode):
                result += unicode
                i = pos
                continue
            for char in unicode:
                if char > "\xFF":
                    raise UnicodeEncodeError(
                        "latin-1", data, encoded, i, "ordinal not in range(256)"
                    )
            # The replacement was just verified to be Latin-1 encodable.
            result += latin_1_encode(unicode, errors)[0]
            i = pos
    if _bytes_check(encoded):
        return encoded, i
    # _latin_1_encode encountered an error and _call_encode_errorhandler was the
    # last function to write to `result`.
    return bytes(result), i


def _raw_unicode_escape_decode(data: bytes, errors: str, index: int, out: _str_array):
    """Tries to decode `data`, starting from `index`, into the `out` _str_array.
    Only decodes raw unicode uXXXX or UXXXXXXXX.
    If it runs into any errors, it returns a tuple of
    (error_start, error_end, error_message),
    If it finishes decoding, it returns a tuple of
    (decoded, length, "")
    """
    _builtin()


def raw_unicode_escape_decode(data, errors: str = "strict"):
    """Decode raw-unicode-escape `data`; returns a (str, position) tuple."""
    if not _str_check(data):
        _byteslike_guard(data)
    if not _str_check(errors):
        raise TypeError(
            "raw_unicode_escape_decode() argument 2 must be str, not "
            f"{_type(errors).__name__}"
        )
    result = _str_array()
    i = 0
    decoded = ""
    length = len(data)
    while i < length:
        decoded, i, error_msg = _raw_unicode_escape_decode(data, errors, i, result)
        if error_msg:
            # On error `decoded` holds the error start and `i` the error end.
            data, i = _call_decode_errorhandler(
                errors, data, result, error_msg, "rawunicodeescape", decoded, i
            )
    if _str_check(decoded):
        return decoded, i
    # The error handler was the last to write to the result
    return str(result), i


def _raw_unicode_escape_encode(data):
    _builtin()


def raw_unicode_escape_encode(data, errors: str = "strict"):
    """Encode `data` with the `_raw_unicode_escape_encode` builtin."""
    if not _str_check(data):
        raise TypeError(
            f"raw_unicode_escape_encode() argument 1 must be str, not {_type(data).__name__}"
        )
    if not _str_check(errors):
        raise TypeError(
            "raw_unicode_escape_encode() argument 2 must be str, not "
            f"{_type(errors).__name__}"
        )
    return _raw_unicode_escape_encode(data)


def _unicode_escape_decode(data: bytes, errors: str, index: int, out: _str_array):
    """Tries to decode `data`, starting from `index`, into the `out` _str_array.
    If it runs into any errors, it returns a tuple of
    (error_start, error_end, error_message, first_invalid_escape),
    where the first_invalid_escape is either the index into the data of the first
    invalid escape sequence, or -1 if none occur.
    If it finishes decoding, it returns a tuple of
    (decoded, length, "", first_invalid_escape)
    """
    _builtin()


def _unicode_escape_decode_stateful(data: bytes, errors: str = "strict"):
    """Decode unicode-escape `data`.

    Returns (decoded, position, first_invalid_escape).
    """
    if not _str_check(data):
        _byteslike_guard(data)
    if not _str_check(errors):
        raise TypeError(
            "unicode_escape_decode() argument 2 must be str or None, not "
            f"{_type(errors).__name__}"
        )
    result = _str_array()
    i = 0
    decoded = ""
    length = len(data)
    first_invalid = -1
    while i < length:
        decoded, i, error_msg, first_invalid = _unicode_escape_decode(
            data, errors, i, result
        )
        if error_msg:
            data, i = _call_decode_errorhandler(
                errors, data, result, error_msg, "unicodeescape", decoded, i
            )
    if _str_check(decoded):
        return decoded, i, first_invalid
    # The error handler was the last to write to the result
    return str(result), i, first_invalid


def unicode_escape_decode(data, errors: str = "strict"):
    """Decode unicode-escape `data`; returns (decoded, position)."""
    escaped, length, _ = _unicode_escape_decode_stateful(data, errors)
    return escaped, length


def unicode_escape_encode(data, errors: str = "strict"):
    _unimplemented()


def _utf_8_decode(
    data: bytes, errors: str, index: int, out: _str_array, is_final: bool
):
    """Tries to decode `data`, starting from `index`, into the `out` _str_array.
    If it runs into any errors, it returns a tuple of
    (error_start, error_end, error_message),
    If it finishes decoding, it returns a tuple of
    (decoded, length, "")
    """
    _builtin()


def utf_8_decode(data: bytes, errors: str = "strict", is_final: bool = False):
    """Decode UTF-8 `data`; returns a (str, position) tuple.

    The returned position may be less than len(data) when decoding stops
    early (see the loop below).
    """
    _byteslike_guard(data)
    if not _str_check(errors):
        raise TypeError(
            "utf_8_decode() argument 2 must be str or None, not "
            f"'{_type(errors).__name__}'"
        )
    result = _bytes_decode_utf_8(data)
    if result is not _Unbound:
        return result, _bytes_len(data)
    result = _str_array()
    i = 0
    encoded = ""
    length = len(data)
    while i < length:
        encoded, i, errmsg = _utf_8_decode(data, errors, i, result, is_final)
        if _int_check(encoded):
            data, i = _call_decode_errorhandler(
                errors, data, result, errmsg, "utf-8", encoded, i
            )
            continue
        # If encoded isn't an int, _utf_8_decode either finished or stopped at
        # input it could potentially recover from; stop decoding either way.
        # NOTE(review): the previous comment tied the early stop to is_final
        # being true, whereas CPython stops early when final is false. Confirm
        # against the builtin.
        break
    if _str_check(encoded):
        return encoded, i
    # The error handler was the last to write to the result
    return str(result), i


def _utf_8_encode(data: str, errors: str, index: int, out: bytearray):
    """Tries to encode `data`, starting from `index`, into the `out` bytearray.
    If it encounters an error, it tries using the `errors` error handler to
    fix it internally, but returns a tuple of the first and last index of
    the error.
    If it finishes encoding, it returns a tuple of the final bytes and length.
    """
    _builtin()


def utf_8_encode(data: str, errors: str = "strict"):
    """Encode `data` as UTF-8; returns a (bytes, position) tuple.

    Raises TypeError for non-str arguments and UnicodeEncodeError if an error
    handler's str replacement contains non-ASCII characters.
    """
    if not _str_check(data):
        raise TypeError(
            f"utf_8_encode() argument 1 must be str, not {_type(data).__name__}"
        )
    if not _str_check(errors):
        raise TypeError(
            "utf_8_encode() argument 2 must be str or None, not "
            f"{_type(errors).__name__}"
        )
    result = bytearray()
    i = 0
    encoded = bytes()
    length = _str_len(data)
    while i < length:
        encoded, i = _utf_8_encode(data, errors, i, result)
        if _int_check(encoded):
            unicode, pos = _call_encode_errorhandler(
                errors, data, "surrogates not allowed", "utf-8", encoded, i
            )
            if _bytes_check(unicode):
                result += unicode
                i = pos
                continue
            for char in unicode:
                if char > "\x7F":
                    raise UnicodeEncodeError(
                        "utf-8", data, encoded, i, "surrogates not allowed"
                    )
            _bytearray_string_append(result, unicode)
            i = pos
    if _bytes_check(encoded):
        return encoded, i
    # _utf_8_encode encountered an error and _call_encode_errorhandler was the
    # last function to write to `result`.
    return bytes(result), i


def _utf_16_encode(data: str, errors: str, index: int, out: bytearray, byteorder: int):
    _builtin()


def utf_16_encode(data: str, errors: str = "strict", byteorder: int = 0):  # noqa: C901
    """Encode `data` as UTF-16; returns a (bytes, position) tuple.

    byteorder < 0 selects little-endian, byteorder > 0 big-endian. With
    byteorder == 0 the bytes FF FE are written first and error-handler
    replacements are encoded little-endian.
    """
    if byteorder < 0:
        h_encoding = "utf-16-le"
        u_encoding = "utf_16_le"
    elif byteorder > 0:
        h_encoding = "utf-16-be"
        u_encoding = "utf_16_be"
    else:
        h_encoding = "utf-16"
        u_encoding = "utf_16"
    if not _str_check(data):
        raise TypeError(
            f"{u_encoding}_encode() argument 1 must be str, not {_type(data).__name__}"
        )
    if not _str_check(errors):
        raise TypeError(
            f"{u_encoding}_encode() argument 2 must be str or None, not "
            f"{_type(errors).__name__}"
        )
    result = bytearray()
    if byteorder == 0:
        result += b"\xFF"
        result += b"\xFE"
    i = 0
    length = _str_len(data)
    encoded = bytes(result)
    while i < length:
        encoded, i = _utf_16_encode(data, errors, i, result, byteorder)
        if _int_check(encoded):
            unicode, pos = _call_encode_errorhandler(
                errors, data, "surrogates not allowed", h_encoding, encoded, i
            )
            if _bytes_check(unicode):
                # Replacement bytes must be a whole number of 2-byte units.
                if _bytes_len(unicode) & 1:
                    raise UnicodeEncodeError(
                        h_encoding, data, encoded, i, "surrogates not allowed"
                    )
                result += unicode
                i = pos
                continue
            for char in unicode:
                if char > "\x7F":
                    raise UnicodeEncodeError(
                        h_encoding, data, encoded, i, "surrogates not allowed"
                    )
            # Encode the replacement without emitting a second BOM.
            result += utf_16_encode(
                unicode, errors, -1 if byteorder == 0 else byteorder
            )[0]
            i = pos
    if _bytes_check(encoded):
        return encoded, i
    # _utf_16_encode encountered an error and _call_encode_errorhandler was the
    # last function to write to `result`.
    return bytes(result), i


def utf_16_le_decode(data: bytes, errors: str = "strict"):
    _unimplemented()


def utf_16_le_encode(data: str, errors: str = "strict"):
    """Encode `data` as little-endian UTF-16 (no BOM)."""
    return utf_16_encode(data, errors, -1)


def utf_16_be_decode(data: bytes, errors: str = "strict"):
    _unimplemented()


def utf_16_be_encode(data: str, errors: str = "strict"):
    """Encode `data` as big-endian UTF-16 (no BOM)."""
    return utf_16_encode(data, errors, 1)


def _utf_32_encode(data: str, errors: str, index: int, out: bytearray, byteorder: int):
    _builtin()


def utf_32_encode(data: str, errors: str = "strict", byteorder: int = 0):  # noqa: C901
    """Encode `data` as UTF-32; returns a (bytes, position) tuple.

    byteorder < 0 selects little-endian, byteorder > 0 big-endian. With
    byteorder == 0 the bytes FF FE 00 00 are written first and error-handler
    replacements are encoded little-endian.
    """
    if byteorder < 0:
        hEncoding = "utf-32-le"
        uEncoding = "utf_32_le"
    elif byteorder > 0:
        hEncoding = "utf-32-be"
        uEncoding = "utf_32_be"
    else:
        hEncoding = "utf-32"
        uEncoding = "utf_32"
    if not _str_check(data):
        raise TypeError(
            f"{uEncoding}_encode() argument 1 must be str, not {_type(data).__name__}"
        )
    if not _str_check(errors):
        raise TypeError(
            f"{uEncoding}_encode() argument 2 must be str or None, not "
            f"{_type(errors).__name__}"
        )
    result = bytearray()
    if byteorder == 0:
        result += b"\xFF\xFE\x00\x00"
    i = 0
    length = _str_len(data)
    encoded = bytes(result)
    while i < length:
        encoded, i = _utf_32_encode(data, errors, i, result, byteorder)
        if _int_check(encoded):
            unicode, pos = _call_encode_errorhandler(
                errors, data, "surrogates not allowed", hEncoding, encoded, i
            )
            if _bytes_check(unicode):
                # Replacement bytes must be a whole number of 4-byte units.
                if _bytes_len(unicode) & 3:
                    raise UnicodeEncodeError(
                        hEncoding, data, encoded, i, "surrogates not allowed"
                    )
                result += unicode
                i = pos
                continue
            for char in unicode:
                if char > "\x7f":
                    raise UnicodeEncodeError(
                        hEncoding, data, encoded, i, "surrogates not allowed"
                    )
            # Encode the replacement without emitting a second BOM.
            result += utf_32_encode(
                unicode, errors, -1 if byteorder == 0 else byteorder
            )[0]
            i = pos
    if _bytes_check(encoded):
        return encoded, i
    # _utf_32_encode encountered an error and _call_encode_errorhandler was the
    # last function to write to `result`.
    return bytes(result), i


def utf_32_le_encode(data: str, errors: str = "strict"):
    """Encode `data` as little-endian UTF-32 (no BOM)."""
    return utf_32_encode(data, errors, -1)


def utf_32_be_encode(data: str, errors: str = "strict"):
    """Encode `data` as big-endian UTF-32 (no BOM)."""
    return utf_32_encode(data, errors, 1)


# Decoders that decode() dispatches to directly, keyed by lower-cased name.
_codec_decode_table = {
    "ascii": ascii_decode,
    "us_ascii": ascii_decode,
    "latin1": latin_1_decode,
    "latin 1": latin_1_decode,
    "latin-1": latin_1_decode,
    "latin_1": latin_1_decode,
    "utf_8": utf_8_decode,
    "utf-8": utf_8_decode,
    "utf8": utf_8_decode,
}

# Encoders that encode() dispatches to directly, keyed by lower-cased name.
_codec_encode_table = {
    "ascii": ascii_encode,
    "us_ascii": ascii_encode,
    "latin_1": latin_1_encode,
    "latin-1": latin_1_encode,
    "iso-8859-1": latin_1_encode,
    "iso_8859_1": latin_1_encode,
    "utf_8": utf_8_encode,
    "utf-8": utf_8_encode,
    "utf8": utf_8_encode,
    "utf_16": utf_16_encode,
    "utf-16": utf_16_encode,
    "utf16": utf_16_encode,
    "utf_16_le": utf_16_le_encode,
    "utf-16-le": utf_16_le_encode,
    "utf_16_be": utf_16_be_encode,
    "utf-16-be": utf_16_be_encode,
    "utf_32": utf_32_encode,
    "utf-32": utf_32_encode,
    "utf32": utf_32_encode,
    "utf_32_le": utf_32_le_encode,
    "utf-32-le": utf_32_le_encode,
    "utf_32_be": utf_32_be_encode,
    "utf-32-be": utf_32_be_encode,
}


def backslashreplace_errors(error):
    _builtin()


def strict_errors(error):
    """Error handler that re-raises `error`."""
    if not isinstance(error, Exception):
        raise TypeError("codec must pass exception instance")
    raise error


def ignore_errors(error):
    """Error handler that drops the offending input and resumes at `error.end`."""
    if not isinstance(error, UnicodeError):
        raise TypeError(
            f"don't know how to handle {_type(error).__name__} in error callback"
        )
    return ("", error.end)


def lookup_error(error: str):
    """Return the error handler registered under the name `error`.

    Raises TypeError if `error` is not a str and LookupError if no handler is
    registered under that name.
    """
    if not _str_check(error):
        raise TypeError(
            f"lookup_error() argument must be str, not {_type(error).__name__}"
        )
    try:
        return _codec_error_registry[error]
    except KeyError:
        raise LookupError(f"unknown error handler name '{error}'")


def register_error(name: str, error_func):
    """Register `error_func` as the error handler called `name`."""
    if not _str_check(name):
        raise TypeError(
            f"register_error() argument 1 must be str, not {_type(name).__name__}"
        )
    if not callable(error_func):
        raise TypeError("handler must be callable")
    _codec_error_registry[name] = error_func


def _call_decode_errorhandler(
    errors: str,
    input: bytes,
    output: _str_array,
    reason: str,
    encoding: str,
    start: int,
    end: int,
):
    """
    Generic decoding errorhandling function
    Creates a UnicodeDecodeError, looks up an error handler, and calls the
    error handler with the UnicodeDecodeError.
    Makes sure the error handler returns a (str, int) tuple and returns it and
    writes the str to the output _str_array passed in.
    Since the error handler can change the object that's being decoded by
    replacing the object of the UnicodeDecodeError, this function returns the
    Error's object field, along with the integer returned from the function
    call that's been normalized to fit within the length of the object.

    errors: The name of the error handling function to call
    input: The input to be decoded
    output: The string builder that the error handling result should be appended to
    reason: The reason the errorhandler was called
    encoding: The encoding being used
    start: The index of the first erroneous byte
    end: The index of the first byte after the erroneous bytes
    """
    exception = UnicodeDecodeError(encoding, input, start, end, reason)
    result = lookup_error(errors)(exception)
    if not _tuple_check(result) or _tuple_len(result) != 2:
        raise TypeError("decoding error handler must return (str, int) tuple")

    replacement, pos = result
    if not _str_check(replacement) or not _object_type_hasattr(pos, "__index__"):
        raise TypeError("decoding error handler must return (str, int) tuple")

    pos = _index(pos)
    input = exception.object
    if not _bytes_check(input):
        raise TypeError("exception attribute object must be bytes")
    if pos < 0:
        # Negative positions count back from the end of the input.
        pos += _bytes_len(input)
    if not 0 <= pos <= _bytes_len(input):
        raise IndexError(f"position {pos} from error handler out of bounds")
    _str_array_iadd(output, replacement)

    return (input, pos)


def _call_encode_errorhandler(
    errors: str, input: str, reason: str, encoding: str, start: int, end: int
):
    """
    Generic encoding errorhandling function
    Creates a UnicodeEncodeError, looks up an error handler, and calls the
    error handler with the UnicodeEncodeError.
    Makes sure the error handler returns a (str/bytes, int) tuple and returns it

    errors: The name of the error handling function to call
    input: The input to be encoded
    reason: The reason the errorhandler was called
    encoding: The encoding being used
    start: The index of the first erroneous character
    end: The index of the first character after the erroneous characters
    """
    exception = UnicodeEncodeError(encoding, input, start, end, reason)
    result = lookup_error(errors)(exception)
    if not _tuple_check(result) or _tuple_len(result) != 2:
        raise TypeError("encoding error handler must return (str/bytes, int) tuple")

    unicode, pos = result
    if (
        not _str_check(unicode)
        and not _bytes_check(unicode)
        or not _object_type_hasattr(pos, "__index__")
    ):
        raise TypeError("encoding error handler must return (str/bytes, int) tuple")

    pos = _index(pos)
    length = len(input)
    if pos < 0:
        # Negative positions count back from the end of the input.
        pos += length
    if not 0 <= pos <= length:
        raise IndexError(f"position {pos} from error handler out of bounds")

    return unicode, pos


# TODO(T61927696): Support surrogatepass errors for utf-8 decode
_codec_error_registry = {
    "backslashreplace": backslashreplace_errors,
    "strict": strict_errors,
    "ignore": ignore_errors,
}


def _bytearray_string_append(dst: bytearray, data: str):
    _builtin()


# NOTE: This should behave the same as codecs.IncrementalEncoder.
# TODO(T61720167): Should be removed once we can freeze encodings
class IncrementalEncoder(object):
    def __init__(self, errors="strict"):
        self.errors = errors
        self.buffer = ""

    def encode(self, input, final=False):
        raise NotImplementedError

    def reset(self):
        pass

    def getstate(self):
        return 0

    def setstate(self, state):
        pass


# NOTE: This should behave the same as codecs.IncrementalDecoder.
871# TODO(T61720167): Should be removed once we can freeze encodings 872class IncrementalDecoder(object): 873 def __init__(self, errors="strict"): 874 self.errors = errors 875 876 def decode(self, input, final=False): 877 raise NotImplementedError 878 879 def reset(self): 880 pass 881 882 def getstate(self): 883 return (b"", 0) 884 885 def setstate(self, state): 886 pass 887 888 889# NOTE: This should behave the same as codecs.BufferedIncrementalDecoder. 890# TODO(T61720167): Should be removed once we can freeze encodings 891class BufferedIncrementalDecoder(IncrementalDecoder): 892 def __init__(self, errors="strict"): 893 IncrementalDecoder.__init__(self, errors) 894 self.buffer = b"" 895 896 def _buffer_decode(self, input, errors, final): 897 raise NotImplementedError 898 899 def decode(self, input, final=False): 900 data = self.buffer + input 901 (result, consumed) = self._buffer_decode(data, self.errors, final) 902 self.buffer = data[consumed:] 903 return result 904 905 def reset(self): 906 IncrementalDecoder.reset(self) 907 self.buffer = b"" 908 909 def getstate(self): 910 return (self.buffer, 0) 911 912 def setstate(self, state): 913 self.buffer = state[0] 914 915 916# TODO(T61720167): Should be removed once we can freeze encodings 917class UTF8IncrementalEncoder(IncrementalEncoder): 918 def encode(self, input, final=False): 919 return utf_8_encode(input, self.errors)[0] 920 921 922# TODO(T61720167): Should be removed once we can freeze encodings 923class UTF8IncrementalDecoder(BufferedIncrementalDecoder): 924 @staticmethod 925 def _buffer_decode(input, errors, final): 926 return utf_8_decode(input, errors, final) 927 928 929# TODO(T61720167): Should be removed once we can freeze encodings 930def getincrementaldecoder(encoding): 931 if encoding == "UTF-8" or encoding == "utf-8": 932 return UTF8IncrementalDecoder 933 decoder = lookup(encoding).incrementaldecoder 934 if decoder is None: 935 raise LookupError(encoding) 936 return decoder 937 938 939# 
TODO(T61720167): Should be removed once we can freeze encodings 940def getincrementalencoder(encoding): 941 if encoding == "UTF-8" or encoding == "utf-8": 942 return UTF8IncrementalEncoder 943 encoder = lookup(encoding).incrementalencoder 944 if encoder is None: 945 raise LookupError(encoding) 946 return encoder