this repo has no description
at trunk 3954 lines 144 kB view raw
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates. (http://www.facebook.com)
"""Unit tests for the classes exposed by the low-level `_io` module."""
import _io
import array
import mmap
import os
import subprocess
import sys
import tempfile
import unittest
import unittest.mock
from unittest.mock import Mock

from test_support import pyro_only


try:
    from _io import _BufferedIOMixin
except ImportError:
    pass


def _getfd():
    """Return the read end of a new pipe whose write end is already closed."""
    r, w = os.pipe()
    os.close(w)  # So that read() is harmless
    return r


class _IOBaseTests(unittest.TestCase):
    """Tests for `_io._IOBase` and for subclasses overriding single methods."""

    def test_closed_with_closed_instance_returns_true(self):
        f = _io._IOBase()
        self.assertFalse(f.closed)
        f.close()
        self.assertTrue(f.closed)

    def test_fileno_raises_unsupported_operation(self):
        f = _io._IOBase()
        self.assertRaises(_io.UnsupportedOperation, f.fileno)

    def test_flush_with_closed_instance_raises_value_error(self):
        f = _io._IOBase()
        f.close()
        with self.assertRaises(ValueError):
            f.flush()

    def test_isatty_with_closed_instance_raises_value_error(self):
        f = _io._IOBase()
        f.close()
        with self.assertRaises(ValueError):
            f.isatty()

    def test_isatty_always_returns_false(self):
        f = _io._IOBase()
        self.assertEqual(f.isatty(), False)

    def test_iter_with_closed_instance_raises_value_error(self):
        f = _io._IOBase()
        f.close()
        with self.assertRaises(ValueError):
            f.__iter__()

    def test_next_reads_line(self):
        class C(_io._IOBase):
            def readline(self):
                return "foo"

        f = C()
        self.assertEqual(f.__next__(), "foo")

    def test_readline_calls_read(self):
        class C(_io._IOBase):
            def read(self, size):
                raise UserWarning("foo")

        f = C()
        with self.assertRaises(UserWarning):
            f.readline()

    def test_readlines_calls_read(self):
        class C(_io._IOBase):
            def read(self, size):
                raise UserWarning("foo")

        f = C()
        with self.assertRaises(UserWarning):
            f.readlines()

    def test_readlines_calls_readline(self):
        class C(_io._IOBase):
            def readline(self):
                raise UserWarning("foo")

        f = C()
        with self.assertRaises(UserWarning):
            f.readlines()

    def test_readlines_returns_list(self):
        class C(_io._IOBase):
            def __iter__(self):
                return ["x", "y", "z"].__iter__()

        f = C()
        self.assertEqual(f.readlines(), ["x", "y", "z"])

    def test_readable_always_returns_false(self):
        f = _io._IOBase()
        self.assertFalse(f.readable())

    def test_seek_raises_unsupported_operation(self):
        f = _io._IOBase()
        self.assertRaises(_io.UnsupportedOperation, f.seek, 0)

    def test_seekable_always_returns_false(self):
        f = _io._IOBase()
        self.assertFalse(f.seekable())

    def test_supports_arbitrary_attributes(self):
        f = _io._IOBase()
        f.hello_world = 42
        self.assertEqual(f.hello_world, 42)

    def test_tell_raises_unsupported_operation(self):
        f = _io._IOBase()
        self.assertRaises(_io.UnsupportedOperation, f.tell)

    def test_truncate_raises_unsupported_operation(self):
        f = _io._IOBase()
        self.assertRaises(_io.UnsupportedOperation, f.truncate)

    def test_writable_always_returns_false(self):
        f = _io._IOBase()
        self.assertFalse(f.writable())

    def test_with_with_closed_instance_raises_value_error(self):
        f = _io._IOBase()
        f.close()
        with self.assertRaises(ValueError):
            with f:
                pass

    def test_writelines_calls_write(self):
        class C(_io._IOBase):
            def write(self, line):
                raise UserWarning("foo")

        f = C()
        with self.assertRaises(UserWarning):
            f.writelines(["a"])


class _RawIOBaseTests(unittest.TestCase):
    """Tests for `_io._RawIOBase` and how its methods delegate to each other."""

    def test_read_with_readinto_returning_none_returns_none(self):
        class C(_io._RawIOBase):
            def readinto(self, b):
                return None

        f = C()
        self.assertEqual(f.read(), None)

    def test_read_with_negative_size_calls_readall(self):
        class C(_io._RawIOBase):
            def readall(self):
                raise UserWarning("foo")

        f = C()
        with self.assertRaises(UserWarning):
            f.read(-5)

    def test_read_with_nonnegative_size_calls_readinto(self):
        class C(_io._RawIOBase):
            def readinto(self, b):
                # Fill the whole destination buffer with "x" bytes.
                for i in range(len(b)):
                    b[i] = ord("x")
                return len(b)

        f = C()
        self.assertEqual(f.read(5), b"xxxxx")

    def test_readall_calls_read(self):
        class C(_io._RawIOBase):
            def read(self, size):
                raise UserWarning("foo")

        f = C()
        with self.assertRaises(UserWarning):
            f.readall()

    def test_readinto_raises_not_implemented_error(self):
        f = _io._RawIOBase()
        b = bytearray(b"")
        self.assertRaises(NotImplementedError, f.readinto, b)

    def test_write_raises_not_implemented_error(self):
        f = _io._RawIOBase()
        self.assertRaises(NotImplementedError, f.write, b"")


class _BufferedIOBaseTests(unittest.TestCase):
    """Tests for `_io._BufferedIOBase`."""

    def test_detach_raises_unsupported_operation(self):
        f = _io._BufferedIOBase()
        self.assertIn("detach", _io._BufferedIOBase.__dict__)
        self.assertRaises(_io.UnsupportedOperation, f.detach)

    def test_read_raises_unsupported_operation(self):
        f = _io._BufferedIOBase()
        self.assertIn("read", _io._BufferedIOBase.__dict__)
        self.assertRaises(_io.UnsupportedOperation, f.read)

    def test_read1_raises_unsupported_operation(self):
        f = _io._BufferedIOBase()
        self.assertIn("read1", _io._BufferedIOBase.__dict__)
        self.assertRaises(_io.UnsupportedOperation, f.read1)

    def test_readinto1_calls_read1(self):
        class C(_io._BufferedIOBase):
            def read1(self, n):
                raise UserWarning("foo")

        f = C()
        with self.assertRaises(UserWarning):
            f.readinto1(bytearray())

    def test_readinto_calls_read(self):
        class C(_io._BufferedIOBase):
            def read(self, n):
                raise UserWarning("foo")

        f = C()
        with self.assertRaises(UserWarning):
            f.readinto(bytearray())

    def test_write_raises_unsupported_operation(self):
        f = _io._BufferedIOBase()
        self.assertIn("write", _io._BufferedIOBase.__dict__)
        self.assertRaises(_io.UnsupportedOperation, f.write, b"")


class BufferedRWPairTests(unittest.TestCase):
    """Tests for `_io.BufferedRWPair`.

    Mock call-count assertions are made while the pair is still open, before
    the context managers exit.
    """

    def test_dunder_init_with_non_readable_reader_raises_unsupported_operation(self):
        class C:
            def readable(self):
                return False

            def writable(self):
                return False

        with self.assertRaisesRegex(_io.UnsupportedOperation, "readable"):
            _io.BufferedRWPair(C(), C())

    def test_dunder_init_with_non_writable_writer_raises_unsupported_operation(self):
        class C:
            def readable(self):
                return True

            def writable(self):
                return False

        with self.assertRaisesRegex(_io.UnsupportedOperation, "writable"):
            _io.BufferedRWPair(C(), C())

    def test_close_calls_reader_close_and_writer_close(self):
        class Reader(_io.BytesIO):
            close = Mock(name="close")

        class Writer(_io.BytesIO):
            close = Mock(name="close")

        with Reader() as reader, Writer() as writer:
            with _io.BufferedRWPair(reader, writer) as buffer:
                buffer.close()
                reader.close.assert_called_once()
                writer.close.assert_called_once()

    def test_closed_returns_writer_closed(self):
        with _io.BytesIO() as reader, _io.BytesIO() as writer:
            with _io.BufferedRWPair(reader, writer) as buffer:
                self.assertFalse(buffer.closed)
                writer.close()
                self.assertTrue(buffer.closed)

    def test_flush_calls_writer_write(self):
        write_calls = 0

        class Reader(_io.BytesIO):
            pass

        class Writer(_io.BytesIO):
            def write(self, b):
                nonlocal write_calls
                write_calls += 1
                return super().write(b)

        with Reader() as reader, Writer() as writer:
            with _io.BufferedRWPair(reader, writer) as buffer:
                buffer.write(b"123")
                self.assertEqual(write_calls, 0)
                buffer.flush()
                self.assertEqual(write_calls, 1)

    def test_isatty_with_tty_reader_returns_true(self):
        class Reader(_io.BytesIO):
            isatty = Mock(name="isatty", return_value=True)

        class Writer(_io.BytesIO):
            isatty = Mock(name="isatty", return_value=False)

        with Reader() as reader, Writer() as writer:
            with _io.BufferedRWPair(reader, writer) as buffer:
                self.assertTrue(buffer.isatty())
                reader.isatty.assert_called_once()
                writer.isatty.assert_called_once()

    def test_isatty_with_tty_writer_returns_true(self):
        class Reader(_io.BytesIO):
            isatty = Mock(name="isatty", return_value=False)

        class Writer(_io.BytesIO):
            isatty = Mock(name="isatty", return_value=True)

        with Reader() as reader, Writer() as writer:
            with _io.BufferedRWPair(reader, writer) as buffer:
                self.assertTrue(buffer.isatty())
                reader.isatty.assert_not_called()
                writer.isatty.assert_called_once()

    def test_isatty_with_neither_tty_returns_false(self):
        class Reader(_io.BytesIO):
            isatty = Mock(name="isatty", return_value=False)

        class Writer(_io.BytesIO):
            isatty = Mock(name="isatty", return_value=False)

        with Reader() as reader, Writer() as writer:
            with _io.BufferedRWPair(reader, writer) as buffer:
                self.assertFalse(buffer.isatty())
                reader.isatty.assert_called_once()
                writer.isatty.assert_called_once()

    def test_peek_does_not_call_reader_or_writer_peek(self):
        class Reader(_io.BytesIO):
            peek = Mock(name="peek")

        class Writer(_io.BytesIO):
            peek = Mock(name="peek")

        with Reader() as reader, Writer() as writer:
            with _io.BufferedRWPair(reader, writer) as buffer:
                buffer.peek()
                reader.peek.assert_not_called()
                writer.peek.assert_not_called()

    def test_read_calls_reader_readall(self):
        class Reader(_io.BytesIO):
            read = Mock(name="read")
            readall = Mock(name="readall", return_value=None)

        with Reader() as reader, _io.BytesIO() as writer:
            with _io.BufferedRWPair(reader, writer) as buffer:
                buffer.read()
                reader.read.assert_not_called()
                reader.readall.assert_called_once()

    def test_readable_on_closed_raises_value_error(self):
        with _io.BytesIO() as reader, _io.BytesIO() as writer:
            # Entering and leaving the context closes the pair.
            with _io.BufferedRWPair(reader, writer) as buffer:
                pass
            with self.assertRaises(ValueError) as context:
                buffer.readable()
            self.assertIn("closed file", str(context.exception))

    def test_readable_calls_reader_readable(self):
        class Reader(_io.BytesIO):
            readable = Mock(name="readable", return_value=True)

        class Writer(_io.BytesIO):
            readable = Mock(name="readable")

        with Reader() as reader, Writer() as writer:
            with _io.BufferedRWPair(reader, writer) as buffer:
                self.assertEqual(reader.readable.call_count, 2)
                self.assertTrue(buffer.readable())
                self.assertEqual(reader.readable.call_count, 3)
                writer.readable.assert_not_called()

    def test_writable_on_closed_raises_value_error(self):
        with _io.BytesIO() as reader, _io.BytesIO() as writer:
            # Entering and leaving the context closes the pair.
            with _io.BufferedRWPair(reader, writer) as buffer:
                pass
            with self.assertRaises(ValueError) as context:
                buffer.writable()
            self.assertIn("closed file", str(context.exception))

    def test_writable_calls_reader_writable(self):
        # Despite the test name, the assertions below check that the
        # *writer's* writable() is consulted and the reader's is not.
        class Reader(_io.BytesIO):
            writable = Mock(name="writable")

        class Writer(_io.BytesIO):
            writable = Mock(name="writable", return_value=True)

        with Reader() as reader, Writer() as writer:
            with _io.BufferedRWPair(reader, writer) as buffer:
                self.assertEqual(writer.writable.call_count, 2)
                self.assertTrue(buffer.writable())
                reader.writable.assert_not_called()
                self.assertEqual(writer.writable.call_count, 3)


class BufferedRandomTests(unittest.TestCase):
    """Tests for `_io.BufferedRandom` layered over `_io.BytesIO`."""

    def test_dunder_init_with_non_seekable_raises_unsupported_operation(self):
        class C(_io.BytesIO):
            def seekable(self):
                return False

        c = C()
        with self.assertRaisesRegex(_io.UnsupportedOperation, "not seekable"):
            _io.BufferedRandom(c)

    def test_dunder_init_with_non_readable_raises_unsupported_operation(self):
        class C(_io.BytesIO):
            def readable(self):
                return False

        c = C()
        with self.assertRaisesRegex(_io.UnsupportedOperation, "not readable"):
            _io.BufferedRandom(c)

    def test_dunder_init_with_non_writable_raises_unsupported_operation(self):
        class C(_io.BytesIO):
            def writable(self):
                return False

        c = C()
        with self.assertRaisesRegex(_io.UnsupportedOperation, "not writable"):
            _io.BufferedRandom(c)

    def test_dunder_init_with_non_positive_buffer_size_raises_value_error(self):
        with _io.BytesIO(b"hello") as bytes_io:
            with self.assertRaisesRegex(ValueError, "buffer size"):
                _io.BufferedRandom(bytes_io, 0)

    def test_dunder_init_allows_subclasses(self):
        class C(_io.BufferedRandom):
            pass

        instance = C(_io.BytesIO(b""))
        self.assertIsInstance(instance, _io.BufferedRandom)

    def test_flush_with_closed_raises_value_error(self):
        writer = _io.BufferedRandom(_io.BytesIO(b"hello"))
        writer.close()
        self.assertRaisesRegex(ValueError, "flush of closed file", writer.flush)

    def test_flush_writes_buffered_bytes(self):
        with _io.BytesIO(b"Hello world!") as bytes_io:
            with _io.BufferedRandom(bytes_io) as writer:
                writer.write(b"foo bar")
                # Nothing reaches the raw stream until flush().
                self.assertEqual(bytes_io.getvalue(), b"Hello world!")
                writer.flush()
                self.assertEqual(bytes_io.getvalue(), b"foo barorld!")

    def test_peek_returns_bytes(self):
        with _io.BytesIO(b"hello") as bytes_io:
            with _io.BufferedRandom(bytes_io) as buffer:
                self.assertEqual(buffer.peek(), b"hello")

    def test_peek_with_negative_returns_bytes(self):
        with _io.BytesIO(b"hello") as bytes_io:
            with _io.BufferedRandom(bytes_io) as buffer:
                self.assertEqual(buffer.peek(-1), b"hello")

    def test_raw_returns_raw(self):
        with _io.BytesIO(b"hello") as bytes_io:
            with _io.BufferedRandom(bytes_io) as buffered_io:
                self.assertIs(buffered_io.raw, bytes_io)

    def test_read_with_detached_stream_raises_value_error(self):
        with _io.BytesIO() as bytes_io:
            buffered = _io.BufferedRandom(bytes_io)
            buffered.detach()
            self.assertRaisesRegex(ValueError, "detached", buffered.read)

    def test_read_with_closed_stream_raises_value_error(self):
        bytes_io = _io.BytesIO(b"hello")
        buffered = _io.BufferedRandom(bytes_io)
        bytes_io.close()
        self.assertRaisesRegex(ValueError, "closed file", buffered.read)

    def test_read_with_negative_size_raises_value_error(self):
        with _io.BytesIO() as bytes_io:
            buffered = _io.BufferedRandom(bytes_io)
            with self.assertRaisesRegex(ValueError, "read length"):
                buffered.read(-2)

    def test_read_with_none_size_calls_raw_readall(self):
        class C(_io.BytesIO):
            def readall(self):
                raise UserWarning("foo")

        with C() as bytes_io:
            buffered = _io.BufferedRandom(bytes_io)
            with self.assertRaises(UserWarning) as context:
                buffered.read(None)
            self.assertEqual(context.exception.args, ("foo",))

    def test_read_reads_bytes(self):
        with _io.BytesIO(b"hello") as bytes_io:
            buffered = _io.BufferedRandom(bytes_io, buffer_size=1)
            result = buffered.read()
            self.assertEqual(result, b"hello")

    def test_read_reads_count_bytes(self):
        with _io.BytesIO(b"hello") as bytes_io:
            buffered = _io.BufferedRandom(bytes_io, buffer_size=1)
            result = buffered.read(3)
            self.assertEqual(result, b"hel")

    def test_read1_with_negative_size_returns_rest_in_buffer(self):
        with _io.BytesIO(b"hello world") as bytes_io:
            buffered = _io.BufferedRandom(bytes_io, buffer_size=5)
            self.assertEqual(buffered.read1(-1), b"hello")

    def test_read1_calls_read(self):
        with _io.BytesIO(b"hello") as bytes_io:
            buffered = _io.BufferedRandom(bytes_io, buffer_size=10)
            result = buffered.read1(3)
            self.assertEqual(result, b"hel")

    def test_read1_reads_from_buffer(self):
        with _io.BytesIO(b"hello") as bytes_io:
            buffered = _io.BufferedRandom(bytes_io, buffer_size=4)
            buffered.read(1)
            result = buffered.read1(10)
            self.assertEqual(result, b"ell")

    def test_readable_calls_raw_readable(self):
        class C(_io.BytesIO):
            readable = Mock(name="readable", return_value=True)

        with C() as raw:
            with _io.BufferedRandom(raw) as buffer:
                self.assertEqual(raw.readable.call_count, 1)
                self.assertTrue(buffer.readable())
                self.assertEqual(raw.readable.call_count, 2)

    def test_seek_with_invalid_whence_raises_value_error(self):
        with _io.BufferedRandom(_io.BytesIO(b"hello")) as writer:
            with self.assertRaisesRegex(ValueError, "invalid whence"):
                writer.seek(0, 3)

    def test_seek_calls_raw_seek(self):
        class C(_io.BytesIO):
            seek = Mock(name="seek", return_value=42)

        with C(b"hello") as bytes_io:
            with _io.BufferedRandom(bytes_io) as writer:
                self.assertEqual(writer.seek(10), 42)
                bytes_io.seek.assert_called_once()

    def test_seek_resets_position(self):
        with _io.BytesIO(b"hello") as bytes_io:
            buffered = _io.BufferedRandom(bytes_io)
            buffered.read(2)
            self.assertEqual(buffered.tell(), 2)
            buffered.seek(0)
            self.assertEqual(buffered.tell(), 0)

    def test_supports_arbitrary_attributes(self):
        with _io.BytesIO(b"hello") as bytes_io:
            buffered = _io.BufferedRandom(bytes_io)
            buffered.bonjour = -99
            self.assertEqual(buffered.bonjour, -99)

    def test_tell_returns_current_position(self):
        with _io.BytesIO(b"hello") as bytes_io:
            buffered = _io.BufferedRandom(bytes_io)
            self.assertEqual(buffered.tell(), 0)
            buffered.read(2)
            self.assertEqual(buffered.tell(), 2)

    def test_tell_counts_buffered_bytes(self):
        with _io.BytesIO(b"hello") as bytes_io:
            with _io.BufferedRandom(bytes_io) as writer:
                self.assertEqual(bytes_io.tell(), 0)
                self.assertEqual(writer.tell(), 0)
                writer.write(b"123")
                # The raw position is unchanged; only the buffer advanced.
                self.assertEqual(bytes_io.tell(), 0)
                self.assertEqual(writer.tell(), 3)

    def test_truncate_uses_tell_for_default_pos(self):
        with _io.BytesIO(b"hello") as bytes_io:
            with _io.BufferedRandom(bytes_io) as writer:
                writer.write(b"123")
                self.assertEqual(writer.truncate(), 3)
                self.assertEqual(bytes_io.getvalue(), b"123")

    def test_truncate_with_pos_truncates_raw(self):
        with _io.BytesIO(b"hello") as bytes_io:
            with _io.BufferedRandom(bytes_io) as writer:
                writer.write(b"123")
                self.assertEqual(writer.truncate(4), 4)
                self.assertEqual(bytes_io.getvalue(), b"123l")

    def test_writable_calls_raw_writable(self):
        class C(_io.BytesIO):
            writable = Mock(name="writable", return_value=True)

        with C(b"hello") as bytes_io:
            with _io.BufferedRandom(bytes_io) as writer:
                bytes_io.writable.assert_called_once()
                self.assertIs(writer.writable(), True)
                self.assertEqual(bytes_io.writable.call_count, 2)

    def test_write_with_closed_raises_value_error(self):
        writer = _io.BufferedRandom(_io.BytesIO(b"hello"))
        writer.close()
        with self.assertRaisesRegex(ValueError, "write to closed file"):
            writer.write(b"")

    def test_write_with_str_raises_type_error(self):
        with _io.BufferedRandom(_io.BytesIO(b"hello")) as writer:
            with self.assertRaisesRegex(TypeError, "str"):
                writer.write("")


class BufferedWriterTests(unittest.TestCase):
    """Tests for `_io.BufferedWriter`."""

    def test_dunder_init_with_non_writable_stream_raises_os_error(self):
        with _io.FileIO(_getfd(), mode="r") as file_reader:
            with self.assertRaises(OSError) as context:
                _io.BufferedWriter(file_reader)
            self.assertEqual(str(context.exception), "File or stream is not writable.")

    def test_dunder_init_with_non_positive_size_raises_value_error(self):
        with _io.BytesIO(b"123") as bytes_writer:
            with self.assertRaises(ValueError) as context:
                _io.BufferedWriter(bytes_writer, 0)
            self.assertEqual(
                str(context.exception), "buffer size must be strictly positive"
            )

    def test_dunder_init_allows_subclasses(self):
        class C(_io.BufferedWriter):
            pass

        instance = C(_io.BytesIO(b""))
        self.assertIsInstance(instance, _io.BufferedWriter)

    def test_flush_with_closed_raises_value_error(self):
        writer = _io.BufferedWriter(_io.BytesIO(b"hello"))
        writer.close()
        with self.assertRaises(ValueError) as context:
            writer.flush()
        self.assertEqual(str(context.exception), "flush of closed file")

    def test_flush_writes_buffered_bytes(self):
        with _io.BytesIO(b"Hello world!") as bytes_io:
            with _io.BufferedWriter(bytes_io) as writer:
                writer.write(b"foo bar")
                # Nothing reaches the raw stream until flush().
                self.assertEqual(bytes_io.getvalue(), b"Hello world!")
                writer.flush()
                self.assertEqual(bytes_io.getvalue(), b"foo barorld!")

    def test_raw_returns_raw(self):
        with _io.BytesIO(b"hello") as bytes_io:
            with _io.BufferedWriter(bytes_io) as buffered_io:
                self.assertIs(buffered_io.raw, bytes_io)

    def test_seek_with_invalid_whence_raises_value_error(self):
        with _io.BufferedWriter(_io.BytesIO(b"hello")) as writer:
            with self.assertRaisesRegex(ValueError, "invalid whence"):
                writer.seek(0, 3)

    def test_seek_calls_raw_seek(self):
        class C(_io.BytesIO):
            seek = Mock(name="seek", return_value=42)

        with C(b"hello") as bytes_io:
            with _io.BufferedWriter(bytes_io) as writer:
                self.assertEqual(writer.seek(10), 42)
                bytes_io.seek.assert_called_once()

    def test_supports_arbitrary_attributes(self):
        with _io.BytesIO(b"hello") as bytes_io:
            with _io.BufferedWriter(bytes_io) as buffered_io:
                buffered_io.guten_tag = 5.5
                self.assertEqual(buffered_io.guten_tag, 5.5)

    def test_tell_counts_buffered_bytes(self):
        with _io.BytesIO(b"hello") as bytes_io:
            with _io.BufferedWriter(bytes_io) as writer:
                self.assertEqual(bytes_io.tell(), 0)
                self.assertEqual(writer.tell(), 0)
                writer.write(b"123")
                # The raw position is unchanged; only the buffer advanced.
                self.assertEqual(bytes_io.tell(), 0)
                self.assertEqual(writer.tell(), 3)

    def test_truncate_uses_tell_for_default_pos(self):
        with _io.BytesIO(b"hello") as bytes_io:
            with _io.BufferedWriter(bytes_io) as writer:
                writer.write(b"123")
                self.assertEqual(writer.truncate(), 3)
                self.assertEqual(bytes_io.getvalue(), b"123")

    def test_truncate_with_pos_truncates_raw(self):
        with _io.BytesIO(b"hello") as bytes_io:
            with _io.BufferedWriter(bytes_io) as writer:
                writer.write(b"123")
                self.assertEqual(writer.truncate(4), 4)
                self.assertEqual(bytes_io.getvalue(), b"123l")

    def test_writable_calls_raw_writable(self):
        class C(_io.BytesIO):
            writable = Mock(name="writable", return_value=True)

        with C(b"hello") as bytes_io:
            with _io.BufferedWriter(bytes_io) as writer:
                bytes_io.writable.assert_called_once()
                self.assertIs(writer.writable(), True)
                self.assertEqual(bytes_io.writable.call_count, 2)

    def test_write_with_closed_raises_value_error(self):
        writer = _io.BufferedWriter(_io.BytesIO(b"hello"))
        writer.close()
        with self.assertRaises(ValueError) as context:
            writer.write(b"")
        self.assertEqual(str(context.exception), "write to closed file")

    def test_write_with_str_raises_type_error(self):
        with _io.BufferedWriter(_io.BytesIO(b"hello")) as writer:
            with self.assertRaisesRegex(TypeError, "str"):
                writer.write("")


class BytesIOTests(unittest.TestCase):
    """Tests for `_io.BytesIO`."""

    def test_dunder_init_with_none_initial_value_sets_empty_string(self):
        with _io.BytesIO() as f:
            self.assertEqual(f.getvalue(), b"")

    def test_dunder_init_with_non_bytes_raises_type_error(self):
        self.assertRaisesRegex(
            TypeError,
            "a bytes-like object is required, not 'str'",
            _io.BytesIO,
            "not_bytes",
        )

    def test_dunder_init_with_bytes_returns_bytesio_instance(self):
        f = _io.BytesIO(b"foo")
        self.assertIsInstance(f, _io.BytesIO)
        self.assertEqual(f.getvalue(), b"foo")
        f.close()

    def test_dunder_init_with_bytes_subclass_returns_bytesio_instance(self):
        class C(bytes):
            pass

        instance = C(b"hello")
        f = _io.BytesIO(instance)
        self.assertIsInstance(f, _io.BytesIO)
        self.assertEqual(f.getvalue(), b"hello")

    def test_dunder_init_with_bytearray_subclass_returns_bytesio_instance(self):
        class C(bytearray):
            pass

        instance = C(bytearray(b"hello"))
        f = _io.BytesIO(instance)
        self.assertIsInstance(f, _io.BytesIO)
        self.assertEqual(f.getvalue(), b"hello")

    def test_dunder_init_with_bytearray_returns_bytesio_instance(self):
        bytes_array = bytearray(b"hello")
        f = _io.BytesIO(bytes_array)
        self.assertIsInstance(f, _io.BytesIO)
        self.assertEqual(f.getvalue(), b"hello")

    def test_dunder_init_with_memoryview_of_bytes_returns_bytesio_instance(self):
        mem = memoryview(b"hello")
        f = _io.BytesIO(mem)
        self.assertIsInstance(f, _io.BytesIO)
        self.assertEqual(f.getvalue(), b"hello")

    def test_dunder_init_with_memoryview_of_bytearray_returns_bytesio_instance(self):
        bytes_array = bytearray(b"hello")
        mem = memoryview(bytes_array)
        f = _io.BytesIO(mem)
        self.assertIsInstance(f, _io.BytesIO)
        self.assertEqual(f.getvalue(), b"hello")

    def test_dunder_init_with_none_returns_empty_bytesio_instance(self):
        f = _io.BytesIO(None)
        self.assertEqual(f.getvalue(), b"")

    def test_dunder_init_with_bytes_sets_closed_false(self):
        f = _io.BytesIO(b"hello")
        self.assertFalse(f.closed)
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertTrue(f.seekable())

    def test_dunder_init_with_float_raises_type_error(self):
        # The constructor itself must raise, so there is no instance to close.
        with self.assertRaises(TypeError):
            _io.BytesIO(0.5)

    def test_close_with_non_BytesIO_self_raises_type_error(self):
        with self.assertRaises(TypeError):
            _io.BytesIO.close(5)

    def test_close_clears_buffer(self):
        f = _io.BytesIO(b"foo")
        f.close()
        with self.assertRaises(ValueError):
            f.getvalue()

    def test_close_with_closed_instance_does_not_raise_value_error(self):
        f = _io.BytesIO(b"foo")
        f.close()
        f.close()

    def test_close_with_subclass_overwrite_uses_subclass_property(self):
        class C(_io.BytesIO):
            @property
            def closed(self):
                return 2

        f = C(b"")
        self.assertEqual(f.closed, 2)

    def test_getbuffer_with_non_BytesIO_self_raises_type_error(self):
        # Pass a non-BytesIO receiver, as the sibling tests do; calling with
        # no argument would only test the "missing argument" TypeError.
        with self.assertRaises(TypeError):
            _io.BytesIO.getbuffer(5)

    def test_getbuffer_with_closed_raises_value_error(self):
        f = _io.BytesIO(b"foo")
        f.close()
        with self.assertRaises(ValueError):
            f.getbuffer()

    def test_getvalue_with_non_BytesIO_raises_type_error(self):
        with self.assertRaises(TypeError):
            _io.BytesIO.getvalue(5)

    def test_getvalue_with_bytes_returns_bytes_of_buffer(self):
        f = _io.BytesIO(b"foo")
        result = f.getvalue()
        self.assertIsInstance(result, bytes)
        self.assertEqual(result, b"foo")

    def test_getvalue_with_bytearray_returns_bytes_of_buffer(self):
        barr = bytearray(b"foo")
        f = _io.BytesIO(barr)
        result = f.getvalue()
        self.assertIsInstance(result, bytes)
        self.assertEqual(result, b"foo")

    def test_getvalue_with_memoryview_of_bytes_returns_bytes_of_buffer(self):
        view = memoryview(b"foo")
        f = _io.BytesIO(view)
        result = f.getvalue()
        self.assertIsInstance(result, bytes)
        self.assertEqual(result, b"foo")

    def test_getvalue_with_memoryview_of_pointer_returns_bytes_of_buffer(self):
        mm = mmap.mmap(-1, 4)
        view = memoryview(mm)
        f = _io.BytesIO(view)
        result = f.getvalue()
        self.assertIsInstance(result, bytes)
        self.assertEqual(result, b"\x00\x00\x00\x00")
870 871 def test_getvalue_with_modification_on_returned_bytes_returns_same_result(self): 872 f = _io.BytesIO(b"foo") 873 result = f.getvalue() 874 self.assertEqual(result, b"foo") 875 result += b"o" 876 self.assertEqual(result, b"fooo") 877 self.assertEqual(f.getvalue(), b"foo") 878 879 def test_getvalue_with_open_returns_copy_of_value(self): 880 f = _io.BytesIO(b"foobarbaz") 881 first_value = f.getvalue() 882 f.write(b"temp") 883 second_value = f.getvalue() 884 self.assertEqual(first_value, b"foobarbaz") 885 self.assertEqual(second_value, b"temparbaz") 886 887 def test_getvalue_with_closed_raises_value_error(self): 888 f = _io.BytesIO(b"hello") 889 f.close() 890 with self.assertRaises(ValueError): 891 f.getvalue() 892 893 def test_getbuffer_with_bytes_returns_bytes_of_buffer(self): 894 f = _io.BytesIO(b"foo") 895 result = f.getbuffer() 896 self.assertIsInstance(result, memoryview) 897 898 def test_read_with_non_BytesIO_self_raises_type_error(self): 899 with self.assertRaises(TypeError): 900 _io.BytesIO.read(5) 901 902 def test_read_one_byte_reads_bytes(self): 903 f = _io.BytesIO(b"foo") 904 self.assertEqual(f.read(1), b"f") 905 self.assertEqual(f.read(1), b"o") 906 self.assertEqual(f.read(1), b"o") 907 self.assertEqual(f.read(1), b"") 908 f.close() 909 910 def test_read_with_bytes_reads_bytes(self): 911 f = _io.BytesIO(b"foo") 912 self.assertEqual(f.read(), b"foo") 913 self.assertEqual(f.read(), b"") 914 f.close() 915 916 def test_read_with_none_size_returns_rest(self): 917 f = _io.BytesIO(b"foo") 918 self.assertEqual(f.read(1), b"f") 919 self.assertEqual(f.read(None), b"oo") 920 921 def test_read_with_closed_raises_value_error(self): 922 f = _io.BytesIO(b"hello") 923 f.close() 924 with self.assertRaises(ValueError): 925 f.read() 926 927 def test_read_with_int_subclass_reads(self): 928 class C(int): 929 pass 930 931 f = _io.BytesIO(b"hello") 932 c = C(3) 933 self.assertEqual(f.read(c), b"hel") 934 935 def 
test_read_with_int_subclass_does_not_call_dunder_index(self): 936 class C(int): 937 def __index__(self): 938 raise UserWarning("foo") 939 940 f = _io.BytesIO(b"hello") 941 c = C() 942 f.read(c) 943 944 def test_read_with_non_int_raises_type_error(self): 945 f = _io.BytesIO(b"hello") 946 with self.assertRaises(TypeError): 947 f.read(2.5) 948 949 def test_read1_with_non_BytesIO_self_raises_type_error(self): 950 with self.assertRaises(TypeError): 951 _io.BytesIO.read1(5) 952 953 def test_read1_wtih_bytes_reads_bytes(self): 954 f = _io.BytesIO(b"foo") 955 self.assertEqual(f.read1(3), b"foo") 956 self.assertEqual(f.read1(3), b"") 957 f.close() 958 959 def test_read1_with_none_size_returns_rest(self): 960 f = _io.BytesIO(b"foo") 961 self.assertEqual(f.read1(1), b"f") 962 self.assertEqual(f.read1(None), b"oo") 963 964 def test_read1_with_size_not_specified_returns_rest(self): 965 f = _io.BytesIO(b"foo") 966 self.assertEqual(f.read1(1), b"f") 967 self.assertEqual(f.read1(), b"oo") 968 f.close() 969 970 # TODO(T47880928): Test BytesIO.readinto 971 # TODO(T47880928): Test BytesIO.readinto1 972 973 def test_readline_reads_one_line(self): 974 f = _io.BytesIO(b"hello\nworld\nfoo\nbar") 975 self.assertEqual(f.readline(), b"hello\n") 976 self.assertEqual(f.readline(), b"world\n") 977 self.assertEqual(f.readline(), b"foo\n") 978 self.assertEqual(f.readline(), b"bar") 979 self.assertEqual(f.readline(), b"") 980 f.close() 981 982 def test_readlines_reads_all_lines(self): 983 f = _io.BytesIO(b"hello\nworld\nfoo\nbar") 984 self.assertEqual(f.readlines(), [b"hello\n", b"world\n", b"foo\n", b"bar"]) 985 self.assertEqual(f.read(), b"") 986 f.close() 987 988 def test_seek_with_non_BytesIO_self_raises_type_error(self): 989 with self.assertRaises(TypeError): 990 _io.BytesIO.seek(5, 5) 991 992 def test_seek_with_closed_raises_value_error(self): 993 f = _io.BytesIO(b"hello") 994 f.close() 995 with self.assertRaises(ValueError): 996 f.seek(3) 997 998 def 
test_seek_with_none_offset_raises_type_error(self): 999 f = _io.BytesIO(b"hello") 1000 with self.assertRaises(TypeError): 1001 f.seek(None, 0) 1002 1003 def test_seek_with_none_whence_raises_type_error(self): 1004 f = _io.BytesIO(b"hello") 1005 with self.assertRaises(TypeError): 1006 f.seek(1, None) 1007 1008 def test_seek_with_int_subclass_offset_does_not_call_dunder_index(self): 1009 class C(int): 1010 def __index__(self): 1011 raise UserWarning("foo") 1012 1013 f = _io.BytesIO(b"") 1014 pos = C() 1015 f.seek(pos) 1016 1017 def test_seek_with_class_offset_calls_dunder_index(self): 1018 class C: 1019 def __index__(self): 1020 raise UserWarning("foo") 1021 1022 f = _io.BytesIO(b"") 1023 pos = C() 1024 with self.assertRaises(UserWarning): 1025 f.seek(pos) 1026 1027 def test_seek_with_class_whence_calls_dunder_index(self): 1028 class C: 1029 def __index__(self): 1030 raise UserWarning("foo") 1031 1032 f = _io.BytesIO(b"") 1033 whence = C() 1034 with self.assertRaises(UserWarning): 1035 f.seek(0, whence) 1036 1037 def test_seek_with_float_offset_raises_type_error(self): 1038 f = _io.BytesIO(b"") 1039 pos = 3.4 1040 with self.assertRaises(TypeError): 1041 f.seek(pos) 1042 1043 def test_seek_with_float_whence_raises_type_error(self): 1044 f = _io.BytesIO(b"hello") 1045 with self.assertRaises(TypeError): 1046 f.seek(0.5) 1047 1048 def test_seek_with_int_subclass_whence_does_not_call_dunder_index(self): 1049 class C(int): 1050 def __index__(self): 1051 raise UserWarning("foo") 1052 1053 f = _io.BytesIO(b"") 1054 whence = C() 1055 f.seek(5, whence) 1056 1057 def test_seek_with_whence_0_returns_new_pos(self): 1058 f = _io.BytesIO(b"") 1059 pos = 3 1060 self.assertEqual(f.seek(pos), 3) 1061 self.assertEqual(f.tell(), 3) 1062 1063 def test_seek_with_whence_1_returns_new_pos(self): 1064 f = _io.BytesIO(b"") 1065 f.seek(3) 1066 offset = 3 1067 self.assertEqual(f.seek(offset, 1), 6) 1068 self.assertEqual(f.tell(), 6) 1069 1070 def 
test_seek_with_whence_1_negative_new_pos_retuns_zero(self): 1071 f = _io.BytesIO(b"") 1072 f.seek(3) 1073 offset = -5 1074 self.assertEqual(f.seek(offset, 1), 0) 1075 self.assertEqual(f.tell(), 0) 1076 1077 def test_seek_with_whence_2_returns_new_pos(self): 1078 f = _io.BytesIO(b"hello") 1079 offset = 3 1080 self.assertEqual(f.seek(offset, 2), 8) 1081 self.assertEqual(f.tell(), 8) 1082 1083 def test_seek_with_whence_2_negative_new_pos_retuns_zero(self): 1084 f = _io.BytesIO(b"hello") 1085 offset = -7 1086 self.assertEqual(f.seek(offset, 2), 0) 1087 self.assertEqual(f.tell(), 0) 1088 1089 def test_seek_with_whence_3_raises_value_error(self): 1090 f = _io.BytesIO(b"hello") 1091 with self.assertRaises(ValueError): 1092 f.seek(3, 3) 1093 1094 def test_seek_with_large_whence_raises_overflow_error(self): 1095 f = _io.BytesIO(b"hello") 1096 with self.assertRaises(OverflowError): 1097 f.seek(1, 2 ** 65) 1098 1099 def test_tell_with_non_BytesIO_self_raises_type_error(self): 1100 with self.assertRaises(TypeError): 1101 _io.BytesIO.tell(5) 1102 1103 def test_tell_with_closed_file_raises_value_error(self): 1104 f = _io.BytesIO(b"") 1105 f.close() 1106 with self.assertRaises(ValueError): 1107 f.tell() 1108 1109 def test_tell_returns_position(self): 1110 f = _io.BytesIO(b"foo") 1111 self.assertEqual(f.tell(), 0) 1112 f.read(1) 1113 self.assertEqual(f.tell(), 1) 1114 1115 def test_truncate_with_non_BytesIO_self_raises_type_error(self): 1116 with self.assertRaises(TypeError): 1117 _io.BytesIO.truncate(5, 5) 1118 1119 def test_truncate_with_pos_larger_than_length_returns_pos(self): 1120 f = _io.BytesIO(b"hello") 1121 self.assertEqual(f.truncate(6), 6) 1122 self.assertEqual(f.getvalue(), b"hello") 1123 1124 def test_truncate_with_pos_smaller_than_length_truncates(self): 1125 f = _io.BytesIO(b"hello") 1126 self.assertEqual(f.truncate(3), 3) 1127 self.assertEqual(f.getvalue(), b"hel") 1128 1129 def test_truncate_with_int_subclass_does_not_call_dunder_int(self): 1130 class C(int): 1131 
def __int__(self): 1132 raise UserWarning("foo") 1133 1134 f = _io.BytesIO(b"") 1135 pos = C() 1136 f.truncate(pos) 1137 1138 def test_truncate_with_non_int_raises_type_error(self): 1139 dunder_int_called = False 1140 1141 class C: 1142 def __int__(self): 1143 nonlocal dunder_int_called 1144 dunder_int_called = True 1145 raise UserWarning("foo") 1146 1147 f = _io.BytesIO(b"") 1148 pos = C() 1149 with self.assertRaises(TypeError): 1150 f.truncate(pos) 1151 self.assertFalse(dunder_int_called) 1152 1153 def test_truncate_with_raising_dunder_int_raises_type_error(self): 1154 class C: 1155 def __int__(self): 1156 raise UserWarning("foo") 1157 1158 f = _io.BytesIO(b"") 1159 pos = C() 1160 with self.assertRaises(TypeError): 1161 f.truncate(pos) 1162 1163 def test_truncate_with_float_raises_type_error(self): 1164 f = _io.BytesIO(b"") 1165 pos = 3.4 1166 with self.assertRaises(TypeError): 1167 f.truncate(pos) 1168 1169 def test_write_with_non_BytesIO_raises_type_error(self): 1170 with self.assertRaises(TypeError): 1171 _io.BytesIO.write(5, b"foo") 1172 1173 def test_write_with_array_returns_length(self): 1174 bytes_io = _io.BytesIO(b"") 1175 arr = array.array("b", [1, 2, 3]) 1176 self.assertEqual(bytes_io.write(arr), 3) 1177 self.assertEqual(bytes_io.getvalue(), b"\x01\x02\x03") 1178 1179 def test_write_with_bytearray_returns_length(self): 1180 bytes_io = _io.BytesIO(b"hello") 1181 bytes_array = bytearray(b"bArr") 1182 self.assertEqual(bytes_io.write(bytes_array), 4) 1183 self.assertEqual(bytes_io.getvalue(), b"bArro") 1184 1185 def test_write_with_bytearray_subclass_returns_length(self): 1186 class C(bytearray): 1187 pass 1188 1189 bytes_io = _io.BytesIO(b"hello") 1190 instance = C(b"bArr") 1191 self.assertEqual(bytes_io.write(instance), 4) 1192 self.assertEqual(bytes_io.getvalue(), b"bArro") 1193 1194 def test_write_with_bytes_returns_length(self): 1195 bytes_io = _io.BytesIO(b"hello") 1196 self.assertEqual(bytes_io.write(b"bytes"), 5) 1197 
self.assertEqual(bytes_io.getvalue(), b"bytes") 1198 1199 def test_write_with_bytes_subclass_returns_length(self): 1200 class C(bytes): 1201 pass 1202 1203 bytes_io = _io.BytesIO(b"hello") 1204 instance = C(b"bytes") 1205 self.assertEqual(bytes_io.write(instance), 5) 1206 self.assertEqual(bytes_io.getvalue(), b"bytes") 1207 1208 def test_write_with_memoryview_of_bytearray_returns_length(self): 1209 bytes_io = _io.BytesIO(b"hello") 1210 bytes_array = bytearray(b"bArr") 1211 view = memoryview(bytes_array) 1212 self.assertEqual(bytes_io.write(view), 4) 1213 self.assertEqual(bytes_io.getvalue(), b"bArro") 1214 1215 def test_write_with_memoryview_of_bytes_returns_length(self): 1216 bytes_io = _io.BytesIO(b"hello") 1217 view = memoryview(b"view") 1218 self.assertEqual(bytes_io.write(view), 4) 1219 self.assertEqual(bytes_io.getvalue(), b"viewo") 1220 1221 def test_write_with_memoryview_of_pointer_returns_length(self): 1222 bytes_io = _io.BytesIO(b"") 1223 mm = mmap.mmap(-1, 4) 1224 view = memoryview(mm) 1225 self.assertEqual(bytes_io.write(view), 4) 1226 self.assertEqual(bytes_io.getvalue(), b"\x00\x00\x00\x00") 1227 1228 def test_write_with_non_bytes_like_raises_type_error(self): 1229 bytes_io = _io.BytesIO(b"hello") 1230 with self.assertRaises(TypeError) as context: 1231 bytes_io.write(123) 1232 self.assertRegex( 1233 str(context.exception), "a bytes-like object is required, not 'int'" 1234 ) 1235 with self.assertRaises(TypeError) as context: 1236 bytes_io.write("str") 1237 self.assertRegex( 1238 str(context.exception), "a bytes-like object is required, not 'str'" 1239 ) 1240 1241 def test_write_with_seek_writes_from_current_position(self): 1242 bytes_io = _io.BytesIO(b"hello") 1243 self.assertEqual(bytes_io.seek(1), 1) 1244 self.assertEqual(bytes_io.write(b"1"), 1) 1245 self.assertEqual(bytes_io.getvalue(), b"h1llo") 1246 1247 def test_write_with_seek_past_end_pads_with_zero(self): 1248 bytes_io = _io.BytesIO(b"hello") 1249 self.assertEqual(bytes_io.seek(7), 7) 1250 
self.assertEqual(bytes_io.write(b"world"), 5) 1251 self.assertEqual(bytes_io.getvalue(), b"hello\x00\x00world") 1252 1253 def test_write_with_closed_raises_value_error(self): 1254 bytes_io = _io.BytesIO(b"hello") 1255 bytes_io.close() 1256 with self.assertRaises(ValueError) as context: 1257 bytes_io.write("close") 1258 self.assertRegex(str(context.exception), "I/O operation on closed file") 1259 1260 def test_write_with_empty_does_nothing(self): 1261 bytes_io = _io.BytesIO(b"hello") 1262 self.assertEqual(bytes_io.write(b""), 0) 1263 self.assertEqual(bytes_io.getvalue(), b"hello") 1264 1265 def test_write_with_larger_bytes_extends_buffer(self): 1266 bytes_io = _io.BytesIO(b"hello") 1267 self.assertEqual(bytes_io.write(b"new world"), 9) 1268 self.assertEqual(bytes_io.getvalue(), b"new world") 1269 1270 1271class FileIOTests(unittest.TestCase): 1272 def test_close_closes_file(self): 1273 f = _io.FileIO(_getfd(), closefd=True) 1274 self.assertFalse(f.closed) 1275 f.close() 1276 self.assertTrue(f.closed) 1277 1278 def test_close_twice_does_not_raise(self): 1279 f = _io.FileIO(_getfd(), closefd=True) 1280 f.close() 1281 f.close() 1282 1283 def test_closefd_with_closefd_set_returns_true(self): 1284 with _io.FileIO(_getfd(), closefd=True) as f: 1285 self.assertTrue(f.closefd) 1286 1287 def test_closefd_with_closefd_not_set_returns_true(self): 1288 with _io.FileIO(_getfd(), closefd=False) as f: 1289 self.assertFalse(f.closefd) 1290 1291 def test_dunder_init_with_float_file_raises_type_error(self): 1292 with self.assertRaises(TypeError) as context: 1293 _io.FileIO(3.14) 1294 1295 self.assertEqual(str(context.exception), "integer argument expected, got float") 1296 1297 def test_dunder_init_with_negative_file_descriptor_raises_value_error(self): 1298 with self.assertRaises(ValueError) as context: 1299 _io.FileIO(-5) 1300 1301 self.assertEqual(str(context.exception), "negative file descriptor") 1302 1303 def test_dunder_init_with_non_str_mode_raises_type_error(self): 1304 with 
self.assertRaises(TypeError): 1305 _io.FileIO(_getfd(), mode=3.14) 1306 1307 def test_dunder_init_with_no_mode_makes_object_readable(self): 1308 with _io.FileIO(_getfd()) as f: 1309 self.assertTrue(f.readable()) 1310 self.assertFalse(f.writable()) 1311 self.assertFalse(f.seekable()) 1312 1313 def test_dunder_init_with_r_in_mode_makes_object_readable(self): 1314 with _io.FileIO(_getfd(), mode="r") as f: 1315 self.assertTrue(f.readable()) 1316 self.assertFalse(f.writable()) 1317 1318 def test_dunder_init_with_w_in_mode_makes_object_writable(self): 1319 with _io.FileIO(_getfd(), mode="w") as f: 1320 self.assertFalse(f.readable()) 1321 self.assertTrue(f.writable()) 1322 1323 def test_dunder_init_with_r_plus_in_mode_makes_object_readable_and_writable(self): 1324 with _io.FileIO(_getfd(), mode="r+") as f: 1325 self.assertTrue(f.readable()) 1326 self.assertTrue(f.writable()) 1327 1328 def test_dunder_init_with_w_plus_in_mode_makes_object_readable_and_writable(self): 1329 with _io.FileIO(_getfd(), mode="w+") as f: 1330 self.assertTrue(f.readable()) 1331 self.assertTrue(f.writable()) 1332 1333 def test_dunder_init_with_only_plus_in_mode_raises_value_error(self): 1334 with self.assertRaises(ValueError) as context: 1335 _io.FileIO(_getfd(), mode="+") 1336 1337 self.assertEqual( 1338 str(context.exception), 1339 "Must have exactly one of create/read/write/append mode and at " 1340 "most one plus", 1341 ) 1342 1343 def test_dunder_init_with_more_than_one_plus_in_mode_raises_value_error(self): 1344 with self.assertRaises(ValueError) as context: 1345 _io.FileIO(_getfd(), mode="r++") 1346 1347 self.assertEqual( 1348 str(context.exception), 1349 "Must have exactly one of create/read/write/append mode and at " 1350 "most one plus", 1351 ) 1352 1353 def test_dunder_init_with_rw_in_mode_raises_value_error(self): 1354 with self.assertRaises(ValueError) as context: 1355 _io.FileIO(_getfd(), mode="rw") 1356 1357 self.assertEqual( 1358 str(context.exception), 1359 "Must have exactly one 
of create/read/write/append mode and at " 1360 "most one plus", 1361 ) 1362 1363 def test_dunder_init_with_filename_and_closefd_equals_false_raises_value_error( 1364 self, 1365 ): 1366 with self.assertRaises(ValueError) as context: 1367 _io.FileIO("foobar", closefd=False) 1368 1369 self.assertEqual( 1370 str(context.exception), "Cannot use closefd=False with file name" 1371 ) 1372 1373 def test_dunder_init_with_filename_and_opener_calls_opener(self): 1374 def opener(fn, flags): 1375 raise UserWarning("foo") 1376 1377 with self.assertRaises(UserWarning): 1378 _io.FileIO("foobar", opener=opener) 1379 1380 def test_dunder_init_with_opener_returning_non_int_raises_type_error(self): 1381 def opener(fn, flags): 1382 return "not_an_int" 1383 1384 with self.assertRaises(TypeError) as context: 1385 _io.FileIO("foobar", opener=opener) 1386 1387 self.assertEqual(str(context.exception), "expected integer from opener") 1388 1389 def test_dunder_init_with_opener_returning_negative_int_raises_value_error(self): 1390 def opener(fn, flags): 1391 return -1 1392 1393 with self.assertRaises(ValueError) as context: 1394 _io.FileIO("foobar", opener=opener) 1395 1396 self.assertEqual(str(context.exception), "opener returned -1") 1397 1398 # TODO(emacs): Enable side-by-side against CPython when issue38031 is fixed 1399 @pyro_only 1400 def test_dunder_init_with_opener_returning_bad_fd_raises_os_error(self): 1401 with self.assertRaises(OSError): 1402 _io.FileIO("foobar", opener=lambda name, flags: 1000000) 1403 1404 def test_dunder_init_with_directory_raises_is_a_directory_error(self): 1405 with self.assertRaises(IsADirectoryError) as context: 1406 _io.FileIO("/") 1407 1408 self.assertEqual(context.exception.args, (21, "Is a directory")) 1409 1410 def test_dunder_repr_with_closed_file_returns_closed_in_result(self): 1411 f = _io.FileIO(_getfd(), mode="r") 1412 f.close() 1413 self.assertEqual(f.__repr__(), "<_io.FileIO [closed]>") 1414 1415 def 
test_dunder_repr_with_closefd_and_fd_returns_fd_in_result(self): 1416 fd = _getfd() 1417 with _io.FileIO(fd, mode="r", closefd=True) as f: 1418 self.assertEqual( 1419 f.__repr__(), f"<_io.FileIO name={fd} mode='rb' closefd=True>" 1420 ) 1421 1422 def test_dunder_repr_without_closefd_and_fd_returns_fd_in_result(self): 1423 fd = _getfd() 1424 with _io.FileIO(fd, mode="r", closefd=False) as f: 1425 self.assertEqual( 1426 f.__repr__(), f"<_io.FileIO name={fd} mode='rb' closefd=False>" 1427 ) 1428 1429 def test_dunder_repr_with_closefd_and_name_returns_name_in_result(self): 1430 def opener(fn, flags): 1431 return os.open(fn, os.O_CREAT | os.O_WRONLY, 0o666) 1432 1433 with tempfile.TemporaryDirectory() as tempdir: 1434 with _io.FileIO(f"{tempdir}/foo", mode="r", opener=opener) as f: 1435 self.assertEqual( 1436 f.__repr__(), 1437 f"<_io.FileIO name='{tempdir}/foo' mode='rb' closefd=True>", 1438 ) 1439 1440 def test_fileno_with_closed_file_raises_value_error(self): 1441 f = _io.FileIO(_getfd(), mode="r") 1442 f.close() 1443 self.assertRaises(ValueError, f.fileno) 1444 1445 def test_fileno_returns_int(self): 1446 with _io.FileIO(_getfd(), mode="r") as f: 1447 self.assertIsInstance(f.fileno(), int) 1448 1449 def test_isatty_with_closed_file_raises_value_error(self): 1450 f = _io.FileIO(_getfd(), mode="r") 1451 f.close() 1452 self.assertRaises(ValueError, f.isatty) 1453 1454 def test_isatty_with_non_tty_returns_false(self): 1455 with _io.FileIO(_getfd(), mode="r") as f: 1456 self.assertFalse(f.isatty()) 1457 1458 def test_mode_with_created_and_readable_returns_xb_plus(self): 1459 with _io.FileIO(_getfd(), mode="x+") as f: 1460 self.assertEqual(f.mode, "xb+") 1461 1462 def test_mode_with_created_and_non_readable_returns_xb(self): 1463 with _io.FileIO(_getfd(), mode="x") as f: 1464 self.assertEqual(f.mode, "xb") 1465 1466 # Testing appendable/readable is really tricky. 
1467 1468 def test_mode_with_readable_and_writable_returns_rb_plus(self): 1469 with _io.FileIO(_getfd(), mode="r+") as f: 1470 self.assertEqual(f.mode, "rb+") 1471 1472 def test_mode_with_readable_and_non_writable_returns_rb(self): 1473 with _io.FileIO(_getfd(), mode="r") as f: 1474 self.assertEqual(f.mode, "rb") 1475 1476 def test_mode_with_writable_returns_wb(self): 1477 with _io.FileIO(_getfd(), mode="w") as f: 1478 self.assertEqual(f.mode, "wb") 1479 1480 def test_read_on_closed_file_raises_value_error(self): 1481 f = _io.FileIO(_getfd(), mode="r") 1482 f.close() 1483 self.assertRaises(ValueError, f.read) 1484 1485 def test_read_on_non_readable_file_raises_unsupported_operation(self): 1486 with _io.FileIO(_getfd(), mode="w") as f: 1487 self.assertRaises(_io.UnsupportedOperation, f.read) 1488 1489 def test_read_returns_bytes(self): 1490 with _io.FileIO(_getfd(), mode="r") as f: 1491 self.assertIsInstance(f.read(), bytes) 1492 1493 def test_readable_with_closed_file_raises_value_error(self): 1494 f = _io.FileIO(_getfd(), mode="r") 1495 f.close() 1496 self.assertRaises(ValueError, f.readable) 1497 1498 def test_readable_with_readable_file_returns_true(self): 1499 with _io.FileIO(_getfd(), mode="r") as f: 1500 self.assertTrue(f.readable()) 1501 1502 def test_readable_with_non_readable_file_returns_false(self): 1503 with _io.FileIO(_getfd(), mode="w") as f: 1504 self.assertFalse(f.readable()) 1505 1506 def test_readall_with_non_FileIO_self_raises_type_error(self): 1507 with self.assertRaises(TypeError): 1508 _io.FileIO.readall(5) 1509 1510 def test_readall_on_closed_file_raises_value_error(self): 1511 f = _io.FileIO(_getfd(), mode="r") 1512 f.close() 1513 self.assertRaises(ValueError, f.readall) 1514 1515 # TODO(T70611902): Modify the readall and test after CPython fixed this behavior 1516 def test_readall_on_non_readable_file_returns_full_bytes(self): 1517 r, w = os.pipe() 1518 w = os.fdopen(w, "w") 1519 w.write("hello") 1520 w.close() 1521 with _io.FileIO(r, 
mode="w") as f: 1522 self.assertFalse(f.readable()) 1523 self.assertEqual(f.readall(), b"hello") 1524 1525 def test_readall_returns_bytes(self): 1526 with _io.FileIO(_getfd(), mode="r") as f: 1527 self.assertIsInstance(f.readall(), bytes) 1528 1529 def test_readall_returns_all_bytes(self): 1530 r, w = os.pipe() 1531 w = os.fdopen(w, "w") 1532 w.write("hello") 1533 w.close() 1534 with _io.FileIO(r, mode="r") as f: 1535 result = f.readall() 1536 self.assertIsInstance(result, bytes) 1537 self.assertEqual(result, b"hello") 1538 1539 @unittest.skipIf(sys.platform != "linux", "needs /proc filesystem") 1540 def test_readall_on_unseekable_file_reads_all(self): 1541 with open("/proc/self/statm", "rb", buffering=False) as f: 1542 result = f.readall() 1543 self.assertGreater(len(result), 1) 1544 1545 def test_readall_with_more_than_default_buffer_size_returns_all_bytes(self): 1546 r, w = os.pipe() 1547 w = os.fdopen(w, "w") 1548 w.write("foo" * 1000) 1549 w.close() 1550 with _io.FileIO(r, mode="r") as f: 1551 result = f.readall() 1552 self.assertIsInstance(result, bytes) 1553 self.assertEqual(result, b"foo" * 1000) 1554 1555 def test_readall_with_fifo_file_returns_all_bytes(self): 1556 with tempfile.TemporaryDirectory() as tempdir: 1557 fifo = f"{tempdir}/fifo" 1558 os.mkfifo(fifo) 1559 exe = sys.executable 1560 src = f""" 1561import time 1562with open('{fifo}', 'wb') as f: 1563 f.write(b'hello...') 1564 f.flush() 1565 time.sleep(0.5) 1566 f.write(b'bye') 1567""" 1568 p = subprocess.Popen([exe, "-c", src]) 1569 with open(fifo, "rb", buffering=False) as f: 1570 result = f.readall() 1571 self.assertEqual(result, b"hello...bye") 1572 p.wait() 1573 1574 # TODO(T53182806): Test FileIO.readinto once memoryview.__setitem__ is done 1575 def test_readinto_with_non_FileIO_self_raises_type_error(self): 1576 with self.assertRaises(TypeError): 1577 _io.FileIO.readinto(5, bytearray(b"hello")) 1578 1579 def test_readinto_with_array_reads_bytes_into_array(self): 1580 r, w = os.pipe() 1581 w 
= os.fdopen(w, "w") 1582 w.write("hello") 1583 w.close() 1584 arr = array.array("b", [1, 2, 3]) 1585 with _io.FileIO(r, mode="r") as f: 1586 self.assertEqual(f.readinto(arr), 3) 1587 self.assertEqual(bytes(arr), b"hel") 1588 1589 def test_readinto_with_bytearray_reads_bytes_into_bytearray(self): 1590 r, w = os.pipe() 1591 w = os.fdopen(w, "w") 1592 w.write("hello") 1593 w.close() 1594 barr = bytearray(b"abc") 1595 with _io.FileIO(r, mode="r") as f: 1596 self.assertEqual(f.readinto(barr), 3) 1597 self.assertEqual(barr, b"hel") 1598 1599 def test_readinto_with_bytes_raises_type_error(self): 1600 r, w = os.pipe() 1601 w = os.fdopen(w, "w") 1602 w.write("hello") 1603 w.close() 1604 b = b"abc" 1605 with _io.FileIO(r, mode="r") as f: 1606 with self.assertRaises(TypeError): 1607 f.readinto(b) 1608 1609 def test_readinto_with_memoryview_of_bytes_raises_type_error(self): 1610 r, w = os.pipe() 1611 w = os.fdopen(w, "w") 1612 w.write("hello") 1613 w.close() 1614 view = memoryview(b"abc") 1615 with _io.FileIO(r, mode="r") as f: 1616 with self.assertRaises(TypeError): 1617 f.readinto(view) 1618 1619 def test_readinto_with_memoryview_of_mmap_reads_bytes_to_pointer(self): 1620 r, w = os.pipe() 1621 w = os.fdopen(w, "w") 1622 w.write("hello") 1623 w.close() 1624 mm = mmap.mmap(-1, 2) 1625 view = memoryview(mm) 1626 with _io.FileIO(r, mode="r") as f: 1627 self.assertEqual(f.readinto(view), 2) 1628 self.assertEqual(bytes(view), b"he") 1629 1630 def test_readinto_with_mmap_reads_bytes_to_memory(self): 1631 r, w = os.pipe() 1632 w = os.fdopen(w, "w") 1633 w.write("hello") 1634 w.close() 1635 mm = mmap.mmap(-1, 2) 1636 with _io.FileIO(r, mode="r") as f: 1637 self.assertEqual(f.readinto(mm), 2) 1638 view = memoryview(mm) 1639 self.assertEqual(bytes(view), b"he") 1640 1641 def test_readinto_with_closed_file_raises_value_error(self): 1642 r, w = os.pipe() 1643 w = os.fdopen(w, "w") 1644 w.write("hello") 1645 w.close() 1646 barr = bytearray(b"abc") 1647 with _io.FileIO(r, mode="r") as f: 
1648 f.close() 1649 with self.assertRaises(ValueError): 1650 f.readinto(barr) 1651 1652 def test_readinto_with_empty_bytearray_returns_zero(self): 1653 r, w = os.pipe() 1654 w = os.fdopen(w, "w") 1655 w.write("hello") 1656 w.close() 1657 barr = bytearray(b"") 1658 with _io.FileIO(r, mode="r") as f: 1659 self.assertEqual(f.readinto(barr), 0) 1660 self.assertEqual(barr, b"") 1661 1662 def test_seek_with_float_raises_type_error(self): 1663 with _io.FileIO(_getfd(), mode="r") as f: 1664 self.assertRaises(TypeError, f.seek, 3.4) 1665 1666 def test_seek_with_closed_file_raises_value_error(self): 1667 f = _io.FileIO(_getfd(), mode="r") 1668 f.close() 1669 self.assertRaises(ValueError, f.seek, 3) 1670 1671 def test_seek_with_non_readable_file_raises_os_error(self): 1672 with _io.FileIO(_getfd(), mode="w") as f: 1673 self.assertRaises(OSError, f.seek, 3) 1674 1675 def test_seekable_with_closed_file_raises_value_error(self): 1676 f = _io.FileIO(_getfd(), mode="r") 1677 f.close() 1678 self.assertRaises(ValueError, f.seekable) 1679 1680 def test_seekable_does_not_call_subclass_tell(self): 1681 class C(_io.FileIO): 1682 def tell(self): 1683 raise UserWarning("foo") 1684 1685 with C(_getfd(), mode="w") as f: 1686 self.assertFalse(f.seekable()) 1687 1688 def test_seekable_with_non_seekable_file_returns_false(self): 1689 with _io.FileIO(_getfd(), mode="w") as f: 1690 self.assertFalse(f.seekable()) 1691 1692 def test_supports_arbitrary_attributes(self): 1693 with _io.FileIO(_getfd(), mode="r") as f: 1694 f.good_morning = 333 1695 self.assertEqual(f.good_morning, 333) 1696 1697 def test_tell_with_closed_file_raises_value_error(self): 1698 f = _io.FileIO(_getfd(), mode="r") 1699 f.close() 1700 self.assertRaises(ValueError, f.tell) 1701 1702 def test_tell_with_non_readable_file_raises_os_error(self): 1703 with _io.FileIO(_getfd(), mode="w") as f: 1704 self.assertRaises(OSError, f.tell) 1705 1706 def test_truncate_with_closed_file_raises_value_error(self): 1707 f = _io.FileIO(_getfd(), 
mode="w") 1708 f.close() 1709 self.assertRaises(ValueError, f.truncate) 1710 1711 def test_truncate_with_non_writable_file_raises_unsupported_operation(self): 1712 with _io.FileIO(_getfd(), mode="r") as f: 1713 self.assertRaises(_io.UnsupportedOperation, f.truncate) 1714 1715 def test_writable_with_closed_file_raises_value_error(self): 1716 f = _io.FileIO(_getfd(), mode="w") 1717 f.close() 1718 self.assertRaises(ValueError, f.writable) 1719 1720 def test_writable_with_writable_file_returns_true(self): 1721 with _io.FileIO(_getfd(), mode="w") as f: 1722 self.assertTrue(f.writable()) 1723 1724 def test_writable_with_non_writable_file_returns_false(self): 1725 with _io.FileIO(_getfd(), mode="r") as f: 1726 self.assertFalse(f.writable()) 1727 1728 def test_write_with_non_buffer_raises_type_error(self): 1729 class C: 1730 pass 1731 1732 c = C() 1733 f = _io.FileIO(_getfd(), mode="w") 1734 with self.assertRaises(TypeError) as context: 1735 f.write(c) 1736 self.assertEqual( 1737 str(context.exception), "a bytes-like object is required, not 'C'" 1738 ) 1739 f.close() 1740 1741 def test_write_with_raising_dunder_buffer_raises_type_error(self): 1742 class C: 1743 def __buffer__(self): 1744 raise UserWarning("foo") 1745 1746 f = _io.FileIO(_getfd(), mode="w") 1747 c = C() 1748 with self.assertRaises(TypeError) as context: 1749 f.write(c) 1750 self.assertEqual( 1751 str(context.exception), "a bytes-like object is required, not 'C'" 1752 ) 1753 f.close() 1754 1755 def test_write_with_bad_dunder_buffer_raises_type_error(self): 1756 class C: 1757 def __buffer__(self): 1758 return "foo" 1759 1760 f = _io.FileIO(_getfd(), mode="w") 1761 c = C() 1762 with self.assertRaises(TypeError) as context: 1763 f.write(c) 1764 self.assertEqual( 1765 str(context.exception), "a bytes-like object is required, not 'C'" 1766 ) 1767 f.close() 1768 1769 def test_write_writes_bytes(self): 1770 r, w = os.pipe() 1771 with _io.FileIO(w, mode="w") as f: 1772 f.write(b"hello world") 1773 os.close(r) 1774 
1775 def test_write_writes_bytearray(self): 1776 r, w = os.pipe() 1777 with _io.FileIO(w, mode="w") as f: 1778 f.write(bytearray(b"hello world")) 1779 os.close(r) 1780 1781 1782@pyro_only 1783class FspathTests(unittest.TestCase): 1784 def test_fspath_with_str_returns_str(self): 1785 result = "hello" 1786 self.assertIs(_io._fspath(result), result) 1787 1788 def test_fspath_with_str_subclass_returns_str(self): 1789 class C(str): 1790 pass 1791 1792 result = C("hello") 1793 self.assertIs(_io._fspath(result), result) 1794 1795 def test_fspath_with_str_subclass_does_not_call_dunder_fspath(self): 1796 class C(str): 1797 def __fspath__(self): 1798 raise UserWarning("foo") 1799 1800 result = C("hello") 1801 self.assertIs(_io._fspath(result), result) 1802 1803 def test_fspath_with_bytes_returns_bytes(self): 1804 result = b"hello" 1805 self.assertIs(_io._fspath(result), result) 1806 1807 def test_fspath_with_bytes_subclass_returns_bytes(self): 1808 class C(bytes): 1809 pass 1810 1811 result = C(b"hello") 1812 self.assertIs(_io._fspath(result), result) 1813 1814 def test_fspath_with_bytes_subclass_does_not_call_dunder_fspath(self): 1815 class C(bytes): 1816 def __fspath__(self): 1817 raise UserWarning("foo") 1818 1819 result = C(b"hello") 1820 self.assertIs(_io._fspath(result), result) 1821 1822 def test_fspath_with_no_dunder_fspath_raises_type_error(self): 1823 class C: 1824 pass 1825 1826 result = C() 1827 self.assertRaises(TypeError, _io._fspath, result) 1828 1829 def test_fspath_calls_dunder_fspath_returns_str(self): 1830 class C: 1831 def __fspath__(self): 1832 return "foo" 1833 1834 self.assertEqual(_io._fspath(C()), "foo") 1835 1836 def test_fspath_calls_dunder_fspath_returns_bytes(self): 1837 class C: 1838 def __fspath__(self): 1839 return b"foo" 1840 1841 self.assertEqual(_io._fspath(C()), b"foo") 1842 1843 def test_fspath_with_dunder_fspath_returning_non_str_raises_type_error(self): 1844 class C: 1845 def __fspath__(self): 1846 return 5 1847 1848 result = C() 1849 
self.assertRaises(TypeError, _io._fspath, result) 1850 1851 1852class OpenTests(unittest.TestCase): 1853 def test_open_with_bad_mode_raises_type_error(self): 1854 with self.assertRaisesRegex( 1855 TypeError, r"open\(\) argument .* must be str, not int" 1856 ): 1857 _io.open(0, mode=5) 1858 1859 def test_open_with_bad_buffering_raises_type_error(self): 1860 with self.assertRaises(TypeError) as context: 1861 _io.open(0, buffering=None) 1862 1863 self.assertEqual( 1864 str(context.exception), "an integer is required (got type NoneType)" 1865 ) 1866 1867 def test_open_with_bad_encoding_raises_type_error(self): 1868 with self.assertRaisesRegex( 1869 TypeError, r"open\(\) argument .* must be str or None, not int" 1870 ): 1871 _io.open(0, encoding=5) 1872 1873 def test_open_with_bad_errors_raises_type_error(self): 1874 with self.assertRaisesRegex( 1875 TypeError, r"open\(\) argument .* must be str or None, not int" 1876 ): 1877 _io.open(0, errors=5) 1878 1879 def test_open_with_duplicate_mode_raises_value_error(self): 1880 with self.assertRaises(ValueError) as context: 1881 _io.open(0, mode="rr") 1882 1883 self.assertEqual(str(context.exception), "invalid mode: 'rr'") 1884 1885 def test_open_with_bad_mode_character_raises_value_error(self): 1886 with self.assertRaises(ValueError) as context: 1887 _io.open(0, mode="q") 1888 1889 self.assertEqual(str(context.exception), "invalid mode: 'q'") 1890 1891 def test_open_with_U_and_writing_raises_value_error(self): 1892 with self.assertRaises(ValueError) as context: 1893 _io.open(0, mode="Uw") 1894 1895 self.assertRegex(str(context.exception), "mode U cannot be combined with") 1896 1897 def test_open_with_U_and_appending_raises_value_error(self): 1898 with self.assertRaises(ValueError) as context: 1899 _io.open(0, mode="Ua") 1900 1901 self.assertRegex(str(context.exception), "mode U cannot be combined with") 1902 1903 def test_open_with_U_and_updating_raises_value_error(self): 1904 with self.assertRaises(ValueError) as context: 
1905 _io.open(0, mode="U+") 1906 1907 self.assertRegex(str(context.exception), "mode U cannot be combined with") 1908 1909 def test_open_with_text_and_binary_raises_value_error(self): 1910 with self.assertRaises(ValueError) as context: 1911 _io.open(0, mode="tb") 1912 1913 self.assertEqual( 1914 str(context.exception), "can't have text and binary mode at once" 1915 ) 1916 1917 def test_open_with_creating_and_reading_raises_value_error(self): 1918 with self.assertRaises(ValueError) as context: 1919 _io.open(0, mode="rx") 1920 1921 self.assertEqual( 1922 str(context.exception), 1923 "must have exactly one of create/read/write/append mode", 1924 ) 1925 1926 def test_open_with_creating_and_writing_raises_value_error(self): 1927 with self.assertRaises(ValueError) as context: 1928 _io.open(0, mode="wx") 1929 1930 self.assertEqual( 1931 str(context.exception), 1932 "must have exactly one of create/read/write/append mode", 1933 ) 1934 1935 def test_open_with_creating_and_appending_raises_value_error(self): 1936 with self.assertRaises(ValueError) as context: 1937 _io.open(0, mode="ax") 1938 1939 self.assertEqual( 1940 str(context.exception), 1941 "must have exactly one of create/read/write/append mode", 1942 ) 1943 1944 def test_open_with_reading_and_writing_raises_value_error(self): 1945 with self.assertRaises(ValueError) as context: 1946 _io.open(0, mode="rw") 1947 1948 self.assertEqual( 1949 str(context.exception), 1950 "must have exactly one of create/read/write/append mode", 1951 ) 1952 1953 def test_open_with_reading_and_appending_raises_value_error(self): 1954 with self.assertRaises(ValueError) as context: 1955 _io.open(0, mode="ra") 1956 1957 self.assertEqual( 1958 str(context.exception), 1959 "must have exactly one of create/read/write/append mode", 1960 ) 1961 1962 def test_open_with_writing_and_appending_raises_value_error(self): 1963 with self.assertRaises(ValueError) as context: 1964 _io.open(0, mode="wa") 1965 1966 self.assertEqual( 1967 
str(context.exception), 1968 "must have exactly one of create/read/write/append mode", 1969 ) 1970 1971 def test_open_with_no_reading_creating_writing_appending_raises_value_error(self): 1972 with self.assertRaises(ValueError) as context: 1973 _io.open(0, mode="") 1974 1975 self.assertEqual( 1976 str(context.exception), 1977 "Must have exactly one of create/read/write/append mode and at " 1978 "most one plus", 1979 ) 1980 1981 def test_open_with_binary_and_encoding_raises_value_error(self): 1982 with self.assertRaises(ValueError) as context: 1983 _io.open(0, mode="rb", encoding="utf-8") 1984 1985 self.assertEqual( 1986 str(context.exception), "binary mode doesn't take an encoding argument" 1987 ) 1988 1989 def test_open_with_binary_and_errors_raises_value_error(self): 1990 with self.assertRaises(ValueError) as context: 1991 _io.open(0, mode="rb", errors="error") 1992 1993 self.assertEqual( 1994 str(context.exception), "binary mode doesn't take an errors argument" 1995 ) 1996 1997 def test_open_with_binary_and_newline_raises_value_error(self): 1998 with self.assertRaises(ValueError) as context: 1999 _io.open(0, mode="rb", newline="nl") 2000 2001 self.assertEqual( 2002 str(context.exception), "binary mode doesn't take a newline argument" 2003 ) 2004 2005 def test_open_with_no_buffering_and_binary_returns_file_io(self): 2006 fd = _getfd() 2007 with _io.open(fd, buffering=False, mode="rb") as result: 2008 self.assertIsInstance(result, _io.FileIO) 2009 2010 @unittest.skipIf(sys.platform == "darwin", "Darwin rejects non-utf8 filenames") 2011 def test_open_non_utf8_filename(self): 2012 with tempfile.TemporaryDirectory() as tempdir: 2013 filename = "bad_surrogate\udc80.txt" 2014 full_path = os.path.join(tempdir, filename) 2015 with open(full_path, "w") as fp: 2016 fp.write("foobar") 2017 filename_encoded = filename.encode( 2018 sys.getfilesystemencoding(), sys.getfilesystemencodeerrors() 2019 ) 2020 tempdir_bytes = tempdir.encode( 2021 sys.getfilesystemencoding(), 
@pyro_only
class UnderBufferedIOMixinTests(unittest.TestCase):
    """Tests for `_BufferedIOMixin`, which is only importable from Pyro's `_io`.

    Most cases wrap a tiny stand-in "raw" object whose method raises
    `UserWarning`; seeing that exception escape from the mixin proves the call
    was forwarded to the raw stream. A `None` raw stream models the detached
    state.
    """

    def test_dunder_init_sets_raw(self):
        with _io.FileIO(_getfd(), closefd=True) as f:
            result = _BufferedIOMixin(f)
            self.assertIs(result.raw, f)

    def test_dunder_repr_returns_str(self):
        result = _BufferedIOMixin(None)
        self.assertEqual(result.__repr__(), "<_io._BufferedIOMixin>")

    def test_dunder_repr_gets_raw_name(self):
        class C:
            name = "foo"

        result = _BufferedIOMixin(C())
        self.assertEqual(result.__repr__(), "<_io._BufferedIOMixin name='foo'>")

    def test_close_with_none_raw_raises_value_error(self):
        result = _BufferedIOMixin(None)
        self.assertRaisesRegex(ValueError, "raw stream has been detached", result.close)

    def test_close_with_closed_raw_does_nothing(self):
        class C:
            closed = True

            def close(self):
                pass

        result = _BufferedIOMixin(C())
        self.assertIsNone(result.close())

    def test_close_calls_raw_flush(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def flush(self):
                raise UserWarning("foo")

            def close(self):
                pass

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.close)

    def test_close_calls_raw_close(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def close(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.close)

    def test_closed_with_none_raw_raises_value_error(self):
        result = _BufferedIOMixin(None)
        with self.assertRaisesRegex(ValueError, "raw stream has been detached"):
            result.closed

    def test_closed_calls_raw_closed(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            @property
            def closed(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        with self.assertRaises(UserWarning):
            result.closed

    def test_detach_with_none_raw_raises_value_error(self):
        result = _BufferedIOMixin(None)
        self.assertRaisesRegex(
            ValueError, "raw stream has been detached", result.detach
        )

    def test_detach_calls_raw_flush(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def flush(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.detach)

    def test_detach_returns_raw_and_sets_none(self):
        with _io.BytesIO() as raw:
            result = _BufferedIOMixin(raw)
            self.assertIs(result.detach(), raw)
            self.assertIs(result.raw, None)

    def test_fileno_with_none_raw_raises_value_error(self):
        result = _BufferedIOMixin(None)
        self.assertRaisesRegex(
            ValueError, "raw stream has been detached", result.fileno
        )

    def test_fileno_calls_raw_fileno(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def fileno(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.fileno)

    def test_flush_with_none_raw_raises_value_error(self):
        result = _BufferedIOMixin(None)
        self.assertRaisesRegex(ValueError, "raw stream has been detached", result.flush)

    def test_flush_with_closed_raw_raises_value_error(self):
        class C:
            closed = True

            def close(self):
                pass

        result = _BufferedIOMixin(C())
        self.assertRaises(ValueError, result.flush)

    def test_flush_calls_raw_flush(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def flush(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.flush)

    def test_isatty_calls_raw_isatty(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def isatty(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.isatty)

    def test_isatty_with_none_raw_raises_value_error(self):
        result = _BufferedIOMixin(None)
        self.assertRaisesRegex(
            ValueError, "raw stream has been detached", result.isatty
        )

    def test_mode_calls_raw_mode(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            @property
            def mode(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        with self.assertRaises(UserWarning):
            result.mode

    def test_name_calls_raw_name(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            @property
            def name(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        with self.assertRaises(UserWarning):
            result.name

    def test_seek_with_none_raw_raises_value_error(self):
        result = _BufferedIOMixin(None)
        with self.assertRaisesRegex(ValueError, "raw stream has been detached"):
            result.seek(0)

    def test_tell_with_none_raw_raises_value_error(self):
        result = _BufferedIOMixin(None)
        self.assertRaisesRegex(ValueError, "raw stream has been detached", result.tell)

    def test_tell_calls_raw_tell(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            def tell(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.tell)

    def test_tell_with_raw_tell_returning_negative_raises_os_error(self):
        class C:
            def tell(self):
                return -1

        result = _BufferedIOMixin(C())
        self.assertRaises(OSError, result.tell)

    def test_tell_returns_result_of_raw_tell(self):
        class C:
            def tell(self):
                return 5

        result = _BufferedIOMixin(C())
        self.assertEqual(result.tell(), 5)

    def test_truncate_calls_raw_flush(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def flush(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.truncate)

    def test_truncate_with_none_pos_calls_tell(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def flush(self):
                pass

            def tell(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.truncate)

    def test_truncate_with_none_pos_calls_raw_truncate_with_pos(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def flush(self):
                pass

            def tell(self):
                return 5

            def truncate(self, pos):
                raise UserWarning(pos)

        result = _BufferedIOMixin(C())
        with self.assertRaises(UserWarning) as context:
            result.truncate()
        # The position reported by tell() is what reaches raw.truncate().
        self.assertEqual(context.exception.args, (5,))

    def test_truncate_calls_raw_truncate(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def flush(self):
                pass

            def tell(self):
                # Would surface as MemoryError if tell() were consulted even
                # though an explicit position is supplied.
                raise MemoryError("foo")

            def truncate(self, pos):
                raise UserWarning(pos)

        result = _BufferedIOMixin(C())
        with self.assertRaises(UserWarning) as context:
            result.truncate(10)
        self.assertEqual(context.exception.args, (10,))

    def test_seek_calls_raw_seek(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            def seek(self, pos, whence):
                raise UserWarning(pos, whence)

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.seek, 5, 0)

    # The name must start with "test" for unittest to collect it; spelled
    # "tes_..." this case was silently never run.
    def test_seek_returning_negative_pos_raises_os_error(self):
        class C:
            def seek(self, pos, whence):
                return -1

        result = _BufferedIOMixin(C())
        self.assertRaises(OSError, result.seek, 5, 0)

    def test_seek_returns_result_of_raw_seek(self):
        class C:
            def seek(self, pos, whence):
                return 100

        result = _BufferedIOMixin(C())
        self.assertEqual(result.seek(5, 0), 100)

    def test_seekable_calls_raw_seekable(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def seekable(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.seekable)
2373 class C: 2374 def decode(self, input, final): 2375 raise UserWarning(input, final) 2376 2377 decoder = _io.IncrementalNewlineDecoder(decoder=C(), translate=1) 2378 with self.assertRaises(UserWarning) as context: 2379 decoder.decode("foo", 5) 2380 self.assertEqual(context.exception.args, ("foo", True)) 2381 2382 def test_decode_with_no_newlines_returns_input(self): 2383 decoder = _io.IncrementalNewlineDecoder(decoder=None, translate=1) 2384 self.assertEqual(decoder.decode("bar"), "bar") 2385 self.assertEqual(decoder.newlines, None) 2386 self.assertEqual(decoder.getstate(), (b"", 0)) 2387 2388 def test_decode_with_carriage_return_does_not_translate_to_newline(self): 2389 decoder = _io.IncrementalNewlineDecoder(decoder=None, translate=0) 2390 self.assertEqual(decoder.decode("bar\r\n"), "bar\r\n") 2391 self.assertEqual(decoder.newlines, "\r\n") 2392 self.assertEqual(decoder.getstate(), (b"", 0)) 2393 2394 def test_decode_with_trailing_carriage_removes_carriage(self): 2395 decoder = _io.IncrementalNewlineDecoder(decoder=None, translate=0) 2396 self.assertEqual(decoder.decode("bar\r"), "bar") 2397 self.assertEqual(decoder.newlines, None) 2398 self.assertEqual(decoder.getstate(), (b"", 1)) 2399 2400 def test_decode_with_carriage_does_not_translate_to_newline(self): 2401 decoder = _io.IncrementalNewlineDecoder(decoder=None, translate=0) 2402 self.assertEqual(decoder.decode("bar\rbaz"), "bar\rbaz") 2403 self.assertEqual(decoder.newlines, "\r") 2404 self.assertEqual(decoder.getstate(), (b"", 0)) 2405 2406 def test_decode_translate_with_carriage_return_translates_to_newline(self): 2407 decoder = _io.IncrementalNewlineDecoder(decoder=None, translate=1) 2408 self.assertEqual(decoder.decode("bar\r\n"), "bar\n") 2409 self.assertEqual(decoder.newlines, "\r\n") 2410 self.assertEqual(decoder.getstate(), (b"", 0)) 2411 2412 def test_decode_translate_with_carriage_translates_to_newline(self): 2413 decoder = _io.IncrementalNewlineDecoder(decoder=None, translate=1) 2414 
self.assertEqual(decoder.decode("bar\rbaz"), "bar\nbaz") 2415 self.assertEqual(decoder.newlines, "\r") 2416 self.assertEqual(decoder.getstate(), (b"", 0)) 2417 2418 def test_getstate_with_decoder_calls_decoder_getstate(self): 2419 class C: 2420 def decode(self, input, final): 2421 return input 2422 2423 def getstate(self): 2424 return (b"foo", 5) 2425 2426 decoder = _io.IncrementalNewlineDecoder(decoder=C(), translate=1) 2427 self.assertEqual(decoder.getstate(), (b"foo", 10)) 2428 2429 def test_getstate_with_decoder_calls_decoder_getstate_carriage(self): 2430 class C: 2431 def decode(self, input, final): 2432 return input 2433 2434 def getstate(self): 2435 return (b"foo", 5) 2436 2437 decoder = _io.IncrementalNewlineDecoder(decoder=C(), translate=1) 2438 decoder.decode("bar\r") 2439 self.assertEqual(decoder.getstate(), (b"foo", 11)) 2440 2441 def test_setstate_with_decoder_calls_decoder_setstate(self): 2442 set_state = None 2443 2444 class C: 2445 def decode(self, input, final): 2446 return input 2447 2448 def getstate(self): 2449 return b"bar", 1 2450 2451 def setstate(self, state): 2452 nonlocal set_state 2453 set_state = state 2454 2455 decoder = _io.IncrementalNewlineDecoder(decoder=C(), translate=1) 2456 self.assertIsNone(decoder.setstate((b"foo", 4))) 2457 decoder.decode("bar\r") 2458 self.assertEqual(set_state, (b"foo", 2)) 2459 self.assertEqual(decoder.getstate(), (b"bar", 3)) 2460 2461 def test_setstate_with_decoder_calls_decoder_setstate_carriage(self): 2462 set_state = None 2463 2464 class C: 2465 def decode(self, input, final): 2466 return input 2467 2468 def getstate(self): 2469 return b"bar", 1 2470 2471 def setstate(self, state): 2472 nonlocal set_state 2473 set_state = state 2474 2475 decoder = _io.IncrementalNewlineDecoder(decoder=C(), translate=1) 2476 self.assertIsNone(decoder.setstate((b"foo", 4))) 2477 decoder.decode("bar\r") 2478 self.assertEqual(set_state, (b"foo", 2)) 2479 self.assertEqual(decoder.getstate(), (b"bar", 3)) 2480 2481 def 
class UnderTextIOBaseTests(unittest.TestCase):
    """Checks that `_io._TextIOBase` defines its text API in its own namespace.

    Each test looks a name up in the class's own `__dict__` (via `vars`), so
    an attribute that is merely inherited from a base class does not count.
    """

    def test_read_in_dict(self):
        own_namespace = vars(_io._TextIOBase)
        self.assertIn("read", own_namespace)

    def test_write_in_dict(self):
        own_namespace = vars(_io._TextIOBase)
        self.assertIn("write", own_namespace)

    def test_readline_in_dict(self):
        own_namespace = vars(_io._TextIOBase)
        self.assertIn("readline", own_namespace)

    def test_detach_in_dict(self):
        own_namespace = vars(_io._TextIOBase)
        self.assertIn("detach", own_namespace)

    def test_encoding_in_dict(self):
        own_namespace = vars(_io._TextIOBase)
        self.assertIn("encoding", own_namespace)

    def test_newlines_in_dict(self):
        own_namespace = vars(_io._TextIOBase)
        self.assertIn("newlines", own_namespace)

    def test_errors_in_dict(self):
        own_namespace = vars(_io._TextIOBase)
        self.assertIn("errors", own_namespace)
pass 2537 2538 instance = C(_io.BytesIO(b"")) 2539 self.assertIsInstance(instance, _io.BufferedReader) 2540 2541 def test_peek_returns_buffered_data(self): 2542 with _io.BufferedReader(_io.BytesIO(b"hello"), buffer_size=3) as buffered_io: 2543 self.assertEqual(buffered_io.peek(-10), b"hel") 2544 self.assertEqual(buffered_io.peek(1), b"hel") 2545 self.assertEqual(buffered_io.peek(2), b"hel") 2546 2547 def test_raw_returns_raw(self): 2548 with _io.BytesIO(b"hello") as bytes_io: 2549 with _io.BufferedReader(bytes_io) as buffered_io: 2550 self.assertIs(buffered_io.raw, bytes_io) 2551 2552 def test_read_with_detached_stream_raises_value_error(self): 2553 with _io.FileIO(_getfd(), mode="r") as file_reader: 2554 buffered = _io.BufferedReader(file_reader) 2555 buffered.detach() 2556 with self.assertRaises(ValueError) as context: 2557 buffered.read() 2558 self.assertEqual(str(context.exception), "raw stream has been detached") 2559 2560 def test_read_with_closed_stream_raises_value_error(self): 2561 file_reader = _io.FileIO(_getfd(), mode="r") 2562 buffered = _io.BufferedReader(file_reader) 2563 file_reader.close() 2564 with self.assertRaises(ValueError) as context: 2565 buffered.read() 2566 self.assertIn("closed file", str(context.exception)) 2567 2568 def test_read_with_negative_size_raises_value_error(self): 2569 with _io.FileIO(_getfd(), mode="r") as file_reader: 2570 buffered = _io.BufferedReader(file_reader) 2571 with self.assertRaises(ValueError) as context: 2572 buffered.read(-2) 2573 self.assertEqual( 2574 str(context.exception), "read length must be non-negative or -1" 2575 ) 2576 2577 def test_read_with_none_size_calls_raw_readall(self): 2578 class C(_io.FileIO): 2579 def readall(self): 2580 raise UserWarning("foo") 2581 2582 with C(_getfd(), mode="r") as file_reader: 2583 buffered = _io.BufferedReader(file_reader) 2584 with self.assertRaises(UserWarning) as context: 2585 buffered.read(None) 2586 self.assertEqual(context.exception.args, ("foo",)) 2587 2588 def 
test_read_reads_bytes(self): 2589 with _io.BytesIO(b"hello") as bytes_io: 2590 buffered = _io.BufferedReader(bytes_io, buffer_size=1) 2591 result = buffered.read() 2592 self.assertEqual(result, b"hello") 2593 2594 def test_read_reads_count_bytes(self): 2595 with _io.BytesIO(b"hello") as bytes_io: 2596 buffered = _io.BufferedReader(bytes_io, buffer_size=1) 2597 result = buffered.read(3) 2598 self.assertEqual(result, b"hel") 2599 2600 def test_readinto_writes_to_buffer(self): 2601 with _io.BytesIO(b"hello") as bytes_io: 2602 with _io.BufferedReader(bytes_io, buffer_size=4) as buffered: 2603 ba = bytearray(b"XXXXXXXXXXXX") 2604 buffered.readinto(ba) 2605 self.assertEqual(ba, bytearray(b"helloXXXXXXX")) 2606 2607 def test_read1_calls_read(self): 2608 with _io.BytesIO(b"hello") as bytes_io: 2609 buffered = _io.BufferedReader(bytes_io, buffer_size=10) 2610 result = buffered.read1(3) 2611 self.assertEqual(result, b"hel") 2612 2613 def test_read1_reads_from_buffer(self): 2614 with _io.BytesIO(b"hello") as bytes_io: 2615 buffered = _io.BufferedReader(bytes_io, buffer_size=4) 2616 buffered.read(1) 2617 result = buffered.read1(10) 2618 self.assertEqual(result, b"ell") 2619 2620 def test_read1_with_size_not_specified_returns_rest_in_buffer(self): 2621 with _io.BytesIO(b"hello") as bytes_io: 2622 buffered = _io.BufferedReader(bytes_io, buffer_size=4) 2623 buffered.read(1) 2624 result = buffered.read1() 2625 self.assertEqual(result, b"ell") 2626 2627 def test_read1_with_none_size_raises_type_error(self): 2628 with _io.BytesIO(b"hello") as bytes_io: 2629 buffered = _io.BufferedReader(bytes_io, buffer_size=4) 2630 with self.assertRaises(TypeError): 2631 buffered.read1(None) 2632 2633 def test_readable_calls_raw_readable(self): 2634 readable_calls = 0 2635 2636 class C: 2637 closed = False 2638 2639 def readable(self): 2640 nonlocal readable_calls 2641 readable_calls += 1 2642 return True 2643 2644 result = _io.BufferedReader(C()) 2645 self.assertEqual(readable_calls, 1) 2646 
self.assertTrue(result.readable()) 2647 self.assertEqual(readable_calls, 2) 2648 2649 def test_tell_returns_current_position(self): 2650 with _io.BytesIO(b"hello") as bytes_io: 2651 buffered = _io.BufferedReader(bytes_io) 2652 self.assertEqual(buffered.tell(), 0) 2653 buffered.read(2) 2654 self.assertEqual(buffered.tell(), 2) 2655 2656 def test_seek_with_invalid_whence(self): 2657 with _io.BytesIO(b"hello") as bytes_io: 2658 buffered = _io.BufferedReader(bytes_io) 2659 self.assertRaises(ValueError, buffered.seek, 0, 4) 2660 self.assertRaises(ValueError, buffered.seek, 0, -1) 2661 2662 def test_seek_resets_position(self): 2663 with _io.BytesIO(b"hello") as bytes_io: 2664 buffered = _io.BufferedReader(bytes_io) 2665 buffered.read(2) 2666 self.assertEqual(buffered.tell(), 2) 2667 buffered.seek(0) 2668 self.assertEqual(buffered.tell(), 0) 2669 2670 def test_supports_arbitrary_attributes(self): 2671 with _io.BytesIO(b"hello") as bytes_io: 2672 buffered = _io.BufferedReader(bytes_io) 2673 buffered.buenos_dias = 1234 2674 self.assertEqual(buffered.buenos_dias, 1234) 2675 2676 2677class TextIOWrapperTests(unittest.TestCase): 2678 def _sample(self): 2679 return _io.TextIOWrapper(_io.BytesIO(b"hello++"), encoding="ascii") 2680 2681 def _sample_utf8(self): 2682 return _io.TextIOWrapper(_io.BytesIO(b"hello++"), encoding="UTF-8") 2683 2684 def _sample_utf16(self): 2685 return _io.TextIOWrapper(_io.BytesIO(b"hello++"), encoding="UTF-16") 2686 2687 def _sample_latin1(self): 2688 return _io.TextIOWrapper(_io.BytesIO(b"hello++"), encoding="Latin-1") 2689 2690 def _sample_buffered_writer(self): 2691 return _io.BufferedWriter(_io.BytesIO(b"hello++"), buffer_size=3) 2692 2693 def _sample_buffered_reader(self): 2694 return _io.BufferedReader(_io.BytesIO(b"hello++"), buffer_size=3) 2695 2696 def test_dunder_init_with_none_buffer_raises_attribute_error(self): 2697 with self.assertRaises(AttributeError) as context: 2698 _io.TextIOWrapper(None) 2699 self.assertEqual( 2700 
str(context.exception), "'NoneType' object has no attribute 'readable'" 2701 ) 2702 2703 def test_dunder_init_with_non_str_encoding_raises_type_error(self): 2704 with self.assertRaisesRegex( 2705 TypeError, r"TextIOWrapper\(\) argument .* must be str or None, not int" 2706 ): 2707 _io.TextIOWrapper("hello", encoding=5) 2708 2709 def test_dunder_init_with_non_str_errors_raises_type_error(self): 2710 with self.assertRaisesRegex( 2711 TypeError, r"TextIOWrapper\(\) argument .* must be str or None, not int" 2712 ): 2713 _io.TextIOWrapper("hello", errors=5) 2714 2715 def test_dunder_init_with_non_str_newline_raises_type_error(self): 2716 with self.assertRaisesRegex( 2717 TypeError, r"TextIOWrapper\(\) argument .* must be str or None, not int" 2718 ): 2719 _io.TextIOWrapper("hello", newline=5) 2720 2721 def test_dunder_init_with_non_int_line_buffering_raises_type_error(self): 2722 with self.assertRaises(TypeError) as context: 2723 _io.TextIOWrapper("hello", line_buffering="buf") 2724 self.assertEqual( 2725 str(context.exception), "an integer is required (got type str)" 2726 ) 2727 2728 def test_dunder_init_with_non_int_write_through_raises_type_error(self): 2729 with self.assertRaises(TypeError) as context: 2730 _io.TextIOWrapper("hello", write_through="True") 2731 self.assertEqual( 2732 str(context.exception), "an integer is required (got type str)" 2733 ) 2734 2735 def test_dunder_init_with_bad_str_newline_raises_value_error(self): 2736 with self.assertRaises(ValueError) as context: 2737 _io.TextIOWrapper("hello", newline="foo") 2738 self.assertEqual(str(context.exception), "illegal newline value: foo") 2739 2740 def test_dunder_init_returns_text_io_wrapper(self): 2741 with self._sample() as text_io: 2742 self.assertIsInstance(text_io, _io.TextIOWrapper) 2743 2744 def test_dunder_repr(self): 2745 with self._sample() as text_io: 2746 self.assertEqual(text_io.__repr__(), "<_io.TextIOWrapper encoding='ascii'>") 2747 2748 def test_dunder_repr_with_name(self): 2749 class 
C(_io.TextIOWrapper): 2750 name = "foo" 2751 2752 with C(_io.BytesIO(b"hello"), encoding="ascii") as text_io: 2753 self.assertEqual( 2754 text_io.__repr__(), "<_io.TextIOWrapper name='foo' encoding='ascii'>" 2755 ) 2756 2757 def test_dunder_repr_with_mode(self): 2758 # TODO(T54575279): remove workaround and use subclass again 2759 with _io.TextIOWrapper(_io.BytesIO(b"hello"), encoding="ascii") as text_io: 2760 text_io.mode = "rwx" 2761 self.assertEqual( 2762 repr(text_io), "<_io.TextIOWrapper mode='rwx' encoding='ascii'>" 2763 ) 2764 2765 def test_buffer_returns_buffer(self): 2766 with _io.BytesIO() as bytes_io: 2767 with _io.TextIOWrapper(bytes_io) as text_io: 2768 self.assertIs(text_io.buffer, bytes_io) 2769 2770 def test_close_with_non_TextIOWrapper_raises_type_error(self): 2771 with self.assertRaises(TypeError): 2772 _io.TextIOWrapper.close(5) 2773 2774 def test_close_with_detached_buffer_raises_value_error(self): 2775 text_io = self._sample() 2776 text_io.detach() 2777 self.assertRaisesRegex( 2778 ValueError, "underlying buffer has been detached", text_io.close 2779 ) 2780 2781 def test_closed_with_detached_buffer_raises_value_error(self): 2782 text_io = self._sample() 2783 text_io.detach() 2784 with self.assertRaisesRegex(ValueError, "underlying buffer has been detached"): 2785 text_io.closed 2786 2787 def test_detach_with_non_TextIOWrapper_raises_type_error(self): 2788 with self.assertRaises(TypeError): 2789 _io.TextIOWrapper.detach(5) 2790 2791 def test_detach_with_detached_buffer_raises_value_error(self): 2792 text_io = self._sample() 2793 text_io.detach() 2794 self.assertRaisesRegex( 2795 ValueError, "underlying buffer has been detached", text_io.detach 2796 ) 2797 2798 def test_encoding_returns_encoding(self): 2799 with self._sample() as text_io: 2800 self.assertEqual(text_io.encoding, "ascii") 2801 2802 def test_default_encoding_is_UTF8(self): 2803 with _io.BytesIO() as bytes_io: 2804 with _io.TextIOWrapper(bytes_io) as text_io: 2805 
self.assertEqual(text_io.encoding, "UTF-8") 2806 2807 def test_errors_returns_default_errors(self): 2808 with self._sample() as text_io: 2809 self.assertEqual(text_io.errors, "strict") 2810 2811 def test_errors_returns_errors(self): 2812 with _io.TextIOWrapper(_io.BytesIO(b"hello"), errors="foobar") as text_io: 2813 self.assertEqual(text_io.errors, "foobar") 2814 2815 def test_line_buffering_returns_line_buffering(self): 2816 with _io.TextIOWrapper(_io.BytesIO(b"hello"), line_buffering=5) as text_io: 2817 self.assertEqual(text_io.line_buffering, True) 2818 2819 def test_name_with_bytes_io_raises_attribute_error(self): 2820 with self._sample() as text_io: 2821 with self.assertRaisesRegex(AttributeError, "name"): 2822 text_io.name 2823 2824 def test_name_with_detached_buffer_raises_value_error(self): 2825 text_io = self._sample() 2826 text_io.detach() 2827 with self.assertRaisesRegex(ValueError, "underlying buffer has been detached"): 2828 text_io.name 2829 2830 def test_name_returns_buffer_name(self): 2831 class C(_io.BytesIO): 2832 name = "foobar" 2833 2834 with _io.TextIOWrapper(C(b"hello")) as text_io: 2835 self.assertEqual(text_io.name, "foobar") 2836 2837 def test_fileno_with_non_TextIOWrapper_raises_type_error(self): 2838 with self.assertRaises(TypeError): 2839 _io.TextIOWrapper.fileno(5) 2840 2841 def test_fileno_with_bytes_io_raises_unsupported_operation(self): 2842 with self._sample() as text_io: 2843 with self.assertRaisesRegex(_io.UnsupportedOperation, "fileno"): 2844 text_io.fileno() 2845 2846 def test_fileno_with_detached_buffer_raises_value_error(self): 2847 text_io = self._sample() 2848 text_io.detach() 2849 self.assertRaisesRegex( 2850 ValueError, "underlying buffer has been detached", text_io.fileno 2851 ) 2852 2853 def test_fileno_returns_buffer_fileno(self): 2854 class C(_io.BytesIO): 2855 def fileno(self): 2856 return 5 2857 2858 with _io.TextIOWrapper(C(b"hello")) as text_io: 2859 self.assertEqual(text_io.fileno(), 5) 2860 2861 def 
test_isatty_with_non_TextIOWrapper_raises_type_error(self): 2862 with self.assertRaises(TypeError): 2863 _io.TextIOWrapper.isatty(5) 2864 2865 def test_isatty_with_bytes_io_returns_false(self): 2866 with self._sample() as text_io: 2867 self.assertFalse(text_io.isatty()) 2868 2869 def test_isatty_with_detached_buffer_raises_value_error(self): 2870 text_io = self._sample() 2871 text_io.detach() 2872 self.assertRaisesRegex( 2873 ValueError, "underlying buffer has been detached", text_io.isatty 2874 ) 2875 2876 def test_isatty_returns_buffer_isatty(self): 2877 class C(_io.BytesIO): 2878 def isatty(self): 2879 return True 2880 2881 with _io.TextIOWrapper(C(b"hello")) as text_io: 2882 self.assertTrue(text_io.isatty()) 2883 2884 def test_newlines_with_detached_buffer_raises_value_error(self): 2885 text_io = self._sample() 2886 text_io.detach() 2887 with self.assertRaisesRegex(ValueError, "underlying buffer has been detached"): 2888 text_io.newlines 2889 2890 def test_newlines_returns_newlines(self): 2891 with _io.TextIOWrapper(_io.BytesIO(b"\rhello\n")) as text_io: 2892 text_io.read() 2893 self.assertEqual(text_io.newlines, ("\r", "\n")) 2894 2895 def test_seek_with_non_TextIOWrapper_raises_type_error(self): 2896 with self.assertRaises(TypeError): 2897 _io.TextIOWrapper.seek(5, 5) 2898 2899 def test_seek_with_detached_buffer_raises_value_error(self): 2900 text_io = self._sample() 2901 text_io.detach() 2902 with self.assertRaisesRegex(ValueError, "underlying buffer has been detached"): 2903 text_io.seek(0) 2904 2905 def test_seek_with_detached_buffer_and_non_int_whence_raises_type_error(self): 2906 text_io = self._sample() 2907 text_io.detach() 2908 with self.assertRaises(TypeError): 2909 text_io.seek(5, 5.5) 2910 2911 def test_seekable_with_non_TextIOWrapper_raises_type_error(self): 2912 with self.assertRaises(TypeError): 2913 _io.TextIOWrapper.seekable(5) 2914 2915 def test_seekable_with_detached_buffer_raises_value_error(self): 2916 text_io = self._sample() 2917 
text_io.detach() 2918 self.assertRaisesRegex( 2919 ValueError, "underlying buffer has been detached", text_io.seekable 2920 ) 2921 2922 def test_seekable_with_closed_io_raises_value_error(self): 2923 text_io = self._sample() 2924 text_io.close() 2925 self.assertRaises(ValueError, text_io.seekable) 2926 2927 def test_readable_with_non_TextIOWrapper_raises_type_error(self): 2928 with self.assertRaises(TypeError): 2929 _io.TextIOWrapper.readable(5) 2930 2931 def test_readable_with_detached_buffer_raises_value_error(self): 2932 text_io = self._sample() 2933 text_io.detach() 2934 self.assertRaisesRegex( 2935 ValueError, "underlying buffer has been detached", text_io.readable 2936 ) 2937 2938 def test_readable_calls_buffer_readable(self): 2939 class C(_io.BytesIO): 2940 readable = Mock(name="readable") 2941 2942 with C(b"hello") as bytes_io: 2943 with _io.TextIOWrapper(bytes_io) as text_io: 2944 bytes_io.readable.assert_called_once() 2945 text_io.readable() 2946 self.assertEqual(bytes_io.readable.call_count, 2) 2947 2948 def test_supports_arbitrary_attributes(self): 2949 text_io = self._sample() 2950 text_io.heya = () 2951 self.assertEqual(text_io.heya, ()) 2952 2953 def test_writable_with_non_TextIOWrapper_raises_type_error(self): 2954 with self.assertRaises(TypeError): 2955 _io.TextIOWrapper.writable(5) 2956 2957 def test_writable_with_detached_buffer_raises_value_error(self): 2958 text_io = self._sample() 2959 text_io.detach() 2960 self.assertRaisesRegex( 2961 ValueError, "underlying buffer has been detached", text_io.writable 2962 ) 2963 2964 def test_writable_calls_buffer_writable(self): 2965 class C(_io.BytesIO): 2966 writable = Mock(name="writable") 2967 2968 with C(b"hello") as bytes_io: 2969 with _io.TextIOWrapper(bytes_io) as text_io: 2970 bytes_io.writable.assert_called_once() 2971 text_io.writable() 2972 self.assertEqual(bytes_io.writable.call_count, 2) 2973 2974 def test_flush_with_non_TextIOWrapper_raises_type_error(self): 2975 with 
self.assertRaises(TypeError): 2976 _io.TextIOWrapper.flush(5) 2977 2978 def test_flush_with_closed_buffer_raises_value_error(self): 2979 text_io = self._sample() 2980 text_io.close() 2981 self.assertRaisesRegex(ValueError, "closed", text_io.flush) 2982 2983 def test_flush_with_detached_buffer_raises_value_error(self): 2984 text_io = self._sample() 2985 text_io.detach() 2986 self.assertRaisesRegex( 2987 ValueError, "underlying buffer has been detached", text_io.flush 2988 ) 2989 2990 def test_flush_calls_buffer_flush(self): 2991 class C(_io.BytesIO): 2992 flush = Mock(name="flush", return_value=None) 2993 2994 with C(b"hello") as bytes_io: 2995 with _io.TextIOWrapper(bytes_io) as text_io: 2996 bytes_io.flush.assert_not_called() 2997 self.assertIsNone(text_io.flush()) 2998 bytes_io.flush.assert_called_once() 2999 3000 def test_read_with_non_TextIOWrapper_raises_type_error(self): 3001 with self.assertRaises(TypeError): 3002 _io.TextIOWrapper.read(5) 3003 3004 def test_read_with_detached_buffer_raises_value_error(self): 3005 text_io = self._sample() 3006 text_io.detach() 3007 self.assertRaisesRegex( 3008 ValueError, "underlying buffer has been detached", text_io.read 3009 ) 3010 3011 def test_read_with_detached_buffer_and_non_int_size_raises_type_error(self): 3012 text_io = self._sample() 3013 text_io.detach() 3014 with self.assertRaises(TypeError): 3015 text_io.read(5.5) 3016 3017 def test_read_reads_chars(self): 3018 with _io.TextIOWrapper(_io.BytesIO(b"foo bar")) as text_io: 3019 result = text_io.read(3) 3020 self.assertEqual(result, "foo") 3021 self.assertEqual(text_io.read(), " bar") 3022 3023 def test_readline_with_non_TextIOWrapper_raises_type_error(self): 3024 with self.assertRaises(TypeError): 3025 _io.TextIOWrapper.readline(5) 3026 3027 def test_readline_with_closed_file_raises_value_error(self): 3028 text_io = self._sample() 3029 text_io.close() 3030 self.assertRaisesRegex(ValueError, "closed file", text_io.readline) 3031 3032 def 
test_readline_with_detached_buffer_raises_value_error(self): 3033 text_io = self._sample() 3034 text_io.detach() 3035 self.assertRaisesRegex( 3036 ValueError, "underlying buffer has been detached", text_io.readline 3037 ) 3038 3039 def test_readline_with_non_int_size_raises_type_error(self): 3040 with self._sample() as text_io: 3041 self.assertRaisesRegex( 3042 TypeError, 3043 "cannot be interpreted as an integer", 3044 text_io.readline, 3045 "not_int", 3046 ) 3047 3048 def test_readline_reads_line_terminated_by_newline(self): 3049 with _io.TextIOWrapper(_io.BytesIO(b"foo\nbar")) as text_io: 3050 self.assertEqual(text_io.readline(), "foo\n") 3051 self.assertEqual(text_io.readline(), "bar") 3052 self.assertEqual(text_io.readline(), "") 3053 3054 def test_readline_reads_line_terminated_by_cr(self): 3055 with _io.TextIOWrapper(_io.BytesIO(b"foo\rbar")) as text_io: 3056 self.assertEqual(text_io.readline(), "foo\n") 3057 self.assertEqual(text_io.readline(), "bar") 3058 self.assertEqual(text_io.readline(), "") 3059 3060 def test_readline_reads_line_terminated_by_crlf(self): 3061 with _io.TextIOWrapper(_io.BytesIO(b"foo\r\nbar")) as text_io: 3062 self.assertEqual(text_io.readline(), "foo\n") 3063 self.assertEqual(text_io.readline(), "bar") 3064 self.assertEqual(text_io.readline(), "") 3065 3066 def test_readline_reads_line_with_only_cr(self): 3067 with _io.TextIOWrapper(_io.BytesIO(b"\r")) as text_io: 3068 self.assertEqual(text_io.readline(), "\n") 3069 self.assertEqual(text_io.readline(), "") 3070 3071 def test_tell_with_non_TextIOWrapper_raises_type_error(self): 3072 with self.assertRaises(TypeError): 3073 _io.TextIOWrapper.tell(5) 3074 3075 def test_tell_with_detached_buffer_raises_value_error(self): 3076 text_io = self._sample() 3077 text_io.detach() 3078 self.assertRaisesRegex( 3079 ValueError, "underlying buffer has been detached", text_io.tell 3080 ) 3081 3082 def test_truncate_with_non_TextIOWrapper_raises_type_error(self): 3083 with self.assertRaises(TypeError): 
3084 _io.TextIOWrapper.truncate(5, 5) 3085 3086 def test_truncate_with_closed_buffer_raises_value_error(self): 3087 text_io = self._sample() 3088 text_io.close() 3089 self.assertRaisesRegex(ValueError, "closed", text_io.truncate) 3090 3091 def test_truncate_with_detached_buffer_raises_value_error(self): 3092 text_io = self._sample() 3093 text_io.detach() 3094 self.assertRaisesRegex( 3095 ValueError, "underlying buffer has been detached", text_io.truncate 3096 ) 3097 3098 def test_truncate_calls_buffer_flush(self): 3099 class C(_io.BytesIO): 3100 flush = Mock(name="flush") 3101 3102 with C(b"hello") as bytes_io: 3103 with _io.TextIOWrapper(C()) as text_io: 3104 bytes_io.flush.assert_not_called() 3105 text_io.truncate() 3106 bytes_io.flush.assert_called_once() 3107 3108 def test_truncate_calls_buffer_truncate(self): 3109 class C(_io.BytesIO): 3110 truncate = Mock(name="truncate") 3111 3112 with C(b"hello") as bytes_io: 3113 with _io.TextIOWrapper(C()) as text_io: 3114 bytes_io.truncate.assert_not_called() 3115 text_io.truncate() 3116 bytes_io.truncate.assert_called_once() 3117 3118 def test_write_with_non_TextIOWrapper_raises_type_error(self): 3119 with self.assertRaises(TypeError): 3120 _io.TextIOWrapper.write(5, "foo") 3121 3122 def test_write_with_detached_buffer_raises_value_error(self): 3123 text_io = self._sample() 3124 text_io.detach() 3125 with self.assertRaisesRegex(ValueError, "underlying buffer has been detached"): 3126 text_io.write("foo") 3127 3128 def test_write_with_detach_buffer_and_non_str_text_raises_type_error(self): 3129 text_io = self._sample() 3130 text_io.detach() 3131 with self.assertRaises(TypeError): 3132 text_io.write(5.5) 3133 3134 def test_write_with_ascii_encoding_and_letters_writes_chars(self): 3135 with self._sample() as text_io: 3136 self.assertEqual(text_io.write("foo"), 3) 3137 text_io.seek(0) 3138 self.assertEqual(text_io.read(), "foolo++") 3139 3140 def test_write_with_ascii_encoding_and_special_characters_writes_chars(self): 3141 
with self._sample() as text_io: 3142 self.assertEqual(text_io.write("!?+-()"), 6) 3143 text_io.seek(0) 3144 self.assertEqual(text_io.read(), "!?+-()+") 3145 3146 def test_write_with_utf8_encoding_and_letters_writes_chars(self): 3147 with self._sample_utf8() as text_io: 3148 self.assertEqual(text_io.write("foo"), 3) 3149 text_io.seek(0) 3150 self.assertEqual(text_io.read(), "foolo++") 3151 3152 def test_write_with_utf8_encoding_and_special_characters_writes_chars(self): 3153 with self._sample_utf8() as text_io: 3154 self.assertEqual(text_io.write("(("), 2) 3155 text_io.seek(0) 3156 self.assertEqual(text_io.read(), "((llo++") 3157 3158 def test_write_with_utf8_encoding_and_strict_errors_and_surrogate_raises_unicodeencode_error( 3159 self, 3160 ): 3161 text_io = _io.TextIOWrapper( 3162 _io.BytesIO(b"hello"), encoding="UTF-8", errors="strict" 3163 ) 3164 with self.assertRaises(ValueError): 3165 text_io.write("ab\uD800++") 3166 3167 def test_write_with_utf8_encoding_and_ignore_errors_removes_surrogates(self): 3168 text_io = _io.TextIOWrapper( 3169 _io.BytesIO(b"hello++"), encoding="UTF-8", errors="ignore" 3170 ) 3171 self.assertEqual(text_io.write("ab\uD800a"), 4) 3172 text_io.seek(0) 3173 self.assertEqual(text_io.read(), "abalo++") 3174 3175 def test_write_with_utf8_encoding_and_replace_errors_replaces_surrogates_with_question_mark( 3176 self, 3177 ): 3178 text_io = _io.TextIOWrapper( 3179 _io.BytesIO(b"hello++"), encoding="UTF-8", errors="replace" 3180 ) 3181 self.assertEqual(text_io.write("ab\uD800"), 3) 3182 text_io.seek(0) 3183 self.assertEqual(text_io.read(), "ab?lo++") 3184 3185 # TODO(T61927696): Test write with "surrogatepass" errors when "surrogatepass" is supported by decode 3186 3187 def test_write_with_latin1_encoding_and_letters_writes_chars(self): 3188 with self._sample_latin1() as text_io: 3189 self.assertEqual(text_io.write("foo"), 3) 3190 text_io.seek(0) 3191 self.assertEqual(text_io.read(), "foolo++") 3192 3193 def 
test_write_with_latin1_encoding_and_special_characters_writes_chars(self): 3194 with self._sample_latin1() as text_io: 3195 self.assertEqual(text_io.write("(("), 2) 3196 text_io.seek(0) 3197 self.assertEqual(text_io.read(), "((llo++") 3198 3199 def test_write_with_bufferedwriter_ascii_encoding_and_letters_returns_length_written( 3200 self, 3201 ): 3202 with _io.TextIOWrapper( 3203 self._sample_buffered_writer(), encoding="ascii" 3204 ) as text_io: 3205 self.assertEqual(text_io.write("foo"), 3) 3206 3207 def test_write_with_bufferedwriter_ascii_encoding_and_special_characters_returns_length_written( 3208 self, 3209 ): 3210 with _io.TextIOWrapper( 3211 self._sample_buffered_writer(), encoding="ascii" 3212 ) as text_io: 3213 self.assertEqual(text_io.write("!?+-()"), 6) 3214 3215 def test_write_with_bufferedwriter_utf8_encoding_and_letters_returns_length_written( 3216 self, 3217 ): 3218 with _io.TextIOWrapper( 3219 self._sample_buffered_writer(), encoding="UTF-8" 3220 ) as text_io: 3221 self.assertEqual(text_io.write("foo"), 3) 3222 3223 def test_write_with_bufferedwriter_utf8_encoding_and_special_characters_returns_length_written( 3224 self, 3225 ): 3226 with _io.TextIOWrapper( 3227 self._sample_buffered_writer(), encoding="UTF-8" 3228 ) as text_io: 3229 self.assertEqual(text_io.write("(("), 2) 3230 3231 def test_write_with_bufferedwriter_utf8_encoding_and_strict_errors_and_surrogate_raises_unicodeencode_error( 3232 self, 3233 ): 3234 with _io.TextIOWrapper( 3235 self._sample_buffered_writer(), encoding="UTF-8", errors="strict" 3236 ) as text_io: 3237 with self.assertRaises(ValueError): 3238 text_io.write("ab\uD800++") 3239 3240 def test_write_with_bufferedwriter_utf8_encoding_and_ignore_errors_returns_length_written( 3241 self, 3242 ): 3243 with _io.TextIOWrapper( 3244 self._sample_buffered_writer(), encoding="UTF-8", errors="ignore" 3245 ) as text_io: 3246 self.assertEqual(text_io.write("ab\uD800a"), 4) 3247 3248 def 
test_write_with_bufferedwriter_with_utf8_encoding_and_replace_errors_returns_length_written( 3249 self, 3250 ): 3251 with _io.TextIOWrapper( 3252 self._sample_buffered_writer(), encoding="UTF-8", errors="replace" 3253 ) as text_io: 3254 self.assertEqual(text_io.write("ab\uD800"), 3) 3255 3256 # TODO(T61927696): Test write with "surrogatepass" errors when "surrogatepass" is supported by decode 3257 3258 def test_write_with_bufferedwriter_latin1_encoding_and_letters_returns_length_written( 3259 self, 3260 ): 3261 with _io.TextIOWrapper( 3262 self._sample_buffered_writer(), encoding="Latin-1" 3263 ) as text_io: 3264 self.assertEqual(text_io.write("foo"), 3) 3265 3266 def test_write_with_bufferedwriter_latin1_encoding_and_special_characters_returns_length_written( 3267 self, 3268 ): 3269 with _io.TextIOWrapper( 3270 self._sample_buffered_writer(), encoding="Latin-1" 3271 ) as text_io: 3272 self.assertEqual(text_io.write("(("), 2) 3273 3274 3275class StringIOTests(unittest.TestCase): 3276 def test_dunder_init_with_non_str_initial_value_raises_type_error(self): 3277 self.assertRaisesRegex( 3278 TypeError, 3279 "initial_value must be str or None", 3280 _io.StringIO, 3281 initial_value=b"foo", 3282 ) 3283 3284 def test_dunder_init_sets_initial_value(self): 3285 with _io.StringIO(initial_value="foo") as string_io: 3286 self.assertEqual(string_io.getvalue(), "foo") 3287 3288 def test_dunder_init_with_non_str_or_none_newline_raises_value_error(self): 3289 with self.assertRaises(TypeError) as context: 3290 _io.StringIO(newline=1) 3291 self.assertRegex(str(context.exception), "newline must be str or None, not int") 3292 3293 def test_dunder_init_with_illegal_newline_raises_value_error(self): 3294 with self.assertRaises(ValueError) as context: 3295 _io.StringIO(newline="n") 3296 self.assertRegex(str(context.exception), "illegal newline value") 3297 3298 def test_dunder_repr(self): 3299 with _io.StringIO(initial_value="foo") as string_io: 3300 
self.assertRegex(string_io.__repr__(), r"<_io.StringIO object at 0x\w+>") 3301 3302 def test_close_with_non_StringIO_self_raises_type_error(self): 3303 with self.assertRaises(TypeError): 3304 _io.StringIO.close(0.5) 3305 3306 def test_close_with_StringIO_subclass_sets_closed_true(self): 3307 class C(_io.StringIO): 3308 pass 3309 3310 sio = C("foo") 3311 3312 self.assertFalse(sio.closed) 3313 sio.close() 3314 self.assertTrue(sio.closed) 3315 3316 def test_detach_raises_unsupported_operation(self): 3317 with _io.StringIO() as string_io: 3318 self.assertRaisesRegex(_io.UnsupportedOperation, "detach", string_io.detach) 3319 3320 def test_encoding_always_returns_none(self): 3321 with _io.StringIO() as string_io: 3322 self.assertIsNone(string_io.encoding) 3323 3324 def test_errors_always_returns_none(self): 3325 with _io.StringIO() as string_io: 3326 self.assertIsNone(string_io.errors) 3327 3328 def test_line_buffering_with_open_returns_false(self): 3329 with _io.StringIO() as string_io: 3330 self.assertFalse(string_io.line_buffering) 3331 3332 def test_line_buffering_with_closed_returns_raises_value_error(self): 3333 string_io = _io.StringIO() 3334 string_io.close() 3335 with self.assertRaises(ValueError) as context: 3336 string_io.line_buffering 3337 self.assertRegex(str(context.exception), "I/O operation on closed file") 3338 3339 def test_getvalue_with_non_stringio_raises_type_error(self): 3340 self.assertRaisesRegex( 3341 TypeError, 3342 r"'getvalue' .* '(_io\.)?StringIO' object.* a 'int'", 3343 _io.StringIO.getvalue, 3344 1, 3345 ) 3346 3347 def test_getvalue_with_open_returns_copy_of_value(self): 3348 strio = _io.StringIO("foobarbaz") 3349 first_value = strio.getvalue() 3350 strio.write("temp") 3351 second_value = strio.getvalue() 3352 self.assertEqual(first_value, "foobarbaz") 3353 self.assertEqual(second_value, "temparbaz") 3354 3355 def test_getvalue_with_closed_raises_value_error(self): 3356 string_io = _io.StringIO() 3357 string_io.close() 3358 with 
self.assertRaises(ValueError) as context: 3359 string_io.getvalue() 3360 self.assertRegex(str(context.exception), "I/O operation on closed file") 3361 3362 def test_getvalue_subclass_and_closed_attr_returns_value(self): 3363 class Closed(_io.StringIO): 3364 closed = True 3365 3366 closed = Closed("foobar") 3367 self.assertEqual(closed.getvalue(), "foobar") 3368 3369 def test_iter_with_open_returns_self(self): 3370 string_io = _io.StringIO() 3371 self.assertEqual(string_io, string_io.__iter__()) 3372 3373 def test_iter_with_closed_raises_value_error(self): 3374 string_io = _io.StringIO() 3375 string_io.close() 3376 with self.assertRaises(ValueError) as context: 3377 string_io.__iter__() 3378 self.assertRegex(str(context.exception), "I/O operation on closed file") 3379 3380 def test_iter_subclass_and_closed_attr_raises_value_error(self): 3381 class Closed(_io.StringIO): 3382 closed = True 3383 3384 closed = Closed() 3385 with self.assertRaises(ValueError) as context: 3386 closed.__iter__() 3387 self.assertRegex(str(context.exception), "I/O operation on closed file") 3388 3389 def test_next_with_non_stringio_raises_type_error(self): 3390 self.assertRaisesRegex( 3391 TypeError, 3392 r"'__next__' .* '(_io\.)?StringIO' object.* a 'int'", 3393 _io.StringIO.__next__, 3394 1, 3395 ) 3396 3397 def test_next_with_open_reads_line(self): 3398 string_io = _io.StringIO("foo\nbar") 3399 self.assertEqual(string_io.__next__(), "foo\n") 3400 self.assertEqual(string_io.__next__(), "bar") 3401 with self.assertRaises(StopIteration): 3402 string_io.__next__() 3403 3404 def test_next_with_closed_raises_value_error(self): 3405 string_io = _io.StringIO() 3406 string_io.close() 3407 with self.assertRaises(ValueError) as context: 3408 string_io.__next__() 3409 self.assertRegex(str(context.exception), "I/O operation on closed file") 3410 3411 def test_next_subclass_and_closed_attr_reads_line(self): 3412 class Closed(_io.StringIO): 3413 closed = True 3414 3415 closed = Closed("foo\nbar") 3416 
self.assertEqual(closed.__next__(), "foo\n") 3417 self.assertEqual(closed.__next__(), "bar") 3418 with self.assertRaises(StopIteration): 3419 closed.__next__() 3420 3421 def test_newline_default(self): 3422 strio = _io.StringIO("a\nb\r\nc\rd") 3423 self.assertEqual(list(strio), ["a\n", "b\r\n", "c\rd"]) 3424 self.assertEqual(strio.getvalue(), "a\nb\r\nc\rd") 3425 3426 strio = _io.StringIO() 3427 self.assertEqual(strio.write("a\nb\r\nc\rd"), 8) 3428 strio.seek(0) 3429 self.assertEqual(list(strio), ["a\n", "b\r\n", "c\rd"]) 3430 self.assertEqual(strio.getvalue(), "a\nb\r\nc\rd") 3431 3432 def test_newline_none(self): 3433 strio = _io.StringIO("a\nb\r\nc\rd", newline=None) 3434 self.assertEqual(list(strio), ["a\n", "b\n", "c\n", "d"]) 3435 strio.seek(0) 3436 self.assertEqual(strio.read(1), "a") 3437 self.assertEqual(strio.read(2), "\nb") 3438 self.assertEqual(strio.read(2), "\nc") 3439 self.assertEqual(strio.read(1), "\n") 3440 self.assertEqual(strio.getvalue(), "a\nb\nc\nd") 3441 3442 strio = _io.StringIO(newline=None) 3443 self.assertEqual(2, strio.write("a\n")) 3444 self.assertEqual(3, strio.write("b\r\n")) 3445 self.assertEqual(3, strio.write("c\rd")) 3446 strio.seek(0) 3447 self.assertEqual(strio.read(), "a\nb\nc\nd") 3448 self.assertEqual(strio.getvalue(), "a\nb\nc\nd") 3449 3450 strio = _io.StringIO("a\r\nb", newline=None) 3451 self.assertEqual(strio.read(3), "a\nb") 3452 3453 def test_newline_none_with_r_end_not_raise_exception(self): 3454 strio = _io.StringIO("a\nb\r\nc\r", newline=None) 3455 self.assertEqual(list(strio), ["a\n", "b\n", "c\n"]) 3456 3457 def test_newline_empty(self): 3458 strio = _io.StringIO("a\nb\r\nc\rd", newline="") 3459 self.assertEqual(list(strio), ["a\n", "b\r\n", "c\r", "d"]) 3460 strio.seek(0) 3461 self.assertEqual(strio.read(4), "a\nb\r") 3462 self.assertEqual(strio.read(2), "\nc") 3463 self.assertEqual(strio.read(1), "\r") 3464 self.assertEqual(strio.getvalue(), "a\nb\r\nc\rd") 3465 3466 strio = _io.StringIO(newline="") 3467 
self.assertEqual(2, strio.write("a\n")) 3468 self.assertEqual(2, strio.write("b\r")) 3469 self.assertEqual(2, strio.write("\nc")) 3470 self.assertEqual(2, strio.write("\rd")) 3471 strio.seek(0) 3472 self.assertEqual(list(strio), ["a\n", "b\r\n", "c\r", "d"]) 3473 self.assertEqual(strio.getvalue(), "a\nb\r\nc\rd") 3474 3475 def test_newline_lf(self): 3476 strio = _io.StringIO("a\nb\r\nc\rd", newline="\n") 3477 self.assertEqual(list(strio), ["a\n", "b\r\n", "c\rd"]) 3478 self.assertEqual(strio.getvalue(), "a\nb\r\nc\rd") 3479 3480 strio = _io.StringIO(newline="\n") 3481 self.assertEqual(strio.write("a\nb\r\nc\rd"), 8) 3482 strio.seek(0) 3483 self.assertEqual(list(strio), ["a\n", "b\r\n", "c\rd"]) 3484 self.assertEqual(strio.getvalue(), "a\nb\r\nc\rd") 3485 3486 def test_newline_cr(self): 3487 strio = _io.StringIO("a\nb\r\nc\rd", newline="\r") 3488 self.assertEqual(strio.read(), "a\rb\r\rc\rd") 3489 strio.seek(0) 3490 self.assertEqual(list(strio), ["a\r", "b\r", "\r", "c\r", "d"]) 3491 self.assertEqual(strio.getvalue(), "a\rb\r\rc\rd") 3492 3493 strio = _io.StringIO(newline="\r") 3494 self.assertEqual(strio.write("a\nb\r\nc\rd"), 8) 3495 strio.seek(0) 3496 self.assertEqual(list(strio), ["a\r", "b\r", "\r", "c\r", "d"]) 3497 strio.seek(0) 3498 self.assertEqual(strio.readlines(), ["a\r", "b\r", "\r", "c\r", "d"]) 3499 self.assertEqual(strio.getvalue(), "a\rb\r\rc\rd") 3500 3501 def test_newline_crlf(self): 3502 strio = _io.StringIO("a\nb\r\nc\rd", newline="\r\n") 3503 self.assertEqual(strio.read(), "a\r\nb\r\r\nc\rd") 3504 strio.seek(0) 3505 self.assertEqual(list(strio), ["a\r\n", "b\r\r\n", "c\rd"]) 3506 strio.seek(0) 3507 self.assertEqual(strio.readlines(), ["a\r\n", "b\r\r\n", "c\rd"]) 3508 self.assertEqual(strio.getvalue(), "a\r\nb\r\r\nc\rd") 3509 3510 strio = _io.StringIO(newline="\r\n") 3511 self.assertEqual(strio.write("a\nb\r\nc\rd"), 8) 3512 strio.seek(0) 3513 self.assertEqual(list(strio), ["a\r\n", "b\r\r\n", "c\rd"]) 3514 self.assertEqual(strio.getvalue(), 
"a\r\nb\r\r\nc\rd") 3515 3516 def test_read_with_open_returns_string(self): 3517 string_io = _io.StringIO("foo\n") 3518 self.assertEqual(string_io.read(), "foo\n") 3519 3520 def test_read_with_closed_raises_value_error(self): 3521 string_io = _io.StringIO() 3522 string_io.close() 3523 with self.assertRaises(ValueError) as context: 3524 string_io.read() 3525 self.assertRegex(str(context.exception), "I/O operation on closed file") 3526 3527 def test_read_with_subclass_and_closed_attr_returns_string(self): 3528 class Closed(_io.StringIO): 3529 closed = True 3530 3531 closed = Closed("foo\n") 3532 self.assertEqual(closed.read(), "foo\n") 3533 3534 def test_read_with_non_int_raises_type_error(self): 3535 string_io = _io.StringIO("hello world") 3536 with self.assertRaises(TypeError): 3537 string_io.read("not-int") 3538 3539 # CPython 3.6 does not do the __index__ call, whereas >= 3.7 does. Since we've 3540 # already implemented this functionality, we should test it. 3541 @pyro_only 3542 def test_read_with_intlike_calls_dunder_index(self): 3543 class IntLike: 3544 def __index__(self): 3545 return 5 3546 3547 string_io = _io.StringIO("hello world") 3548 self.assertEqual(string_io.read(IntLike()), "hello") 3549 3550 def test_read_starts_at_internal_pos(self): 3551 string_io = _io.StringIO("hello world") 3552 string_io.seek(6) 3553 self.assertEqual(string_io.read(), "world") 3554 3555 def test_read_stops_after_passed_in_number_of_bytes(self): 3556 string_io = _io.StringIO("hello world") 3557 self.assertEqual(string_io.read(5), "hello") 3558 3559 def test_read_with_negative_size_returns_remaining_string(self): 3560 string_io = _io.StringIO("hello world") 3561 self.assertEqual(string_io.read(-5), "hello world") 3562 3563 def test_read_with_overseek_returns_empty_string(self): 3564 string_io = _io.StringIO("hello world") 3565 string_io.seek(20) 3566 self.assertEqual(string_io.read(), "") 3567 3568 def test_read_translates_all_newlines_if_newline_is_none(self): 3569 string_io = 
_io.StringIO("foo\nbar\rbaz\r\n", None) 3570 self.assertEqual(string_io.read(), "foo\nbar\nbaz\n") 3571 3572 def test_read_doesnt_translate_if_newline_is_empty(self): 3573 string_io = _io.StringIO("foo\nbar\rbaz\r\n", "") 3574 self.assertEqual(string_io.read(), "foo\nbar\rbaz\r\n") 3575 3576 def test_read_translates_line_feeds_if_newline_is_string(self): 3577 string_io = _io.StringIO("foo\nbar\rbaz\r\n", "\r\n") 3578 self.assertEqual(string_io.read(), "foo\r\nbar\rbaz\r\r\n") 3579 3580 def test_readline_with_open_returns_string(self): 3581 string_io = _io.StringIO("foo\nbar") 3582 self.assertEqual(string_io.readline(), "foo\n") 3583 3584 def test_readline_with_closed_raises_value_error(self): 3585 string_io = _io.StringIO() 3586 string_io.close() 3587 with self.assertRaises(ValueError) as context: 3588 string_io.readline() 3589 self.assertRegex(str(context.exception), "I/O operation on closed file") 3590 3591 def test_readline_with_subclass_and_closed_attr_returns_string(self): 3592 class Closed(_io.StringIO): 3593 closed = True 3594 3595 closed = Closed("foo\nbar") 3596 self.assertEqual(closed.readline(), "foo\n") 3597 3598 def test_readline_with_non_int_raises_type_error(self): 3599 string_io = _io.StringIO("hello world") 3600 with self.assertRaises(TypeError): 3601 string_io.readline("not-int") 3602 3603 # CPython 3.6 does not do the __index__ call, whereas >= 3.7 does. Since we've 3604 # already implemented this functionality, we should test it. 
3605 @pyro_only 3606 def test_readline_with_intlike_calls_dunder_index(self): 3607 class IntLike: 3608 def __index__(self): 3609 return 5 3610 3611 string_io = _io.StringIO("hello world") 3612 self.assertEqual(string_io.readline(IntLike()), "hello") 3613 3614 def test_readline_starts_at_internal_pos(self): 3615 string_io = _io.StringIO("hello world") 3616 string_io.seek(6) 3617 self.assertEqual(string_io.readline(), "world") 3618 3619 def test_readline_stops_after_passed_in_number_of_bytes(self): 3620 string_io = _io.StringIO("hello world\n") 3621 self.assertEqual(string_io.readline(5), "hello") 3622 3623 def test_readline_with_negative_size_full_line(self): 3624 string_io = _io.StringIO("hello world\n") 3625 self.assertEqual(string_io.readline(-5), "hello world\n") 3626 3627 def test_readline_with_overseek_returns_empty_string(self): 3628 string_io = _io.StringIO("hello world") 3629 string_io.seek(20) 3630 self.assertEqual(string_io.readline(), "") 3631 3632 def test_readline_translates_all_newlines_if_newline_is_none(self): 3633 string_io = _io.StringIO("foo\nbar\rbaz\r\n", None) 3634 self.assertEqual(string_io.readline(), "foo\n") 3635 self.assertEqual(string_io.readline(), "bar\n") 3636 self.assertEqual(string_io.readline(), "baz\n") 3637 3638 def test_readline_doesnt_translate_if_newline_is_empty(self): 3639 string_io = _io.StringIO("foo\nbar\rbaz\r\n", "") 3640 self.assertEqual(string_io.readline(), "foo\n") 3641 self.assertEqual(string_io.readline(), "bar\r") 3642 self.assertEqual(string_io.readline(), "baz\r\n") 3643 3644 def test_readline_translates_line_feeds_if_newline_is_string(self): 3645 string_io = _io.StringIO("foo\nbar\rbaz\r\n", "\r\n") 3646 self.assertEqual(string_io.readline(), "foo\r\n") 3647 self.assertEqual(string_io.readline(), "bar\rbaz\r\r\n") 3648 3649 def test_readable_with_open_StringIO_returns_true(self): 3650 string_io = _io.StringIO() 3651 self.assertTrue(string_io.readable()) 3652 3653 def 
test_readable_with_closed_StringIO_raises_value_error(self): 3654 string_io = _io.StringIO() 3655 string_io.close() 3656 with self.assertRaises(ValueError) as context: 3657 string_io.readable() 3658 self.assertRegex(str(context.exception), "I/O operation on closed file") 3659 3660 def test_seek_with_open_sets_position(self): 3661 string_io = _io.StringIO("foo\n") 3662 self.assertEqual(string_io.seek(2), 2) 3663 self.assertEqual(string_io.tell(), 2) 3664 3665 def test_seek_with_closed_raises_value_error(self): 3666 string_io = _io.StringIO() 3667 string_io.close() 3668 with self.assertRaises(ValueError) as context: 3669 string_io.seek(0) 3670 self.assertRegex(str(context.exception), "I/O operation on closed file") 3671 3672 def test_seek_with_subclass_and_closed_attr_sets_position(self): 3673 class Closed(_io.StringIO): 3674 closed = True 3675 3676 closed = Closed("foo\n") 3677 self.assertEqual(closed.seek(2), 2) 3678 self.assertEqual(closed.tell(), 2) 3679 3680 def test_seek_with_non_int_cookie_raises_type_error(self): 3681 string_io = _io.StringIO("hello world") 3682 with self.assertRaises(TypeError): 3683 string_io.seek("not-int", 0) 3684 3685 def test_seek_with_intlike_cookie_calls_dunder_index(self): 3686 class IntLike: 3687 def __index__(self): 3688 return 5 3689 3690 string_io = _io.StringIO("hello world") 3691 self.assertEqual(string_io.seek(IntLike()), 5) 3692 self.assertEqual(string_io.tell(), 5) 3693 3694 def test_seek_with_non_int_whence_raises_type_error(self): 3695 string_io = _io.StringIO("hello world") 3696 with self.assertRaises(TypeError): 3697 string_io.seek(0, "not-int") 3698 3699 def test_seek_accepts_index_covertible_whence(self): 3700 class IndexLike: 3701 def __index__(self): 3702 return 0 3703 3704 string_io = _io.StringIO("hello world") 3705 self.assertEqual(string_io.seek(1, IndexLike()), 1) 3706 3707 def test_seek_can_overseek(self): 3708 string_io = _io.StringIO("foo\n") 3709 self.assertEqual(string_io.seek(10), 10) 3710 
self.assertEqual(string_io.tell(), 10) 3711 3712 def test_seek_with_whence_zero_and_negative_cookie_raises_value_error(self): 3713 string_io = _io.StringIO("foo\n") 3714 with self.assertRaises(ValueError) as context: 3715 string_io.seek(-10) 3716 self.assertRegex(str(context.exception), "Negative seek position -10") 3717 3718 def test_seek_with_whence_one_and_non_zero_cookie_raises_os_error(self): 3719 string_io = _io.StringIO("foo\n") 3720 with self.assertRaises(OSError) as context: 3721 string_io.seek(1, 1) 3722 self.assertRegex(str(context.exception), "Can't do nonzero cur-relative seeks") 3723 3724 def test_seek_with_whence_one_doesnt_change_pos(self): 3725 string_io = _io.StringIO("foo\n") 3726 self.assertEqual(string_io.seek(2), 2) 3727 self.assertEqual(string_io.seek(0, 1), 2) 3728 self.assertEqual(string_io.tell(), 2) 3729 3730 def test_seek_with_whence_two_and_non_zero_cookie_raises_os_error(self): 3731 string_io = _io.StringIO("foo\n") 3732 with self.assertRaises(OSError) as context: 3733 string_io.seek(1, 1) 3734 self.assertRegex(str(context.exception), "Can't do nonzero cur-relative seeks") 3735 3736 def test_seek_with_whence_two_sets_pos_to_end(self): 3737 string_io = _io.StringIO("foo\n") 3738 self.assertEqual(string_io.seek(2), 2) 3739 self.assertEqual(string_io.seek(0, 2), 4) 3740 self.assertEqual(string_io.tell(), 4) 3741 3742 def test_seek_with_invalid_whence_raises_value_error(self): 3743 string_io = _io.StringIO("foo\n") 3744 with self.assertRaises(ValueError) as context: 3745 string_io.seek(1, 3) 3746 self.assertEqual( 3747 str(context.exception), "Invalid whence (3, should be 0, 1 or 2)" 3748 ) 3749 3750 def test_seek_with_bigint_whence_raises_overflow_error(self): 3751 string_io = _io.StringIO("foo\n") 3752 with self.assertRaisesRegex( 3753 OverflowError, r"Python int too large to convert to C .*" 3754 ): 3755 string_io.seek(1, 2 ** 65) 3756 3757 def test_seekable_with_open_StringIO_returns_true(self): 3758 string_io = _io.StringIO() 3759 
self.assertTrue(string_io.seekable()) 3760 3761 def test_seekable_with_closed_StringIO_raises_value_error(self): 3762 string_io = _io.StringIO() 3763 string_io.close() 3764 with self.assertRaises(ValueError) as context: 3765 string_io.seekable() 3766 self.assertRegex(str(context.exception), "I/O operation on closed file") 3767 3768 def test_subclass_with_closed_attribute_is_not_closed_for_StringIO(self): 3769 class Closed(_io.StringIO): 3770 closed = True 3771 3772 c = Closed() 3773 self.assertEqual(c.write("foobar"), 6) 3774 self.assertEqual(c.seek(0), 0) 3775 with self.assertRaises(ValueError) as context: 3776 # Since readlines is inherited from `_IOBase which checks for the 3777 # closed property, this method call should raise a ValueError 3778 c.readlines() 3779 self.assertRegex(str(context.exception), "I/O operation on closed file") 3780 3781 def test_supports_arbitrary_attributes(self): 3782 string_io = _io.StringIO() 3783 string_io.here_comes_the_sun = "and I say it's alright" 3784 self.assertEqual(string_io.here_comes_the_sun, "and I say it's alright") 3785 3786 def test_tell_with_open_returns_position(self): 3787 string_io = _io.StringIO() 3788 self.assertEqual(string_io.seek(2), 2) 3789 self.assertEqual(string_io.tell(), 2) 3790 3791 def test_tell_with_closed_raises_value_error(self): 3792 string_io = _io.StringIO() 3793 string_io.close() 3794 with self.assertRaises(ValueError) as context: 3795 string_io.tell() 3796 self.assertRegex(str(context.exception), "I/O operation on closed file") 3797 3798 def test_tell_with_subclass_and_closed_attr_returns_string(self): 3799 class Closed(_io.StringIO): 3800 closed = True 3801 3802 closed = Closed("foo\nbar") 3803 self.assertEqual(closed.tell(), 0) 3804 3805 def test_truncate_with_open_truncates_from_pos(self): 3806 string_io = _io.StringIO("hello world") 3807 string_io.seek(5) 3808 self.assertEqual(string_io.truncate(), 5) 3809 self.assertEqual(string_io.getvalue(), "hello") 3810 3811 def 
test_truncate_with_closed_raises_value_error(self): 3812 string_io = _io.StringIO() 3813 string_io.close() 3814 with self.assertRaises(ValueError) as context: 3815 string_io.truncate() 3816 self.assertRegex(str(context.exception), "I/O operation on closed file") 3817 3818 def test_truncate_with_subclass_and_closed_attr_returns_string(self): 3819 class Closed(_io.StringIO): 3820 closed = True 3821 3822 closed = Closed("hello world") 3823 closed.seek(5) 3824 self.assertEqual(closed.truncate(), 5) 3825 self.assertEqual(closed.getvalue(), "hello") 3826 3827 def test_truncate_with_non_int_raises_type_error(self): 3828 string_io = _io.StringIO("hello world") 3829 with self.assertRaises(TypeError): 3830 string_io.truncate("not-int") 3831 3832 # CPython 3.6 does not do the __index__ call, whereas >= 3.7 does. Since we've 3833 # already implemented this functionality, we should test it. 3834 @pyro_only 3835 def test_truncate_with_intlike_calls_dunder_index(self): 3836 class IntLike: 3837 def __index__(self): 3838 return 5 3839 3840 string_io = _io.StringIO("hello world") 3841 self.assertEqual(string_io.truncate(IntLike()), 5) 3842 self.assertEqual(string_io.getvalue(), "hello") 3843 3844 def test_truncate_with_negative_int_raises_value_error(self): 3845 string_io = _io.StringIO("hello world") 3846 with self.assertRaises(ValueError) as context: 3847 string_io.truncate(-1) 3848 self.assertRegex(str(context.exception), "Negative size value -1") 3849 3850 def test_truncate_with_size_past_end_of_buffer_returns_size(self): 3851 string_io = _io.StringIO("hello") 3852 self.assertEqual(string_io.truncate(10), 10) 3853 self.assertEqual(string_io.getvalue(), "hello") 3854 3855 def test_write_with_open_writes_to_object(self): 3856 string_io = _io.StringIO() 3857 self.assertEqual(string_io.write("foobar"), 6) 3858 self.assertEqual(string_io.getvalue(), "foobar") 3859 3860 def test_write_with_closed_raises_value_error(self): 3861 strio = _io.StringIO() 3862 
self.assertEqual(strio.write("foobar"), 6) 3863 strio.close() 3864 with self.assertRaises(ValueError) as context: 3865 strio.write("foo") 3866 self.assertRegex(str(context.exception), "I/O operation on closed file") 3867 3868 def test_write_with_subclass_and_closed_attr_sets_position(self): 3869 class Closed(_io.StringIO): 3870 closed = True 3871 3872 closed = Closed() 3873 self.assertEqual(closed.write("foobar"), 6) 3874 self.assertEqual(closed.getvalue(), "foobar") 3875 3876 def test_write_with_non_string_raises_type_error(self): 3877 strio = _io.StringIO() 3878 with self.assertRaises(TypeError): 3879 strio.write(1) 3880 3881 def test_write_starts_writing_at_current_position(self): 3882 string_io = _io.StringIO("hello world") 3883 string_io.seek(5) 3884 self.assertEqual(string_io.write("\n"), 1) 3885 self.assertEqual(string_io.getvalue(), "hello\nworld") 3886 3887 def test_write_with_empty_string_does_nothing(self): 3888 string_io = _io.StringIO("hello world") 3889 string_io.seek(5) 3890 self.assertEqual(string_io.write(""), 0) 3891 self.assertEqual(string_io.getvalue(), "hello world") 3892 3893 def test_write_with_larger_string_than_buffer_extends_buffer(self): 3894 string_io = _io.StringIO("hello") 3895 self.assertEqual(string_io.write("hello world"), 11) 3896 self.assertEqual(string_io.getvalue(), "hello world") 3897 3898 def test_write_with_overseek_pads_end_of_buffer_to_position_with_zeros(self): 3899 string_io = _io.StringIO("hello") 3900 string_io.seek(7) 3901 self.assertEqual(string_io.write("world"), 5) 3902 self.assertEqual(string_io.getvalue(), "hello\0\0world") 3903 3904 def test_write_with_write_translate_returns_original_size_of_string(self): 3905 string_io = _io.StringIO(newline=None) 3906 self.assertEqual(string_io.write("baz\r\n"), 5) 3907 self.assertEqual(string_io.getvalue(), "baz\n") 3908 3909 def test_write_with_newline_none_translates_newlines_to_line_feed(self): 3910 string_io = _io.StringIO(newline=None) 3911 
self.assertEqual(string_io.write("foo\n"), 4) 3912 self.assertEqual(string_io.write("bar\r"), 4) 3913 self.assertEqual(string_io.write("baz\r\n"), 5) 3914 self.assertEqual(string_io.getvalue(), "foo\nbar\nbaz\n") 3915 3916 def test_write_with_newline_empty_doesnt_do_any_translations(self): 3917 string_io = _io.StringIO(newline="") 3918 self.assertEqual(string_io.write("foo\n"), 4) 3919 self.assertEqual(string_io.write("bar\r"), 4) 3920 self.assertEqual(string_io.write("baz\r\n"), 5) 3921 self.assertEqual(string_io.getvalue(), "foo\nbar\rbaz\r\n") 3922 3923 def test_write_with_string_newline_translates_line_feed_to_string(self): 3924 string_io = _io.StringIO(newline="\r\n") 3925 self.assertEqual(string_io.write("foo\n"), 4) 3926 self.assertEqual(string_io.write("bar\r"), 4) 3927 self.assertEqual(string_io.write("baz\r\n"), 5) 3928 self.assertEqual(string_io.getvalue(), "foo\r\nbar\rbaz\r\r\n") 3929 3930 def test_write_with_newline_none_stores_seen_newlines(self): 3931 string_io = _io.StringIO(newline=None) 3932 self.assertEqual(string_io.newlines, None) 3933 self.assertEqual(string_io.write("foo\n"), 4) 3934 self.assertEqual(string_io.newlines, "\n") 3935 self.assertEqual(string_io.write("bar\r"), 4) 3936 self.assertEqual(string_io.newlines, ("\r", "\n")) 3937 self.assertEqual(string_io.write("baz\r\n"), 5) 3938 self.assertEqual(string_io.newlines, ("\r", "\n", "\r\n")) 3939 self.assertEqual(string_io.getvalue(), "foo\nbar\nbaz\n") 3940 3941 def test_writable_with_open_StringIO_returns_true(self): 3942 string_io = _io.StringIO() 3943 self.assertTrue(string_io.writable()) 3944 3945 def test_writable_with_closed_StringIO_raises_value_error(self): 3946 string_io = _io.StringIO() 3947 string_io.close() 3948 with self.assertRaises(ValueError) as context: 3949 string_io.writable() 3950 self.assertRegex(str(context.exception), "I/O operation on closed file") 3951 3952 3953if __name__ == "__main__": 3954 unittest.main()