this repo has no description
1#!/usr/bin/env python3
2# Copyright (c) Facebook, Inc. and its affiliates. (http://www.facebook.com)
3import _io
4import array
5import mmap
6import os
7import subprocess
8import sys
9import tempfile
10import unittest
11import unittest.mock
12from unittest.mock import Mock
13
14from test_support import pyro_only
15
16
17try:
18 from _io import _BufferedIOMixin
19except ImportError:
20 pass
21
22
def _getfd():
    """Return the read end of a new pipe after closing its write end."""
    read_end, write_end = os.pipe()
    # Closing the writer up front is what makes a later read() harmless.
    os.close(write_end)
    return read_end
27
28
class _IOBaseTests(unittest.TestCase):
    """Tests for the defaults provided by `_io._IOBase` and its subclass hooks."""

    def test_closed_with_closed_instance_returns_true(self):
        # `closed` is False on a new object and True after close().
        stream = _io._IOBase()
        self.assertFalse(stream.closed)
        stream.close()
        self.assertTrue(stream.closed)

    def test_fileno_raises_unsupported_operation(self):
        stream = _io._IOBase()
        with self.assertRaises(_io.UnsupportedOperation):
            stream.fileno()

    def test_flush_with_closed_instance_raises_value_error(self):
        stream = _io._IOBase()
        stream.close()
        self.assertRaises(ValueError, stream.flush)

    def test_isatty_with_closed_instance_raises_value_error(self):
        stream = _io._IOBase()
        stream.close()
        self.assertRaises(ValueError, stream.isatty)

    def test_isatty_always_returns_false(self):
        stream = _io._IOBase()
        answer = stream.isatty()
        self.assertEqual(answer, False)

    def test_iter_with_closed_instance_raises_value_error(self):
        stream = _io._IOBase()
        stream.close()
        self.assertRaises(ValueError, stream.__iter__)

    def test_next_reads_line(self):
        # __next__ is expected to hand back whatever readline() produced.
        class FixedLine(_io._IOBase):
            def readline(self):
                return "foo"

        stream = FixedLine()
        line = stream.__next__()
        self.assertEqual(line, "foo")

    def test_readline_calls_read(self):
        # The exception raised by read() must surface through readline().
        class RaisingRead(_io._IOBase):
            def read(self, size):
                raise UserWarning("foo")

        stream = RaisingRead()
        self.assertRaises(UserWarning, stream.readline)

    def test_readlines_calls_read(self):
        class RaisingRead(_io._IOBase):
            def read(self, size):
                raise UserWarning("foo")

        stream = RaisingRead()
        self.assertRaises(UserWarning, stream.readlines)

    def test_readlines_calls_readline(self):
        class RaisingReadline(_io._IOBase):
            def readline(self):
                raise UserWarning("foo")

        stream = RaisingReadline()
        self.assertRaises(UserWarning, stream.readlines)

    def test_readlines_returns_list(self):
        # readlines() collects whatever iterating the object yields.
        class ThreeItems(_io._IOBase):
            def __iter__(self):
                return iter(["x", "y", "z"])

        stream = ThreeItems()
        lines = stream.readlines()
        self.assertEqual(lines, ["x", "y", "z"])

    def test_readable_always_returns_false(self):
        stream = _io._IOBase()
        self.assertFalse(stream.readable())

    def test_seek_raises_unsupported_operation(self):
        stream = _io._IOBase()
        with self.assertRaises(_io.UnsupportedOperation):
            stream.seek(0)

    def test_seekable_always_returns_false(self):
        stream = _io._IOBase()
        self.assertFalse(stream.seekable())

    def test_supports_arbitrary_attributes(self):
        stream = _io._IOBase()
        stream.hello_world = 42
        self.assertEqual(stream.hello_world, 42)

    def test_tell_raises_unsupported_operation(self):
        stream = _io._IOBase()
        with self.assertRaises(_io.UnsupportedOperation):
            stream.tell()

    def test_truncate_raises_unsupported_operation(self):
        stream = _io._IOBase()
        with self.assertRaises(_io.UnsupportedOperation):
            stream.truncate()

    def test_writable_always_returns_false(self):
        stream = _io._IOBase()
        self.assertFalse(stream.writable())

    def test_with_with_closed_instance_raises_value_error(self):
        # Entering a `with` block on an already-closed object must fail.
        stream = _io._IOBase()
        stream.close()
        with self.assertRaises(ValueError):
            with stream:
                pass

    def test_writelines_calls_write(self):
        class RaisingWrite(_io._IOBase):
            def write(self, line):
                raise UserWarning("foo")

        stream = RaisingWrite()
        self.assertRaises(UserWarning, stream.writelines, ["a"])
149
150
class _RawIOBaseTests(unittest.TestCase):
    """Tests for how `_io._RawIOBase` wires read/readall/readinto together."""

    def test_read_with_readinto_returning_none_returns_none(self):
        class NoData(_io._RawIOBase):
            def readinto(self, b):
                return None

        stream = NoData()
        self.assertEqual(stream.read(), None)

    def test_read_with_negative_size_calls_readall(self):
        # The exception raised inside readall() must surface through read(-5).
        class RaisingReadall(_io._RawIOBase):
            def readall(self):
                raise UserWarning("foo")

        stream = RaisingReadall()
        self.assertRaises(UserWarning, stream.read, -5)

    def test_read_with_nonnegative_size_calls_readinto(self):
        # readinto() fills the whole buffer it is given with b"x".
        class AllX(_io._RawIOBase):
            def readinto(self, b):
                fill = ord("x")
                for position in range(len(b)):
                    b[position] = fill
                return len(b)

        stream = AllX()
        data = stream.read(5)
        self.assertEqual(data, b"xxxxx")

    def test_readall_calls_read(self):
        class RaisingRead(_io._RawIOBase):
            def read(self, size):
                raise UserWarning("foo")

        stream = RaisingRead()
        self.assertRaises(UserWarning, stream.readall)

    def test_readinto_raises_not_implemented_error(self):
        stream = _io._RawIOBase()
        target = bytearray(b"")
        self.assertRaises(NotImplementedError, stream.readinto, target)

    def test_write_raises_not_implemented_error(self):
        stream = _io._RawIOBase()
        self.assertRaises(NotImplementedError, stream.write, b"")
196
197
class _BufferedIOBaseTests(unittest.TestCase):
    """Tests for the stub methods defined directly on `_io._BufferedIOBase`."""

    def test_detach_raises_unsupported_operation(self):
        stream = _io._BufferedIOBase()
        # The method has to live on the class itself, not only on a base.
        self.assertIn("detach", _io._BufferedIOBase.__dict__)
        with self.assertRaises(_io.UnsupportedOperation):
            stream.detach()

    def test_read_raises_unsupported_operation(self):
        stream = _io._BufferedIOBase()
        self.assertIn("read", _io._BufferedIOBase.__dict__)
        with self.assertRaises(_io.UnsupportedOperation):
            stream.read()

    def test_read1_raises_unsupported_operation(self):
        stream = _io._BufferedIOBase()
        self.assertIn("read1", _io._BufferedIOBase.__dict__)
        with self.assertRaises(_io.UnsupportedOperation):
            stream.read1()

    def test_readinto1_calls_read1(self):
        # The exception raised by read1() must surface through readinto1().
        class RaisingRead1(_io._BufferedIOBase):
            def read1(self, n):
                raise UserWarning("foo")

        stream = RaisingRead1()
        self.assertRaises(UserWarning, stream.readinto1, bytearray())

    def test_readinto_calls_read(self):
        class RaisingRead(_io._BufferedIOBase):
            def read(self, n):
                raise UserWarning("foo")

        stream = RaisingRead()
        self.assertRaises(UserWarning, stream.readinto, bytearray())

    def test_write_raises_unsupported_operation(self):
        stream = _io._BufferedIOBase()
        self.assertIn("write", _io._BufferedIOBase.__dict__)
        with self.assertRaises(_io.UnsupportedOperation):
            stream.write(b"")
236
237
class BufferedRWPairTests(unittest.TestCase):
    """Tests for how `_io.BufferedRWPair` delegates to its reader and writer."""

    def test_dunder_init_with_non_readable_reader_raises_unsupported_operation(self):
        class NeitherDirection:
            def readable(self):
                return False

            def writable(self):
                return False

        self.assertRaisesRegex(
            _io.UnsupportedOperation,
            "readable",
            _io.BufferedRWPair,
            NeitherDirection(),
            NeitherDirection(),
        )

    def test_dunder_init_with_non_writable_writer_raises_unsupported_operation(self):
        class ReadOnly:
            def readable(self):
                return True

            def writable(self):
                return False

        self.assertRaisesRegex(
            _io.UnsupportedOperation,
            "writable",
            _io.BufferedRWPair,
            ReadOnly(),
            ReadOnly(),
        )

    def test_close_calls_reader_close_and_writer_close(self):
        class Source(_io.BytesIO):
            close = Mock(name="close")

        class Sink(_io.BytesIO):
            close = Mock(name="close")

        with Source() as source, Sink() as sink, _io.BufferedRWPair(
            source, sink
        ) as pair:
            pair.close()
            source.close.assert_called_once()
            sink.close.assert_called_once()

    def test_closed_returns_writer_closed(self):
        with _io.BytesIO() as source, _io.BytesIO() as sink, _io.BufferedRWPair(
            source, sink
        ) as pair:
            self.assertFalse(pair.closed)
            # Only the writer is closed here; the pair must report it.
            sink.close()
            self.assertTrue(pair.closed)

    def test_flush_calls_writer_write(self):
        # One entry is recorded per call to the writer's raw write().
        seen_writes = []

        class Source(_io.BytesIO):
            pass

        class Sink(_io.BytesIO):
            def write(self, b):
                seen_writes.append(1)
                return super().write(b)

        with Source() as source, Sink() as sink, _io.BufferedRWPair(
            source, sink
        ) as pair:
            pair.write(b"123")
            self.assertEqual(len(seen_writes), 0)
            pair.flush()
            self.assertEqual(len(seen_writes), 1)

    def test_isatty_with_tty_reader_returns_true(self):
        class Source(_io.BytesIO):
            isatty = Mock(name="isatty", return_value=True)

        class Sink(_io.BytesIO):
            isatty = Mock(name="isatty", return_value=False)

        with Source() as source, Sink() as sink, _io.BufferedRWPair(
            source, sink
        ) as pair:
            self.assertTrue(pair.isatty())
            source.isatty.assert_called_once()
            sink.isatty.assert_called_once()

    def test_isatty_with_tty_writer_returns_true(self):
        class Source(_io.BytesIO):
            isatty = Mock(name="isatty", return_value=False)

        class Sink(_io.BytesIO):
            isatty = Mock(name="isatty", return_value=True)

        with Source() as source, Sink() as sink, _io.BufferedRWPair(
            source, sink
        ) as pair:
            self.assertTrue(pair.isatty())
            # The reader is not consulted once the writer reports a tty.
            source.isatty.assert_not_called()
            sink.isatty.assert_called_once()

    def test_isatty_with_neither_tty_returns_false(self):
        class Source(_io.BytesIO):
            isatty = Mock(name="isatty", return_value=False)

        class Sink(_io.BytesIO):
            isatty = Mock(name="isatty", return_value=False)

        with Source() as source, Sink() as sink, _io.BufferedRWPair(
            source, sink
        ) as pair:
            self.assertFalse(pair.isatty())
            source.isatty.assert_called_once()
            sink.isatty.assert_called_once()

    def test_peek_does_not_call_reader_or_writer_peek(self):
        class Source(_io.BytesIO):
            peek = Mock(name="peek")

        class Sink(_io.BytesIO):
            peek = Mock(name="peek")

        with Source() as source, Sink() as sink, _io.BufferedRWPair(
            source, sink
        ) as pair:
            pair.peek()
            source.peek.assert_not_called()
            sink.peek.assert_not_called()

    def test_read_calls_reader_readall(self):
        class Source(_io.BytesIO):
            read = Mock(name="read")
            readall = Mock(name="readall", return_value=None)

        with Source() as source, _io.BytesIO() as sink, _io.BufferedRWPair(
            source, sink
        ) as pair:
            pair.read()
            source.read.assert_not_called()
            source.readall.assert_called_once()

    def test_readable_on_closed_raises_value_error(self):
        with _io.BytesIO() as source, _io.BytesIO() as sink:
            pair = _io.BufferedRWPair(source, sink)
            # Enter and leave immediately so the pair is closed afterwards.
            with pair:
                pass
            with self.assertRaises(ValueError) as caught:
                pair.readable()
        self.assertIn("closed file", str(caught.exception))

    def test_readable_calls_reader_readable(self):
        class Source(_io.BytesIO):
            readable = Mock(name="readable", return_value=True)

        class Sink(_io.BytesIO):
            readable = Mock(name="readable")

        with Source() as source, Sink() as sink, _io.BufferedRWPair(
            source, sink
        ) as pair:
            # Construction alone has already queried the reader twice.
            self.assertEqual(source.readable.call_count, 2)
            self.assertTrue(pair.readable())
            self.assertEqual(source.readable.call_count, 3)
            sink.readable.assert_not_called()

    def test_writable_on_closed_raises_value_error(self):
        with _io.BytesIO() as source, _io.BytesIO() as sink:
            pair = _io.BufferedRWPair(source, sink)
            with pair:
                pass
            with self.assertRaises(ValueError) as caught:
                pair.writable()
        self.assertIn("closed file", str(caught.exception))

    def test_writable_calls_reader_writable(self):
        class Source(_io.BytesIO):
            writable = Mock(name="writable")

        class Sink(_io.BytesIO):
            writable = Mock(name="writable", return_value=True)

        with Source() as source, Sink() as sink, _io.BufferedRWPair(
            source, sink
        ) as pair:
            # Construction alone has already queried the writer twice.
            self.assertEqual(sink.writable.call_count, 2)
            self.assertTrue(pair.writable())
            source.writable.assert_not_called()
            self.assertEqual(sink.writable.call_count, 3)
406
407
class BufferedRandomTests(unittest.TestCase):
    """Tests for `_io.BufferedRandom` layered over in-memory raw streams."""

    def test_dunder_init_with_non_seekable_raises_unsupported_operation(self):
        class Unseekable(_io.BytesIO):
            def seekable(self):
                return False

        raw = Unseekable()
        self.assertRaisesRegex(
            _io.UnsupportedOperation, "not seekable", _io.BufferedRandom, raw
        )

    def test_dunder_init_with_non_readable_raises_unsupported_operation(self):
        class Unreadable(_io.BytesIO):
            def readable(self):
                return False

        raw = Unreadable()
        self.assertRaisesRegex(
            _io.UnsupportedOperation, "not readable", _io.BufferedRandom, raw
        )

    def test_dunder_init_with_non_writable_raises_unsupported_operation(self):
        class Unwritable(_io.BytesIO):
            def writable(self):
                return False

        raw = Unwritable()
        self.assertRaisesRegex(
            _io.UnsupportedOperation, "not writable", _io.BufferedRandom, raw
        )

    def test_dunder_init_with_non_positive_buffer_size_raises_value_error(self):
        with _io.BytesIO(b"hello") as raw:
            self.assertRaisesRegex(
                ValueError, "buffer size", _io.BufferedRandom, raw, 0
            )

    def test_dunder_init_allows_subclasses(self):
        class Derived(_io.BufferedRandom):
            pass

        stream = Derived(_io.BytesIO(b""))
        self.assertIsInstance(stream, _io.BufferedRandom)

    def test_flush_with_closed_raises_value_error(self):
        stream = _io.BufferedRandom(_io.BytesIO(b"hello"))
        stream.close()
        with self.assertRaisesRegex(ValueError, "flush of closed file"):
            stream.flush()

    def test_flush_writes_buffered_bytes(self):
        with _io.BytesIO(b"Hello world!") as raw, _io.BufferedRandom(raw) as stream:
            stream.write(b"foo bar")
            # Nothing reaches the raw stream until flush() is called.
            self.assertEqual(raw.getvalue(), b"Hello world!")
            stream.flush()
            self.assertEqual(raw.getvalue(), b"foo barorld!")

    def test_peek_returns_bytes(self):
        with _io.BytesIO(b"hello") as raw, _io.BufferedRandom(raw) as stream:
            self.assertEqual(stream.peek(), b"hello")

    def test_peek_with_negative_returns_bytes(self):
        with _io.BytesIO(b"hello") as raw, _io.BufferedRandom(raw) as stream:
            self.assertEqual(stream.peek(-1), b"hello")

    def test_raw_returns_raw(self):
        with _io.BytesIO(b"hello") as raw, _io.BufferedRandom(raw) as stream:
            self.assertIs(stream.raw, raw)

    def test_read_with_detached_stream_raises_value_error(self):
        with _io.BytesIO() as raw:
            stream = _io.BufferedRandom(raw)
            stream.detach()
            with self.assertRaisesRegex(ValueError, "detached"):
                stream.read()

    def test_read_with_closed_stream_raises_value_error(self):
        raw = _io.BytesIO(b"hello")
        stream = _io.BufferedRandom(raw)
        # Close the raw stream behind the buffered object's back.
        raw.close()
        with self.assertRaisesRegex(ValueError, "closed file"):
            stream.read()

    def test_read_with_negative_size_raises_value_error(self):
        with _io.BytesIO() as raw:
            stream = _io.BufferedRandom(raw)
            self.assertRaisesRegex(ValueError, "read length", stream.read, -2)

    def test_read_with_none_size_calls_raw_readall(self):
        class RaisingReadall(_io.BytesIO):
            def readall(self):
                raise UserWarning("foo")

        with RaisingReadall() as raw:
            stream = _io.BufferedRandom(raw)
            with self.assertRaises(UserWarning) as caught:
                stream.read(None)
        self.assertEqual(caught.exception.args, ("foo",))

    def test_read_reads_bytes(self):
        with _io.BytesIO(b"hello") as raw:
            stream = _io.BufferedRandom(raw, buffer_size=1)
            self.assertEqual(stream.read(), b"hello")

    def test_read_reads_count_bytes(self):
        with _io.BytesIO(b"hello") as raw:
            stream = _io.BufferedRandom(raw, buffer_size=1)
            self.assertEqual(stream.read(3), b"hel")

    def test_read1_with_negative_size_returns_rest_in_buffer(self):
        with _io.BytesIO(b"hello world") as raw:
            stream = _io.BufferedRandom(raw, buffer_size=5)
            data = stream.read1(-1)
            self.assertEqual(data, b"hello")

    def test_read1_calls_read(self):
        with _io.BytesIO(b"hello") as raw:
            stream = _io.BufferedRandom(raw, buffer_size=10)
            self.assertEqual(stream.read1(3), b"hel")

    def test_read1_reads_from_buffer(self):
        with _io.BytesIO(b"hello") as raw:
            stream = _io.BufferedRandom(raw, buffer_size=4)
            stream.read(1)
            # Asking for 10 bytes still yields only the three left in the buffer.
            self.assertEqual(stream.read1(10), b"ell")

    def test_readable_calls_raw_readable(self):
        class FakeReadable(_io.BytesIO):
            readable = Mock(name="readable", return_value=True)

        with FakeReadable() as raw, _io.BufferedRandom(raw) as stream:
            self.assertEqual(raw.readable.call_count, 1)
            self.assertTrue(stream.readable())
            self.assertEqual(raw.readable.call_count, 2)

    def test_seek_with_invalid_whence_raises_value_error(self):
        with _io.BufferedRandom(_io.BytesIO(b"hello")) as stream:
            self.assertRaisesRegex(ValueError, "invalid whence", stream.seek, 0, 3)

    def test_seek_calls_raw_seek(self):
        class FakeSeek(_io.BytesIO):
            seek = Mock(name="seek", return_value=42)

        with FakeSeek(b"hello") as raw, _io.BufferedRandom(raw) as stream:
            self.assertEqual(stream.seek(10), 42)
            raw.seek.assert_called_once()

    def test_seek_resets_position(self):
        with _io.BytesIO(b"hello") as raw:
            stream = _io.BufferedRandom(raw)
            stream.read(2)
            self.assertEqual(stream.tell(), 2)
            stream.seek(0)
            self.assertEqual(stream.tell(), 0)

    def test_supports_arbitrary_attributes(self):
        with _io.BytesIO(b"hello") as raw:
            stream = _io.BufferedRandom(raw)
            stream.bonjour = -99
            self.assertEqual(stream.bonjour, -99)

    def test_tell_returns_current_position(self):
        with _io.BytesIO(b"hello") as raw:
            stream = _io.BufferedRandom(raw)
            self.assertEqual(stream.tell(), 0)
            stream.read(2)
            self.assertEqual(stream.tell(), 2)

    def test_tell_counts_buffered_bytes(self):
        with _io.BytesIO(b"hello") as raw, _io.BufferedRandom(raw) as stream:
            self.assertEqual(raw.tell(), 0)
            self.assertEqual(stream.tell(), 0)
            stream.write(b"123")
            # The raw position is unchanged; only the buffered view advanced.
            self.assertEqual(raw.tell(), 0)
            self.assertEqual(stream.tell(), 3)

    def test_truncate_uses_tell_for_default_pos(self):
        with _io.BytesIO(b"hello") as raw, _io.BufferedRandom(raw) as stream:
            stream.write(b"123")
            self.assertEqual(stream.truncate(), 3)
            self.assertEqual(raw.getvalue(), b"123")

    def test_truncate_with_pos_truncates_raw(self):
        with _io.BytesIO(b"hello") as raw, _io.BufferedRandom(raw) as stream:
            stream.write(b"123")
            self.assertEqual(stream.truncate(4), 4)
            self.assertEqual(raw.getvalue(), b"123l")

    def test_writable_calls_raw_writable(self):
        class FakeWritable(_io.BytesIO):
            writable = Mock(name="writable", return_value=True)

        with FakeWritable(b"hello") as raw, _io.BufferedRandom(raw) as stream:
            raw.writable.assert_called_once()
            self.assertIs(stream.writable(), True)
            self.assertEqual(raw.writable.call_count, 2)

    def test_write_with_closed_raises_value_error(self):
        stream = _io.BufferedRandom(_io.BytesIO(b"hello"))
        stream.close()
        self.assertRaisesRegex(ValueError, "write to closed file", stream.write, b"")

    def test_write_with_str_raises_type_error(self):
        with _io.BufferedRandom(_io.BytesIO(b"hello")) as stream:
            self.assertRaisesRegex(TypeError, "str", stream.write, "")
623
624
625class BufferedWriterTests(unittest.TestCase):
626 def test_dunder_init_with_non_writable_stream_raises_os_error(self):
627 with _io.FileIO(_getfd(), mode="r") as file_reader:
628 with self.assertRaises(OSError) as context:
629 _io.BufferedWriter(file_reader)
630 self.assertEqual(str(context.exception), "File or stream is not writable.")
631
632 def test_dunder_init_with_non_positive_size_raises_value_error(self):
633 with _io.BytesIO(b"123") as bytes_writer:
634 with self.assertRaises(ValueError) as context:
635 _io.BufferedWriter(bytes_writer, 0)
636 self.assertEqual(
637 str(context.exception), "buffer size must be strictly positive"
638 )
639
640 def test_dunder_init_allows_subclasses(self):
641 class C(_io.BufferedWriter):
642 pass
643
644 instance = C(_io.BytesIO(b""))
645 self.assertIsInstance(instance, _io.BufferedWriter)
646
647 def test_flush_with_closed_raises_value_error(self):
648 writer = _io.BufferedWriter(_io.BytesIO(b"hello"))
649 writer.close()
650 with self.assertRaises(ValueError) as context:
651 writer.flush()
652 self.assertEqual(str(context.exception), "flush of closed file")
653
654 def test_flush_writes_buffered_bytes(self):
655 with _io.BytesIO(b"Hello world!") as bytes_io:
656 with _io.BufferedWriter(bytes_io) as writer:
657 writer.write(b"foo bar")
658 self.assertEqual(bytes_io.getvalue(), b"Hello world!")
659 writer.flush()
660 self.assertEqual(bytes_io.getvalue(), b"foo barorld!")
661
662 def test_raw_returns_raw(self):
663 with _io.BytesIO(b"hello") as bytes_io:
664 with _io.BufferedWriter(bytes_io) as buffered_io:
665 self.assertIs(buffered_io.raw, bytes_io)
666
667 def test_seek_with_invalid_whence_raises_value_error(self):
668 with _io.BufferedWriter(_io.BytesIO(b"hello")) as writer:
669 with self.assertRaisesRegex(ValueError, "invalid whence"):
670 writer.seek(0, 3)
671
672 def test_seek_calls_raw_seek(self):
673 class C(_io.BytesIO):
674 seek = Mock(name="seek", return_value=42)
675
676 with C(b"hello") as bytes_io:
677 with _io.BufferedWriter(bytes_io) as writer:
678 self.assertEqual(writer.seek(10), 42)
679 bytes_io.seek.assert_called_once()
680
681 def test_supports_arbitrary_attributes(self):
682 with _io.BytesIO(b"hello") as bytes_io:
683 with _io.BufferedWriter(bytes_io) as buffered_io:
684 buffered_io.guten_tag = 5.5
685 self.assertEqual(buffered_io.guten_tag, 5.5)
686
687 def test_tell_counts_buffered_bytes(self):
688 with _io.BytesIO(b"hello") as bytes_io:
689 with _io.BufferedWriter(bytes_io) as writer:
690 self.assertEqual(bytes_io.tell(), 0)
691 self.assertEqual(writer.tell(), 0)
692 writer.write(b"123")
693 self.assertEqual(bytes_io.tell(), 0)
694 self.assertEqual(writer.tell(), 3)
695
696 def test_truncate_uses_tell_for_default_pos(self):
697 with _io.BytesIO(b"hello") as bytes_io:
698 with _io.BufferedWriter(bytes_io) as writer:
699 writer.write(b"123")
700 self.assertEqual(writer.truncate(), 3)
701 self.assertEqual(bytes_io.getvalue(), b"123")
702
703 def test_truncate_with_pos_truncates_raw(self):
704 with _io.BytesIO(b"hello") as bytes_io:
705 with _io.BufferedWriter(bytes_io) as writer:
706 writer.write(b"123")
707 self.assertEqual(writer.truncate(4), 4)
708 self.assertEqual(bytes_io.getvalue(), b"123l")
709
710 def test_writable_calls_raw_writable(self):
711 class C(_io.BytesIO):
712 writable = Mock(name="writable", return_value=True)
713
714 with C(b"hello") as bytes_io:
715 with _io.BufferedWriter(bytes_io) as writer:
716 bytes_io.writable.assert_called_once()
717 self.assertIs(writer.writable(), True)
718 self.assertEqual(bytes_io.writable.call_count, 2)
719
720 def test_write_with_closed_raises_value_error(self):
721 writer = _io.BufferedWriter(_io.BytesIO(b"hello"))
722 writer.close()
723 with self.assertRaises(ValueError) as context:
724 writer.write(b"")
725 self.assertEqual(str(context.exception), "write to closed file")
726
727 def test_write_with_str_raises_type_error(self):
728 with _io.BufferedWriter(_io.BytesIO(b"hello")) as writer:
729 with self.assertRaisesRegex(TypeError, "str"):
730 writer.write("")
731
732
733class BytesIOTests(unittest.TestCase):
734 def test_dunder_init_with_none_initial_value_sets_empty_string(self):
735 with _io.BytesIO() as f:
736 self.assertEqual(f.getvalue(), b"")
737
738 def test_dunder_init_with_non_bytes_raises_type_error(self):
739 self.assertRaisesRegex(
740 TypeError,
741 "a bytes-like object is required, not 'str'",
742 _io.BytesIO,
743 "not_bytes",
744 )
745
746 def test_dunder_init_with_bytes_returns_bytesio_instance(self):
747 f = _io.BytesIO(b"foo")
748 self.assertIsInstance(f, _io.BytesIO)
749 self.assertEqual(f.getvalue(), b"foo")
750 f.close()
751
752 def test_dunder_init_with_bytes_subclass_returns_bytesio_instance(self):
753 class C(bytes):
754 pass
755
756 instance = C(b"hello")
757 f = _io.BytesIO(instance)
758 self.assertIsInstance(f, _io.BytesIO)
759 self.assertEqual(f.getvalue(), b"hello")
760
761 def test_dunder_init_with_bytearray_subclass_returns_bytesio_instance(self):
762 class C(bytearray):
763 pass
764
765 instance = C(bytearray(b"hello"))
766 f = _io.BytesIO(instance)
767 self.assertIsInstance(f, _io.BytesIO)
768 self.assertEqual(f.getvalue(), b"hello")
769
770 def test_dunder_init_with_bytearray_returns_bytesio_instance(self):
771 bytes_array = bytearray(b"hello")
772 f = _io.BytesIO(bytes_array)
773 self.assertIsInstance(f, _io.BytesIO)
774 self.assertEqual(f.getvalue(), b"hello")
775
776 def test_dunder_init_with_memoryview_of_bytes_returns_bytesio_instance(self):
777 mem = memoryview(b"hello")
778 f = _io.BytesIO(mem)
779 self.assertIsInstance(f, _io.BytesIO)
780 self.assertEqual(f.getvalue(), b"hello")
781
782 def test_dunder_init_with_memoryview_of_bytearray_returns_bytesio_instance(self):
783 bytes_array = bytearray(b"hello")
784 mem = memoryview(bytes_array)
785 f = _io.BytesIO(mem)
786 self.assertIsInstance(f, _io.BytesIO)
787 self.assertEqual(f.getvalue(), b"hello")
788
789 def test_dunder_init_with_none_returns_empty_bytesio_instance(self):
790 f = _io.BytesIO(None)
791 self.assertEqual(f.getvalue(), b"")
792
793 def test_dunder_init_with_bytes_sets_closed_false(self):
794 f = _io.BytesIO(b"hello")
795 self.assertFalse(f.closed)
796 self.assertTrue(f.readable())
797 self.assertTrue(f.writable())
798 self.assertTrue(f.seekable())
799
800 def test_dunder_init_with_float_raises_type_error(self):
801 with self.assertRaises(TypeError):
802 f = _io.BytesIO(0.5)
803 f.close()
804
805 def test_close_with_non_BytesIO_self_raises_type_error(self):
806 with self.assertRaises(TypeError):
807 _io.BytesIO.close(5)
808
809 def test_close_clears_buffer(self):
810 f = _io.BytesIO(b"foo")
811 f.close()
812 with self.assertRaises(ValueError):
813 f.getvalue()
814
815 def test_close_with_closed_instance_does_not_raise_value_error(self):
816 f = _io.BytesIO(b"foo")
817 f.close()
818 f.close()
819
820 def test_close_with_subclass_overwrite_uses_subclass_property(self):
821 class C(_io.BytesIO):
822 @property
823 def closed(self):
824 return 2
825
826 f = C(b"")
827 self.assertEqual(f.closed, 2)
828
829 def test_getbuffer_with_non_BytesIO_self_raises_type_error(self):
830 with self.assertRaises(TypeError):
831 _io.BytesIO.getbuffer()
832
833 def test_getbuffer_with_closed_raises_value_error(self):
834 f = _io.BytesIO(b"foo")
835 f.close()
836 with self.assertRaises(ValueError):
837 f.getbuffer()
838
839 def test_getvalue_with_non_BytesIO_raises_type_error(self):
840 with self.assertRaises(TypeError):
841 _io.BytesIO.getvalue(5)
842
843 def test_getvalue_with_bytes_returns_bytes_of_buffer(self):
844 f = _io.BytesIO(b"foo")
845 result = f.getvalue()
846 self.assertIsInstance(result, bytes)
847 self.assertEqual(result, b"foo")
848
849 def test_getvalue_with_bytearray_returns_bytes_of_buffer(self):
850 barr = bytearray(b"foo")
851 f = _io.BytesIO(barr)
852 result = f.getvalue()
853 self.assertIsInstance(result, bytes)
854 self.assertEqual(result, b"foo")
855
856 def test_getvalue_with_memoryview_of_bytes_returns_bytes_of_buffer(self):
857 view = memoryview(b"foo")
858 f = _io.BytesIO(view)
859 result = f.getvalue()
860 self.assertIsInstance(result, bytes)
861 self.assertEqual(result, b"foo")
862
863 def test_getvalue_with_memoryview_of_pointer_returns_bytes_of_buffer(self):
864 mm = mmap.mmap(-1, 4)
865 view = memoryview(mm)
866 f = _io.BytesIO(view)
867 result = f.getvalue()
868 self.assertIsInstance(result, bytes)
869 self.assertEqual(result, b"\x00\x00\x00\x00")
870
871 def test_getvalue_with_modification_on_returned_bytes_returns_same_result(self):
872 f = _io.BytesIO(b"foo")
873 result = f.getvalue()
874 self.assertEqual(result, b"foo")
875 result += b"o"
876 self.assertEqual(result, b"fooo")
877 self.assertEqual(f.getvalue(), b"foo")
878
879 def test_getvalue_with_open_returns_copy_of_value(self):
880 f = _io.BytesIO(b"foobarbaz")
881 first_value = f.getvalue()
882 f.write(b"temp")
883 second_value = f.getvalue()
884 self.assertEqual(first_value, b"foobarbaz")
885 self.assertEqual(second_value, b"temparbaz")
886
887 def test_getvalue_with_closed_raises_value_error(self):
888 f = _io.BytesIO(b"hello")
889 f.close()
890 with self.assertRaises(ValueError):
891 f.getvalue()
892
893 def test_getbuffer_with_bytes_returns_bytes_of_buffer(self):
894 f = _io.BytesIO(b"foo")
895 result = f.getbuffer()
896 self.assertIsInstance(result, memoryview)
897
898 def test_read_with_non_BytesIO_self_raises_type_error(self):
899 with self.assertRaises(TypeError):
900 _io.BytesIO.read(5)
901
902 def test_read_one_byte_reads_bytes(self):
903 f = _io.BytesIO(b"foo")
904 self.assertEqual(f.read(1), b"f")
905 self.assertEqual(f.read(1), b"o")
906 self.assertEqual(f.read(1), b"o")
907 self.assertEqual(f.read(1), b"")
908 f.close()
909
910 def test_read_with_bytes_reads_bytes(self):
911 f = _io.BytesIO(b"foo")
912 self.assertEqual(f.read(), b"foo")
913 self.assertEqual(f.read(), b"")
914 f.close()
915
916 def test_read_with_none_size_returns_rest(self):
917 f = _io.BytesIO(b"foo")
918 self.assertEqual(f.read(1), b"f")
919 self.assertEqual(f.read(None), b"oo")
920
921 def test_read_with_closed_raises_value_error(self):
922 f = _io.BytesIO(b"hello")
923 f.close()
924 with self.assertRaises(ValueError):
925 f.read()
926
927 def test_read_with_int_subclass_reads(self):
928 class C(int):
929 pass
930
931 f = _io.BytesIO(b"hello")
932 c = C(3)
933 self.assertEqual(f.read(c), b"hel")
934
935 def test_read_with_int_subclass_does_not_call_dunder_index(self):
936 class C(int):
937 def __index__(self):
938 raise UserWarning("foo")
939
940 f = _io.BytesIO(b"hello")
941 c = C()
942 f.read(c)
943
944 def test_read_with_non_int_raises_type_error(self):
945 f = _io.BytesIO(b"hello")
946 with self.assertRaises(TypeError):
947 f.read(2.5)
948
949 def test_read1_with_non_BytesIO_self_raises_type_error(self):
950 with self.assertRaises(TypeError):
951 _io.BytesIO.read1(5)
952
953 def test_read1_wtih_bytes_reads_bytes(self):
954 f = _io.BytesIO(b"foo")
955 self.assertEqual(f.read1(3), b"foo")
956 self.assertEqual(f.read1(3), b"")
957 f.close()
958
959 def test_read1_with_none_size_returns_rest(self):
960 f = _io.BytesIO(b"foo")
961 self.assertEqual(f.read1(1), b"f")
962 self.assertEqual(f.read1(None), b"oo")
963
964 def test_read1_with_size_not_specified_returns_rest(self):
965 f = _io.BytesIO(b"foo")
966 self.assertEqual(f.read1(1), b"f")
967 self.assertEqual(f.read1(), b"oo")
968 f.close()
969
970 # TODO(T47880928): Test BytesIO.readinto
971 # TODO(T47880928): Test BytesIO.readinto1
972
973 def test_readline_reads_one_line(self):
974 f = _io.BytesIO(b"hello\nworld\nfoo\nbar")
975 self.assertEqual(f.readline(), b"hello\n")
976 self.assertEqual(f.readline(), b"world\n")
977 self.assertEqual(f.readline(), b"foo\n")
978 self.assertEqual(f.readline(), b"bar")
979 self.assertEqual(f.readline(), b"")
980 f.close()
981
982 def test_readlines_reads_all_lines(self):
983 f = _io.BytesIO(b"hello\nworld\nfoo\nbar")
984 self.assertEqual(f.readlines(), [b"hello\n", b"world\n", b"foo\n", b"bar"])
985 self.assertEqual(f.read(), b"")
986 f.close()
987
988 def test_seek_with_non_BytesIO_self_raises_type_error(self):
989 with self.assertRaises(TypeError):
990 _io.BytesIO.seek(5, 5)
991
992 def test_seek_with_closed_raises_value_error(self):
993 f = _io.BytesIO(b"hello")
994 f.close()
995 with self.assertRaises(ValueError):
996 f.seek(3)
997
998 def test_seek_with_none_offset_raises_type_error(self):
999 f = _io.BytesIO(b"hello")
1000 with self.assertRaises(TypeError):
1001 f.seek(None, 0)
1002
1003 def test_seek_with_none_whence_raises_type_error(self):
1004 f = _io.BytesIO(b"hello")
1005 with self.assertRaises(TypeError):
1006 f.seek(1, None)
1007
1008 def test_seek_with_int_subclass_offset_does_not_call_dunder_index(self):
1009 class C(int):
1010 def __index__(self):
1011 raise UserWarning("foo")
1012
1013 f = _io.BytesIO(b"")
1014 pos = C()
1015 f.seek(pos)
1016
1017 def test_seek_with_class_offset_calls_dunder_index(self):
1018 class C:
1019 def __index__(self):
1020 raise UserWarning("foo")
1021
1022 f = _io.BytesIO(b"")
1023 pos = C()
1024 with self.assertRaises(UserWarning):
1025 f.seek(pos)
1026
1027 def test_seek_with_class_whence_calls_dunder_index(self):
1028 class C:
1029 def __index__(self):
1030 raise UserWarning("foo")
1031
1032 f = _io.BytesIO(b"")
1033 whence = C()
1034 with self.assertRaises(UserWarning):
1035 f.seek(0, whence)
1036
1037 def test_seek_with_float_offset_raises_type_error(self):
1038 f = _io.BytesIO(b"")
1039 pos = 3.4
1040 with self.assertRaises(TypeError):
1041 f.seek(pos)
1042
1043 def test_seek_with_float_whence_raises_type_error(self):
1044 f = _io.BytesIO(b"hello")
1045 with self.assertRaises(TypeError):
1046 f.seek(0.5)
1047
1048 def test_seek_with_int_subclass_whence_does_not_call_dunder_index(self):
1049 class C(int):
1050 def __index__(self):
1051 raise UserWarning("foo")
1052
1053 f = _io.BytesIO(b"")
1054 whence = C()
1055 f.seek(5, whence)
1056
1057 def test_seek_with_whence_0_returns_new_pos(self):
1058 f = _io.BytesIO(b"")
1059 pos = 3
1060 self.assertEqual(f.seek(pos), 3)
1061 self.assertEqual(f.tell(), 3)
1062
1063 def test_seek_with_whence_1_returns_new_pos(self):
1064 f = _io.BytesIO(b"")
1065 f.seek(3)
1066 offset = 3
1067 self.assertEqual(f.seek(offset, 1), 6)
1068 self.assertEqual(f.tell(), 6)
1069
1070 def test_seek_with_whence_1_negative_new_pos_retuns_zero(self):
1071 f = _io.BytesIO(b"")
1072 f.seek(3)
1073 offset = -5
1074 self.assertEqual(f.seek(offset, 1), 0)
1075 self.assertEqual(f.tell(), 0)
1076
1077 def test_seek_with_whence_2_returns_new_pos(self):
1078 f = _io.BytesIO(b"hello")
1079 offset = 3
1080 self.assertEqual(f.seek(offset, 2), 8)
1081 self.assertEqual(f.tell(), 8)
1082
1083 def test_seek_with_whence_2_negative_new_pos_retuns_zero(self):
1084 f = _io.BytesIO(b"hello")
1085 offset = -7
1086 self.assertEqual(f.seek(offset, 2), 0)
1087 self.assertEqual(f.tell(), 0)
1088
1089 def test_seek_with_whence_3_raises_value_error(self):
1090 f = _io.BytesIO(b"hello")
1091 with self.assertRaises(ValueError):
1092 f.seek(3, 3)
1093
1094 def test_seek_with_large_whence_raises_overflow_error(self):
1095 f = _io.BytesIO(b"hello")
1096 with self.assertRaises(OverflowError):
1097 f.seek(1, 2 ** 65)
1098
1099 def test_tell_with_non_BytesIO_self_raises_type_error(self):
1100 with self.assertRaises(TypeError):
1101 _io.BytesIO.tell(5)
1102
1103 def test_tell_with_closed_file_raises_value_error(self):
1104 f = _io.BytesIO(b"")
1105 f.close()
1106 with self.assertRaises(ValueError):
1107 f.tell()
1108
1109 def test_tell_returns_position(self):
1110 f = _io.BytesIO(b"foo")
1111 self.assertEqual(f.tell(), 0)
1112 f.read(1)
1113 self.assertEqual(f.tell(), 1)
1114
1115 def test_truncate_with_non_BytesIO_self_raises_type_error(self):
1116 with self.assertRaises(TypeError):
1117 _io.BytesIO.truncate(5, 5)
1118
1119 def test_truncate_with_pos_larger_than_length_returns_pos(self):
1120 f = _io.BytesIO(b"hello")
1121 self.assertEqual(f.truncate(6), 6)
1122 self.assertEqual(f.getvalue(), b"hello")
1123
1124 def test_truncate_with_pos_smaller_than_length_truncates(self):
1125 f = _io.BytesIO(b"hello")
1126 self.assertEqual(f.truncate(3), 3)
1127 self.assertEqual(f.getvalue(), b"hel")
1128
1129 def test_truncate_with_int_subclass_does_not_call_dunder_int(self):
1130 class C(int):
1131 def __int__(self):
1132 raise UserWarning("foo")
1133
1134 f = _io.BytesIO(b"")
1135 pos = C()
1136 f.truncate(pos)
1137
1138 def test_truncate_with_non_int_raises_type_error(self):
1139 dunder_int_called = False
1140
1141 class C:
1142 def __int__(self):
1143 nonlocal dunder_int_called
1144 dunder_int_called = True
1145 raise UserWarning("foo")
1146
1147 f = _io.BytesIO(b"")
1148 pos = C()
1149 with self.assertRaises(TypeError):
1150 f.truncate(pos)
1151 self.assertFalse(dunder_int_called)
1152
1153 def test_truncate_with_raising_dunder_int_raises_type_error(self):
1154 class C:
1155 def __int__(self):
1156 raise UserWarning("foo")
1157
1158 f = _io.BytesIO(b"")
1159 pos = C()
1160 with self.assertRaises(TypeError):
1161 f.truncate(pos)
1162
1163 def test_truncate_with_float_raises_type_error(self):
1164 f = _io.BytesIO(b"")
1165 pos = 3.4
1166 with self.assertRaises(TypeError):
1167 f.truncate(pos)
1168
1169 def test_write_with_non_BytesIO_raises_type_error(self):
1170 with self.assertRaises(TypeError):
1171 _io.BytesIO.write(5, b"foo")
1172
1173 def test_write_with_array_returns_length(self):
1174 bytes_io = _io.BytesIO(b"")
1175 arr = array.array("b", [1, 2, 3])
1176 self.assertEqual(bytes_io.write(arr), 3)
1177 self.assertEqual(bytes_io.getvalue(), b"\x01\x02\x03")
1178
1179 def test_write_with_bytearray_returns_length(self):
1180 bytes_io = _io.BytesIO(b"hello")
1181 bytes_array = bytearray(b"bArr")
1182 self.assertEqual(bytes_io.write(bytes_array), 4)
1183 self.assertEqual(bytes_io.getvalue(), b"bArro")
1184
1185 def test_write_with_bytearray_subclass_returns_length(self):
1186 class C(bytearray):
1187 pass
1188
1189 bytes_io = _io.BytesIO(b"hello")
1190 instance = C(b"bArr")
1191 self.assertEqual(bytes_io.write(instance), 4)
1192 self.assertEqual(bytes_io.getvalue(), b"bArro")
1193
1194 def test_write_with_bytes_returns_length(self):
1195 bytes_io = _io.BytesIO(b"hello")
1196 self.assertEqual(bytes_io.write(b"bytes"), 5)
1197 self.assertEqual(bytes_io.getvalue(), b"bytes")
1198
1199 def test_write_with_bytes_subclass_returns_length(self):
1200 class C(bytes):
1201 pass
1202
1203 bytes_io = _io.BytesIO(b"hello")
1204 instance = C(b"bytes")
1205 self.assertEqual(bytes_io.write(instance), 5)
1206 self.assertEqual(bytes_io.getvalue(), b"bytes")
1207
1208 def test_write_with_memoryview_of_bytearray_returns_length(self):
1209 bytes_io = _io.BytesIO(b"hello")
1210 bytes_array = bytearray(b"bArr")
1211 view = memoryview(bytes_array)
1212 self.assertEqual(bytes_io.write(view), 4)
1213 self.assertEqual(bytes_io.getvalue(), b"bArro")
1214
1215 def test_write_with_memoryview_of_bytes_returns_length(self):
1216 bytes_io = _io.BytesIO(b"hello")
1217 view = memoryview(b"view")
1218 self.assertEqual(bytes_io.write(view), 4)
1219 self.assertEqual(bytes_io.getvalue(), b"viewo")
1220
1221 def test_write_with_memoryview_of_pointer_returns_length(self):
1222 bytes_io = _io.BytesIO(b"")
1223 mm = mmap.mmap(-1, 4)
1224 view = memoryview(mm)
1225 self.assertEqual(bytes_io.write(view), 4)
1226 self.assertEqual(bytes_io.getvalue(), b"\x00\x00\x00\x00")
1227
1228 def test_write_with_non_bytes_like_raises_type_error(self):
1229 bytes_io = _io.BytesIO(b"hello")
1230 with self.assertRaises(TypeError) as context:
1231 bytes_io.write(123)
1232 self.assertRegex(
1233 str(context.exception), "a bytes-like object is required, not 'int'"
1234 )
1235 with self.assertRaises(TypeError) as context:
1236 bytes_io.write("str")
1237 self.assertRegex(
1238 str(context.exception), "a bytes-like object is required, not 'str'"
1239 )
1240
1241 def test_write_with_seek_writes_from_current_position(self):
1242 bytes_io = _io.BytesIO(b"hello")
1243 self.assertEqual(bytes_io.seek(1), 1)
1244 self.assertEqual(bytes_io.write(b"1"), 1)
1245 self.assertEqual(bytes_io.getvalue(), b"h1llo")
1246
1247 def test_write_with_seek_past_end_pads_with_zero(self):
1248 bytes_io = _io.BytesIO(b"hello")
1249 self.assertEqual(bytes_io.seek(7), 7)
1250 self.assertEqual(bytes_io.write(b"world"), 5)
1251 self.assertEqual(bytes_io.getvalue(), b"hello\x00\x00world")
1252
1253 def test_write_with_closed_raises_value_error(self):
1254 bytes_io = _io.BytesIO(b"hello")
1255 bytes_io.close()
1256 with self.assertRaises(ValueError) as context:
1257 bytes_io.write("close")
1258 self.assertRegex(str(context.exception), "I/O operation on closed file")
1259
1260 def test_write_with_empty_does_nothing(self):
1261 bytes_io = _io.BytesIO(b"hello")
1262 self.assertEqual(bytes_io.write(b""), 0)
1263 self.assertEqual(bytes_io.getvalue(), b"hello")
1264
1265 def test_write_with_larger_bytes_extends_buffer(self):
1266 bytes_io = _io.BytesIO(b"hello")
1267 self.assertEqual(bytes_io.write(b"new world"), 9)
1268 self.assertEqual(bytes_io.getvalue(), b"new world")
1269
1270
class FileIOTests(unittest.TestCase):
    # Tests for _io.FileIO.
    #
    # Unless a test needs real data, the FileIO wraps the descriptor returned
    # by _getfd(): the read end of a pipe whose write end is already closed,
    # so reads hit end-of-file immediately and nothing on disk is touched.
    # Tests that pass closefd=False, or whose constructor call is expected to
    # fail, close that descriptor themselves because no FileIO will do it.

    def test_close_closes_file(self):
        f = _io.FileIO(_getfd(), closefd=True)
        self.assertFalse(f.closed)
        f.close()
        self.assertTrue(f.closed)

    def test_close_twice_does_not_raise(self):
        f = _io.FileIO(_getfd(), closefd=True)
        f.close()
        f.close()

    def test_closefd_with_closefd_set_returns_true(self):
        with _io.FileIO(_getfd(), closefd=True) as f:
            self.assertTrue(f.closefd)

    def test_closefd_with_closefd_not_set_returns_true(self):
        # NOTE: despite the method name, the expected value here is False.
        fd = _getfd()
        try:
            with _io.FileIO(fd, closefd=False) as f:
                self.assertFalse(f.closefd)
        finally:
            # closefd=False leaves the descriptor open after f is closed.
            os.close(fd)

    def test_dunder_init_with_float_file_raises_type_error(self):
        with self.assertRaises(TypeError) as context:
            _io.FileIO(3.14)

        self.assertEqual(str(context.exception), "integer argument expected, got float")

    def test_dunder_init_with_negative_file_descriptor_raises_value_error(self):
        with self.assertRaises(ValueError) as context:
            _io.FileIO(-5)

        self.assertEqual(str(context.exception), "negative file descriptor")

    def test_dunder_init_with_non_str_mode_raises_type_error(self):
        fd = _getfd()
        # The constructor fails, so no FileIO takes ownership of fd.
        self.addCleanup(os.close, fd)
        with self.assertRaises(TypeError):
            _io.FileIO(fd, mode=3.14)

    def test_dunder_init_with_no_mode_makes_object_readable(self):
        with _io.FileIO(_getfd()) as f:
            self.assertTrue(f.readable())
            self.assertFalse(f.writable())
            self.assertFalse(f.seekable())

    def test_dunder_init_with_r_in_mode_makes_object_readable(self):
        with _io.FileIO(_getfd(), mode="r") as f:
            self.assertTrue(f.readable())
            self.assertFalse(f.writable())

    def test_dunder_init_with_w_in_mode_makes_object_writable(self):
        with _io.FileIO(_getfd(), mode="w") as f:
            self.assertFalse(f.readable())
            self.assertTrue(f.writable())

    def test_dunder_init_with_r_plus_in_mode_makes_object_readable_and_writable(self):
        with _io.FileIO(_getfd(), mode="r+") as f:
            self.assertTrue(f.readable())
            self.assertTrue(f.writable())

    def test_dunder_init_with_w_plus_in_mode_makes_object_readable_and_writable(self):
        with _io.FileIO(_getfd(), mode="w+") as f:
            self.assertTrue(f.readable())
            self.assertTrue(f.writable())

    def test_dunder_init_with_only_plus_in_mode_raises_value_error(self):
        fd = _getfd()
        # The constructor fails, so no FileIO takes ownership of fd.
        self.addCleanup(os.close, fd)
        with self.assertRaises(ValueError) as context:
            _io.FileIO(fd, mode="+")

        self.assertEqual(
            str(context.exception),
            "Must have exactly one of create/read/write/append mode and at "
            "most one plus",
        )

    def test_dunder_init_with_more_than_one_plus_in_mode_raises_value_error(self):
        fd = _getfd()
        # The constructor fails, so no FileIO takes ownership of fd.
        self.addCleanup(os.close, fd)
        with self.assertRaises(ValueError) as context:
            _io.FileIO(fd, mode="r++")

        self.assertEqual(
            str(context.exception),
            "Must have exactly one of create/read/write/append mode and at "
            "most one plus",
        )

    def test_dunder_init_with_rw_in_mode_raises_value_error(self):
        fd = _getfd()
        # The constructor fails, so no FileIO takes ownership of fd.
        self.addCleanup(os.close, fd)
        with self.assertRaises(ValueError) as context:
            _io.FileIO(fd, mode="rw")

        self.assertEqual(
            str(context.exception),
            "Must have exactly one of create/read/write/append mode and at "
            "most one plus",
        )

    def test_dunder_init_with_filename_and_closefd_equals_false_raises_value_error(
        self,
    ):
        with self.assertRaises(ValueError) as context:
            _io.FileIO("foobar", closefd=False)

        self.assertEqual(
            str(context.exception), "Cannot use closefd=False with file name"
        )

    def test_dunder_init_with_filename_and_opener_calls_opener(self):
        def opener(fn, flags):
            raise UserWarning("foo")

        with self.assertRaises(UserWarning):
            _io.FileIO("foobar", opener=opener)

    def test_dunder_init_with_opener_returning_non_int_raises_type_error(self):
        def opener(fn, flags):
            return "not_an_int"

        with self.assertRaises(TypeError) as context:
            _io.FileIO("foobar", opener=opener)

        self.assertEqual(str(context.exception), "expected integer from opener")

    def test_dunder_init_with_opener_returning_negative_int_raises_value_error(self):
        def opener(fn, flags):
            return -1

        with self.assertRaises(ValueError) as context:
            _io.FileIO("foobar", opener=opener)

        self.assertEqual(str(context.exception), "opener returned -1")

    # TODO(emacs): Enable side-by-side against CPython when issue38031 is fixed
    @pyro_only
    def test_dunder_init_with_opener_returning_bad_fd_raises_os_error(self):
        with self.assertRaises(OSError):
            _io.FileIO("foobar", opener=lambda name, flags: 1000000)

    def test_dunder_init_with_directory_raises_is_a_directory_error(self):
        with self.assertRaises(IsADirectoryError) as context:
            _io.FileIO("/")

        self.assertEqual(context.exception.args, (21, "Is a directory"))

    def test_dunder_repr_with_closed_file_returns_closed_in_result(self):
        f = _io.FileIO(_getfd(), mode="r")
        f.close()
        self.assertEqual(f.__repr__(), "<_io.FileIO [closed]>")

    def test_dunder_repr_with_closefd_and_fd_returns_fd_in_result(self):
        fd = _getfd()
        with _io.FileIO(fd, mode="r", closefd=True) as f:
            self.assertEqual(
                f.__repr__(), f"<_io.FileIO name={fd} mode='rb' closefd=True>"
            )

    def test_dunder_repr_without_closefd_and_fd_returns_fd_in_result(self):
        fd = _getfd()
        try:
            with _io.FileIO(fd, mode="r", closefd=False) as f:
                self.assertEqual(
                    f.__repr__(), f"<_io.FileIO name={fd} mode='rb' closefd=False>"
                )
        finally:
            # closefd=False leaves the descriptor open after f is closed.
            os.close(fd)

    def test_dunder_repr_with_closefd_and_name_returns_name_in_result(self):
        # The opener creates the file, so mode="r" works on a fresh path.
        def opener(fn, flags):
            return os.open(fn, os.O_CREAT | os.O_WRONLY, 0o666)

        with tempfile.TemporaryDirectory() as tempdir:
            with _io.FileIO(f"{tempdir}/foo", mode="r", opener=opener) as f:
                self.assertEqual(
                    f.__repr__(),
                    f"<_io.FileIO name='{tempdir}/foo' mode='rb' closefd=True>",
                )

    def test_fileno_with_closed_file_raises_value_error(self):
        f = _io.FileIO(_getfd(), mode="r")
        f.close()
        self.assertRaises(ValueError, f.fileno)

    def test_fileno_returns_int(self):
        with _io.FileIO(_getfd(), mode="r") as f:
            self.assertIsInstance(f.fileno(), int)

    def test_isatty_with_closed_file_raises_value_error(self):
        f = _io.FileIO(_getfd(), mode="r")
        f.close()
        self.assertRaises(ValueError, f.isatty)

    def test_isatty_with_non_tty_returns_false(self):
        with _io.FileIO(_getfd(), mode="r") as f:
            self.assertFalse(f.isatty())

    def test_mode_with_created_and_readable_returns_xb_plus(self):
        with _io.FileIO(_getfd(), mode="x+") as f:
            self.assertEqual(f.mode, "xb+")

    def test_mode_with_created_and_non_readable_returns_xb(self):
        with _io.FileIO(_getfd(), mode="x") as f:
            self.assertEqual(f.mode, "xb")

    # Testing appendable/readable is really tricky.

    def test_mode_with_readable_and_writable_returns_rb_plus(self):
        with _io.FileIO(_getfd(), mode="r+") as f:
            self.assertEqual(f.mode, "rb+")

    def test_mode_with_readable_and_non_writable_returns_rb(self):
        with _io.FileIO(_getfd(), mode="r") as f:
            self.assertEqual(f.mode, "rb")

    def test_mode_with_writable_returns_wb(self):
        with _io.FileIO(_getfd(), mode="w") as f:
            self.assertEqual(f.mode, "wb")

    def test_read_on_closed_file_raises_value_error(self):
        f = _io.FileIO(_getfd(), mode="r")
        f.close()
        self.assertRaises(ValueError, f.read)

    def test_read_on_non_readable_file_raises_unsupported_operation(self):
        with _io.FileIO(_getfd(), mode="w") as f:
            self.assertRaises(_io.UnsupportedOperation, f.read)

    def test_read_returns_bytes(self):
        with _io.FileIO(_getfd(), mode="r") as f:
            self.assertIsInstance(f.read(), bytes)

    def test_readable_with_closed_file_raises_value_error(self):
        f = _io.FileIO(_getfd(), mode="r")
        f.close()
        self.assertRaises(ValueError, f.readable)

    def test_readable_with_readable_file_returns_true(self):
        with _io.FileIO(_getfd(), mode="r") as f:
            self.assertTrue(f.readable())

    def test_readable_with_non_readable_file_returns_false(self):
        with _io.FileIO(_getfd(), mode="w") as f:
            self.assertFalse(f.readable())

    def test_readall_with_non_FileIO_self_raises_type_error(self):
        with self.assertRaises(TypeError):
            _io.FileIO.readall(5)

    def test_readall_on_closed_file_raises_value_error(self):
        f = _io.FileIO(_getfd(), mode="r")
        f.close()
        self.assertRaises(ValueError, f.readall)

    # TODO(T70611902): Modify the readall and test after CPython fixed this behavior
    def test_readall_on_non_readable_file_returns_full_bytes(self):
        r, w = os.pipe()
        w = os.fdopen(w, "w")
        w.write("hello")
        w.close()
        with _io.FileIO(r, mode="w") as f:
            self.assertFalse(f.readable())
            self.assertEqual(f.readall(), b"hello")

    def test_readall_returns_bytes(self):
        with _io.FileIO(_getfd(), mode="r") as f:
            self.assertIsInstance(f.readall(), bytes)

    def test_readall_returns_all_bytes(self):
        r, w = os.pipe()
        w = os.fdopen(w, "w")
        w.write("hello")
        w.close()
        with _io.FileIO(r, mode="r") as f:
            result = f.readall()
            self.assertIsInstance(result, bytes)
            self.assertEqual(result, b"hello")

    @unittest.skipIf(sys.platform != "linux", "needs /proc filesystem")
    def test_readall_on_unseekable_file_reads_all(self):
        with open("/proc/self/statm", "rb", buffering=False) as f:
            result = f.readall()
            self.assertGreater(len(result), 1)

    def test_readall_with_more_than_default_buffer_size_returns_all_bytes(self):
        r, w = os.pipe()
        w = os.fdopen(w, "w")
        w.write("foo" * 1000)
        w.close()
        with _io.FileIO(r, mode="r") as f:
            result = f.readall()
            self.assertIsInstance(result, bytes)
            self.assertEqual(result, b"foo" * 1000)

    def test_readall_with_fifo_file_returns_all_bytes(self):
        with tempfile.TemporaryDirectory() as tempdir:
            fifo = f"{tempdir}/fifo"
            os.mkfifo(fifo)
            exe = sys.executable
            # The child writes in two bursts with a pause in between, so
            # readall has to keep reading until the writer closes the fifo.
            # {fifo!r} embeds the path as a properly quoted Python literal.
            src = f"""
import time
with open({fifo!r}, 'wb') as f:
    f.write(b'hello...')
    f.flush()
    time.sleep(0.5)
    f.write(b'bye')
"""
            p = subprocess.Popen([exe, "-c", src])
            with open(fifo, "rb", buffering=False) as f:
                result = f.readall()
            # Reap the child before asserting so that a failed assertion does
            # not leave an unwaited process behind.
            p.wait()
            self.assertEqual(result, b"hello...bye")

    # TODO(T53182806): Test FileIO.readinto once memoryview.__setitem__ is done
    def test_readinto_with_non_FileIO_self_raises_type_error(self):
        with self.assertRaises(TypeError):
            _io.FileIO.readinto(5, bytearray(b"hello"))

    def test_readinto_with_array_reads_bytes_into_array(self):
        r, w = os.pipe()
        w = os.fdopen(w, "w")
        w.write("hello")
        w.close()
        arr = array.array("b", [1, 2, 3])
        with _io.FileIO(r, mode="r") as f:
            self.assertEqual(f.readinto(arr), 3)
            self.assertEqual(bytes(arr), b"hel")

    def test_readinto_with_bytearray_reads_bytes_into_bytearray(self):
        r, w = os.pipe()
        w = os.fdopen(w, "w")
        w.write("hello")
        w.close()
        barr = bytearray(b"abc")
        with _io.FileIO(r, mode="r") as f:
            self.assertEqual(f.readinto(barr), 3)
            self.assertEqual(barr, b"hel")

    def test_readinto_with_bytes_raises_type_error(self):
        r, w = os.pipe()
        w = os.fdopen(w, "w")
        w.write("hello")
        w.close()
        b = b"abc"
        with _io.FileIO(r, mode="r") as f:
            with self.assertRaises(TypeError):
                f.readinto(b)

    def test_readinto_with_memoryview_of_bytes_raises_type_error(self):
        r, w = os.pipe()
        w = os.fdopen(w, "w")
        w.write("hello")
        w.close()
        view = memoryview(b"abc")
        with _io.FileIO(r, mode="r") as f:
            with self.assertRaises(TypeError):
                f.readinto(view)

    def test_readinto_with_memoryview_of_mmap_reads_bytes_to_pointer(self):
        r, w = os.pipe()
        w = os.fdopen(w, "w")
        w.write("hello")
        w.close()
        mm = mmap.mmap(-1, 2)
        view = memoryview(mm)
        with _io.FileIO(r, mode="r") as f:
            self.assertEqual(f.readinto(view), 2)
            self.assertEqual(bytes(view), b"he")

    def test_readinto_with_mmap_reads_bytes_to_memory(self):
        r, w = os.pipe()
        w = os.fdopen(w, "w")
        w.write("hello")
        w.close()
        mm = mmap.mmap(-1, 2)
        with _io.FileIO(r, mode="r") as f:
            self.assertEqual(f.readinto(mm), 2)
            view = memoryview(mm)
            self.assertEqual(bytes(view), b"he")

    def test_readinto_with_closed_file_raises_value_error(self):
        r, w = os.pipe()
        w = os.fdopen(w, "w")
        w.write("hello")
        w.close()
        barr = bytearray(b"abc")
        with _io.FileIO(r, mode="r") as f:
            f.close()
            with self.assertRaises(ValueError):
                f.readinto(barr)

    def test_readinto_with_empty_bytearray_returns_zero(self):
        r, w = os.pipe()
        w = os.fdopen(w, "w")
        w.write("hello")
        w.close()
        barr = bytearray(b"")
        with _io.FileIO(r, mode="r") as f:
            self.assertEqual(f.readinto(barr), 0)
            self.assertEqual(barr, b"")

    def test_seek_with_float_raises_type_error(self):
        with _io.FileIO(_getfd(), mode="r") as f:
            self.assertRaises(TypeError, f.seek, 3.4)

    def test_seek_with_closed_file_raises_value_error(self):
        f = _io.FileIO(_getfd(), mode="r")
        f.close()
        self.assertRaises(ValueError, f.seek, 3)

    def test_seek_with_non_readable_file_raises_os_error(self):
        with _io.FileIO(_getfd(), mode="w") as f:
            self.assertRaises(OSError, f.seek, 3)

    def test_seekable_with_closed_file_raises_value_error(self):
        f = _io.FileIO(_getfd(), mode="r")
        f.close()
        self.assertRaises(ValueError, f.seekable)

    def test_seekable_does_not_call_subclass_tell(self):
        # If seekable() went through the overridden tell(), the UserWarning
        # would propagate instead of False being returned.
        class C(_io.FileIO):
            def tell(self):
                raise UserWarning("foo")

        with C(_getfd(), mode="w") as f:
            self.assertFalse(f.seekable())

    def test_seekable_with_non_seekable_file_returns_false(self):
        with _io.FileIO(_getfd(), mode="w") as f:
            self.assertFalse(f.seekable())

    def test_supports_arbitrary_attributes(self):
        with _io.FileIO(_getfd(), mode="r") as f:
            f.good_morning = 333
            self.assertEqual(f.good_morning, 333)

    def test_tell_with_closed_file_raises_value_error(self):
        f = _io.FileIO(_getfd(), mode="r")
        f.close()
        self.assertRaises(ValueError, f.tell)

    def test_tell_with_non_readable_file_raises_os_error(self):
        with _io.FileIO(_getfd(), mode="w") as f:
            self.assertRaises(OSError, f.tell)

    def test_truncate_with_closed_file_raises_value_error(self):
        f = _io.FileIO(_getfd(), mode="w")
        f.close()
        self.assertRaises(ValueError, f.truncate)

    def test_truncate_with_non_writable_file_raises_unsupported_operation(self):
        with _io.FileIO(_getfd(), mode="r") as f:
            self.assertRaises(_io.UnsupportedOperation, f.truncate)

    def test_writable_with_closed_file_raises_value_error(self):
        f = _io.FileIO(_getfd(), mode="w")
        f.close()
        self.assertRaises(ValueError, f.writable)

    def test_writable_with_writable_file_returns_true(self):
        with _io.FileIO(_getfd(), mode="w") as f:
            self.assertTrue(f.writable())

    def test_writable_with_non_writable_file_returns_false(self):
        with _io.FileIO(_getfd(), mode="r") as f:
            self.assertFalse(f.writable())

    def test_write_with_non_buffer_raises_type_error(self):
        # The class must stay named C: its name appears in the error message.
        class C:
            pass

        c = C()
        # The with block closes f even when an assertion below fails.
        with _io.FileIO(_getfd(), mode="w") as f:
            with self.assertRaises(TypeError) as context:
                f.write(c)
            self.assertEqual(
                str(context.exception), "a bytes-like object is required, not 'C'"
            )

    def test_write_with_raising_dunder_buffer_raises_type_error(self):
        # The class must stay named C: its name appears in the error message.
        class C:
            def __buffer__(self):
                raise UserWarning("foo")

        c = C()
        # The with block closes f even when an assertion below fails.
        with _io.FileIO(_getfd(), mode="w") as f:
            with self.assertRaises(TypeError) as context:
                f.write(c)
            self.assertEqual(
                str(context.exception), "a bytes-like object is required, not 'C'"
            )

    def test_write_with_bad_dunder_buffer_raises_type_error(self):
        # The class must stay named C: its name appears in the error message.
        class C:
            def __buffer__(self):
                return "foo"

        c = C()
        # The with block closes f even when an assertion below fails.
        with _io.FileIO(_getfd(), mode="w") as f:
            with self.assertRaises(TypeError) as context:
                f.write(c)
            self.assertEqual(
                str(context.exception), "a bytes-like object is required, not 'C'"
            )

    def test_write_writes_bytes(self):
        r, w = os.pipe()
        try:
            with _io.FileIO(w, mode="w") as f:
                self.assertEqual(f.write(b"hello world"), 11)
            # Read the pipe back to confirm the payload really went through.
            self.assertEqual(os.read(r, 64), b"hello world")
        finally:
            os.close(r)

    def test_write_writes_bytearray(self):
        r, w = os.pipe()
        try:
            with _io.FileIO(w, mode="w") as f:
                self.assertEqual(f.write(bytearray(b"hello world")), 11)
            # Read the pipe back to confirm the payload really went through.
            self.assertEqual(os.read(r, 64), b"hello world")
        finally:
            os.close(r)
1780
1781
@pyro_only
class FspathTests(unittest.TestCase):
    # Tests for _io._fspath: which inputs are returned as-is, when __fspath__
    # is consulted, and which inputs are rejected with TypeError.

    def test_fspath_with_str_returns_str(self):
        path = "hello"
        self.assertIs(_io._fspath(path), path)

    def test_fspath_with_str_subclass_returns_str(self):
        class StrSubclass(str):
            pass

        path = StrSubclass("hello")
        self.assertIs(_io._fspath(path), path)

    def test_fspath_with_str_subclass_does_not_call_dunder_fspath(self):
        # A call to __fspath__ would surface as a UserWarning.
        class StrWithFspath(str):
            def __fspath__(self):
                raise UserWarning("foo")

        path = StrWithFspath("hello")
        self.assertIs(_io._fspath(path), path)

    def test_fspath_with_bytes_returns_bytes(self):
        path = b"hello"
        self.assertIs(_io._fspath(path), path)

    def test_fspath_with_bytes_subclass_returns_bytes(self):
        class BytesSubclass(bytes):
            pass

        path = BytesSubclass(b"hello")
        self.assertIs(_io._fspath(path), path)

    def test_fspath_with_bytes_subclass_does_not_call_dunder_fspath(self):
        # A call to __fspath__ would surface as a UserWarning.
        class BytesWithFspath(bytes):
            def __fspath__(self):
                raise UserWarning("foo")

        path = BytesWithFspath(b"hello")
        self.assertIs(_io._fspath(path), path)

    def test_fspath_with_no_dunder_fspath_raises_type_error(self):
        class NotAPath:
            pass

        candidate = NotAPath()
        with self.assertRaises(TypeError):
            _io._fspath(candidate)

    def test_fspath_calls_dunder_fspath_returns_str(self):
        class StrPath:
            def __fspath__(self):
                return "foo"

        converted = _io._fspath(StrPath())
        self.assertEqual(converted, "foo")

    def test_fspath_calls_dunder_fspath_returns_bytes(self):
        class BytesPath:
            def __fspath__(self):
                return b"foo"

        converted = _io._fspath(BytesPath())
        self.assertEqual(converted, b"foo")

    def test_fspath_with_dunder_fspath_returning_non_str_raises_type_error(self):
        class IntPath:
            def __fspath__(self):
                return 5

        candidate = IntPath()
        with self.assertRaises(TypeError):
            _io._fspath(candidate)
1850
1851
1852class OpenTests(unittest.TestCase):
1853 def test_open_with_bad_mode_raises_type_error(self):
1854 with self.assertRaisesRegex(
1855 TypeError, r"open\(\) argument .* must be str, not int"
1856 ):
1857 _io.open(0, mode=5)
1858
1859 def test_open_with_bad_buffering_raises_type_error(self):
1860 with self.assertRaises(TypeError) as context:
1861 _io.open(0, buffering=None)
1862
1863 self.assertEqual(
1864 str(context.exception), "an integer is required (got type NoneType)"
1865 )
1866
1867 def test_open_with_bad_encoding_raises_type_error(self):
1868 with self.assertRaisesRegex(
1869 TypeError, r"open\(\) argument .* must be str or None, not int"
1870 ):
1871 _io.open(0, encoding=5)
1872
1873 def test_open_with_bad_errors_raises_type_error(self):
1874 with self.assertRaisesRegex(
1875 TypeError, r"open\(\) argument .* must be str or None, not int"
1876 ):
1877 _io.open(0, errors=5)
1878
1879 def test_open_with_duplicate_mode_raises_value_error(self):
1880 with self.assertRaises(ValueError) as context:
1881 _io.open(0, mode="rr")
1882
1883 self.assertEqual(str(context.exception), "invalid mode: 'rr'")
1884
1885 def test_open_with_bad_mode_character_raises_value_error(self):
1886 with self.assertRaises(ValueError) as context:
1887 _io.open(0, mode="q")
1888
1889 self.assertEqual(str(context.exception), "invalid mode: 'q'")
1890
1891 def test_open_with_U_and_writing_raises_value_error(self):
1892 with self.assertRaises(ValueError) as context:
1893 _io.open(0, mode="Uw")
1894
1895 self.assertRegex(str(context.exception), "mode U cannot be combined with")
1896
1897 def test_open_with_U_and_appending_raises_value_error(self):
1898 with self.assertRaises(ValueError) as context:
1899 _io.open(0, mode="Ua")
1900
1901 self.assertRegex(str(context.exception), "mode U cannot be combined with")
1902
1903 def test_open_with_U_and_updating_raises_value_error(self):
1904 with self.assertRaises(ValueError) as context:
1905 _io.open(0, mode="U+")
1906
1907 self.assertRegex(str(context.exception), "mode U cannot be combined with")
1908
1909 def test_open_with_text_and_binary_raises_value_error(self):
1910 with self.assertRaises(ValueError) as context:
1911 _io.open(0, mode="tb")
1912
1913 self.assertEqual(
1914 str(context.exception), "can't have text and binary mode at once"
1915 )
1916
1917 def test_open_with_creating_and_reading_raises_value_error(self):
1918 with self.assertRaises(ValueError) as context:
1919 _io.open(0, mode="rx")
1920
1921 self.assertEqual(
1922 str(context.exception),
1923 "must have exactly one of create/read/write/append mode",
1924 )
1925
1926 def test_open_with_creating_and_writing_raises_value_error(self):
1927 with self.assertRaises(ValueError) as context:
1928 _io.open(0, mode="wx")
1929
1930 self.assertEqual(
1931 str(context.exception),
1932 "must have exactly one of create/read/write/append mode",
1933 )
1934
1935 def test_open_with_creating_and_appending_raises_value_error(self):
1936 with self.assertRaises(ValueError) as context:
1937 _io.open(0, mode="ax")
1938
1939 self.assertEqual(
1940 str(context.exception),
1941 "must have exactly one of create/read/write/append mode",
1942 )
1943
1944 def test_open_with_reading_and_writing_raises_value_error(self):
1945 with self.assertRaises(ValueError) as context:
1946 _io.open(0, mode="rw")
1947
1948 self.assertEqual(
1949 str(context.exception),
1950 "must have exactly one of create/read/write/append mode",
1951 )
1952
1953 def test_open_with_reading_and_appending_raises_value_error(self):
1954 with self.assertRaises(ValueError) as context:
1955 _io.open(0, mode="ra")
1956
1957 self.assertEqual(
1958 str(context.exception),
1959 "must have exactly one of create/read/write/append mode",
1960 )
1961
1962 def test_open_with_writing_and_appending_raises_value_error(self):
1963 with self.assertRaises(ValueError) as context:
1964 _io.open(0, mode="wa")
1965
1966 self.assertEqual(
1967 str(context.exception),
1968 "must have exactly one of create/read/write/append mode",
1969 )
1970
1971 def test_open_with_no_reading_creating_writing_appending_raises_value_error(self):
1972 with self.assertRaises(ValueError) as context:
1973 _io.open(0, mode="")
1974
1975 self.assertEqual(
1976 str(context.exception),
1977 "Must have exactly one of create/read/write/append mode and at "
1978 "most one plus",
1979 )
1980
1981 def test_open_with_binary_and_encoding_raises_value_error(self):
1982 with self.assertRaises(ValueError) as context:
1983 _io.open(0, mode="rb", encoding="utf-8")
1984
1985 self.assertEqual(
1986 str(context.exception), "binary mode doesn't take an encoding argument"
1987 )
1988
1989 def test_open_with_binary_and_errors_raises_value_error(self):
1990 with self.assertRaises(ValueError) as context:
1991 _io.open(0, mode="rb", errors="error")
1992
1993 self.assertEqual(
1994 str(context.exception), "binary mode doesn't take an errors argument"
1995 )
1996
1997 def test_open_with_binary_and_newline_raises_value_error(self):
1998 with self.assertRaises(ValueError) as context:
1999 _io.open(0, mode="rb", newline="nl")
2000
2001 self.assertEqual(
2002 str(context.exception), "binary mode doesn't take a newline argument"
2003 )
2004
def test_open_with_no_buffering_and_binary_returns_file_io(self):
    # With buffering disabled in binary mode, open() should hand back a
    # FileIO instance.
    raw_fd = _getfd()
    with _io.open(raw_fd, buffering=False, mode="rb") as stream:
        self.assertIsInstance(stream, _io.FileIO)
2009
2010 @unittest.skipIf(sys.platform == "darwin", "Darwin rejects non-utf8 filenames")
2011 def test_open_non_utf8_filename(self):
2012 with tempfile.TemporaryDirectory() as tempdir:
2013 filename = "bad_surrogate\udc80.txt"
2014 full_path = os.path.join(tempdir, filename)
2015 with open(full_path, "w") as fp:
2016 fp.write("foobar")
2017 filename_encoded = filename.encode(
2018 sys.getfilesystemencoding(), sys.getfilesystemencodeerrors()
2019 )
2020 tempdir_bytes = tempdir.encode(
2021 sys.getfilesystemencoding(), sys.getfilesystemencodeerrors()
2022 )
2023 files = os.listdir(tempdir_bytes)
2024 self.assertIn(filename_encoded, files)
2025
2026 with open(full_path, "r") as fp:
2027 self.assertEqual(fp.read(), "foobar")
2028
2029 def test_open_code_returns_buffered_reader(self):
2030 with tempfile.TemporaryDirectory() as tempdir:
2031 filename = "temp.txt"
2032 full_path = os.path.join(tempdir, filename)
2033
2034 with open(full_path, "w") as fp:
2035 fp.write("foobar")
2036
2037 with _io.open_code(full_path) as result:
2038 self.assertIsInstance(result, _io.BufferedReader)
2039
2040
@pyro_only
class UnderBufferedIOMixinTests(unittest.TestCase):
    # Tests for _BufferedIOMixin. Most cases wrap a tiny stand-in "raw"
    # object whose method raises UserWarning: seeing that warning escape from
    # the mixin shows the call was forwarded to the raw object. Passing None
    # as raw models a detached stream.

    def test_dunder_init_sets_raw(self):
        with _io.FileIO(_getfd(), closefd=True) as f:
            result = _BufferedIOMixin(f)
            self.assertIs(result.raw, f)

    def test_dunder_repr_returns_str(self):
        result = _BufferedIOMixin(None)
        self.assertEqual(result.__repr__(), "<_io._BufferedIOMixin>")

    def test_dunder_repr_gets_raw_name(self):
        class C:
            name = "foo"

        result = _BufferedIOMixin(C())
        self.assertEqual(result.__repr__(), "<_io._BufferedIOMixin name='foo'>")

    def test_close_with_none_raw_raises_value_error(self):
        result = _BufferedIOMixin(None)
        self.assertRaisesRegex(ValueError, "raw stream has been detached", result.close)

    def test_close_with_closed_raw_does_nothing(self):
        class C:
            closed = True

            def close(self):
                pass

        result = _BufferedIOMixin(C())
        self.assertIsNone(result.close())

    def test_close_calls_raw_flush(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def flush(self):
                raise UserWarning("foo")

            def close(self):
                pass

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.close)

    def test_close_calls_raw_close(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def close(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.close)

    def test_closed_with_none_raw_raises_value_error(self):
        result = _BufferedIOMixin(None)
        with self.assertRaisesRegex(ValueError, "raw stream has been detached"):
            result.closed

    def test_closed_calls_raw_closed(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            @property
            def closed(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        with self.assertRaises(UserWarning):
            result.closed

    def test_detach_with_none_raw_raises_value_error(self):
        result = _BufferedIOMixin(None)
        self.assertRaisesRegex(
            ValueError, "raw stream has been detached", result.detach
        )

    def test_detach_calls_raw_flush(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def flush(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.detach)

    def test_detach_returns_raw_and_sets_none(self):
        with _io.BytesIO() as raw:
            result = _BufferedIOMixin(raw)
            self.assertIs(result.detach(), raw)
            self.assertIs(result.raw, None)

    def test_fileno_with_none_raw_raises_value_error(self):
        result = _BufferedIOMixin(None)
        self.assertRaisesRegex(
            ValueError, "raw stream has been detached", result.fileno
        )

    def test_fileno_calls_raw_fileno(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def fileno(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.fileno)

    def test_flush_with_none_raw_raises_value_error(self):
        result = _BufferedIOMixin(None)
        self.assertRaisesRegex(ValueError, "raw stream has been detached", result.flush)

    def test_flush_with_closed_raw_raises_value_error(self):
        class C:
            closed = True

            def close(self):
                pass

        result = _BufferedIOMixin(C())
        self.assertRaises(ValueError, result.flush)

    def test_flush_calls_raw_flush(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def flush(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.flush)

    def test_isatty_calls_raw_isatty(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def isatty(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.isatty)

    def test_isatty_with_none_raw_raises_value_error(self):
        result = _BufferedIOMixin(None)
        self.assertRaisesRegex(
            ValueError, "raw stream has been detached", result.isatty
        )

    def test_mode_calls_raw_mode(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            @property
            def mode(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        with self.assertRaises(UserWarning):
            result.mode

    def test_name_calls_raw_name(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            @property
            def name(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        with self.assertRaises(UserWarning):
            result.name

    def test_seek_with_none_raw_raises_value_error(self):
        result = _BufferedIOMixin(None)
        with self.assertRaisesRegex(ValueError, "raw stream has been detached"):
            result.seek(0)

    def test_tell_with_none_raw_raises_value_error(self):
        result = _BufferedIOMixin(None)
        self.assertRaisesRegex(ValueError, "raw stream has been detached", result.tell)

    def test_tell_calls_raw_tell(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            def tell(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.tell)

    def test_tell_with_raw_tell_returning_negative_raises_os_error(self):
        class C:
            def tell(self):
                return -1

        result = _BufferedIOMixin(C())
        self.assertRaises(OSError, result.tell)

    def test_tell_returns_result_of_raw_tell(self):
        class C:
            def tell(self):
                return 5

        result = _BufferedIOMixin(C())
        self.assertEqual(result.tell(), 5)

    def test_truncate_calls_raw_flush(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def flush(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.truncate)

    def test_truncate_with_none_pos_calls_tell(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def flush(self):
                pass

            def tell(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.truncate)

    def test_truncate_with_none_pos_calls_raw_truncate_with_pos(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def flush(self):
                pass

            def tell(self):
                return 5

            def truncate(self, pos):
                # Smuggle the received position out through the exception.
                raise UserWarning(pos)

        result = _BufferedIOMixin(C())
        with self.assertRaises(UserWarning) as context:
            result.truncate()
        self.assertEqual(context.exception.args, (5,))

    def test_truncate_calls_raw_truncate(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def flush(self):
                pass

            def tell(self):
                # Must not be reached when an explicit position is given; a
                # MemoryError here would fail the UserWarning assertion below.
                raise MemoryError("foo")

            def truncate(self, pos):
                raise UserWarning(pos)

        result = _BufferedIOMixin(C())
        with self.assertRaises(UserWarning) as context:
            result.truncate(10)
        self.assertEqual(context.exception.args, (10,))

    def test_seek_calls_raw_seek(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            def seek(self, pos, whence):
                raise UserWarning(pos, whence)

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.seek, 5, 0)

    # This method was previously spelled "tes_seek_..." and therefore was
    # never collected by unittest (which only runs methods named test*).
    def test_seek_returning_negative_pos_raises_os_error(self):
        class C:
            def seek(self, pos, whence):
                return -1

        result = _BufferedIOMixin(C())
        self.assertRaises(OSError, result.seek, 5, 0)

    def test_seek_returns_result_of_raw_seek(self):
        class C:
            def seek(self, pos, whence):
                return 100

        result = _BufferedIOMixin(C())
        self.assertEqual(result.seek(5, 0), 100)

    def test_seekable_calls_raw_seekable(self):
        # TODO(T53510135): Use unittest.mock
        class C:
            closed = False

            def seekable(self):
                raise UserWarning("foo")

        result = _BufferedIOMixin(C())
        self.assertRaises(UserWarning, result.seekable)
2350
2351
class IncrementalNewlineDecoderTests(unittest.TestCase):
    # Tests for _io.IncrementalNewlineDecoder.
    #
    # State encoding, as the getstate/setstate tests below expect it: the
    # wrapped decoder's flag is shifted left by one bit and the lowest bit
    # records whether a trailing "\r" is being held back (pending CR).

    def test_dunder_init_with_non_int_translate_raises_type_error(self):
        with self.assertRaises(TypeError) as context:
            _io.IncrementalNewlineDecoder(None, translate="foo")
        self.assertEqual(
            str(context.exception), "an integer is required (got type str)"
        )

    def test_decode_with_none_decoder_uses_input(self):
        decoder = _io.IncrementalNewlineDecoder(decoder=None, translate=1)
        self.assertEqual(decoder.decode("bar"), "bar")

    def test_decode_with_non_int_final_raises_type_error(self):
        decoder = _io.IncrementalNewlineDecoder(decoder=None, translate=1)
        with self.assertRaises(TypeError) as context:
            decoder.decode("bar", final="foo")
        self.assertEqual(
            str(context.exception), "an integer is required (got type str)"
        )

    def test_decode_with_decoder_calls_decoder_on_input(self):
        class C:
            def decode(self, input, final):
                # Smuggle the received arguments out through the exception.
                raise UserWarning(input, final)

        decoder = _io.IncrementalNewlineDecoder(decoder=C(), translate=1)
        with self.assertRaises(UserWarning) as context:
            decoder.decode("foo", 5)
        # The truthy int 5 is expected to reach the inner decoder as True.
        self.assertEqual(context.exception.args, ("foo", True))

    def test_decode_with_no_newlines_returns_input(self):
        decoder = _io.IncrementalNewlineDecoder(decoder=None, translate=1)
        self.assertEqual(decoder.decode("bar"), "bar")
        self.assertEqual(decoder.newlines, None)
        self.assertEqual(decoder.getstate(), (b"", 0))

    def test_decode_with_carriage_return_does_not_translate_to_newline(self):
        decoder = _io.IncrementalNewlineDecoder(decoder=None, translate=0)
        self.assertEqual(decoder.decode("bar\r\n"), "bar\r\n")
        self.assertEqual(decoder.newlines, "\r\n")
        self.assertEqual(decoder.getstate(), (b"", 0))

    def test_decode_with_trailing_carriage_removes_carriage(self):
        decoder = _io.IncrementalNewlineDecoder(decoder=None, translate=0)
        # The trailing "\r" is withheld from the output and shows up as the
        # pending-CR bit in the state instead.
        self.assertEqual(decoder.decode("bar\r"), "bar")
        self.assertEqual(decoder.newlines, None)
        self.assertEqual(decoder.getstate(), (b"", 1))

    def test_decode_with_carriage_does_not_translate_to_newline(self):
        decoder = _io.IncrementalNewlineDecoder(decoder=None, translate=0)
        self.assertEqual(decoder.decode("bar\rbaz"), "bar\rbaz")
        self.assertEqual(decoder.newlines, "\r")
        self.assertEqual(decoder.getstate(), (b"", 0))

    def test_decode_translate_with_carriage_return_translates_to_newline(self):
        decoder = _io.IncrementalNewlineDecoder(decoder=None, translate=1)
        self.assertEqual(decoder.decode("bar\r\n"), "bar\n")
        self.assertEqual(decoder.newlines, "\r\n")
        self.assertEqual(decoder.getstate(), (b"", 0))

    def test_decode_translate_with_carriage_translates_to_newline(self):
        decoder = _io.IncrementalNewlineDecoder(decoder=None, translate=1)
        self.assertEqual(decoder.decode("bar\rbaz"), "bar\nbaz")
        self.assertEqual(decoder.newlines, "\r")
        self.assertEqual(decoder.getstate(), (b"", 0))

    def test_getstate_with_decoder_calls_decoder_getstate(self):
        class C:
            def decode(self, input, final):
                return input

            def getstate(self):
                return (b"foo", 5)

        decoder = _io.IncrementalNewlineDecoder(decoder=C(), translate=1)
        # 5 << 1 == 10, with the pending-CR bit clear.
        self.assertEqual(decoder.getstate(), (b"foo", 10))

    def test_getstate_with_decoder_calls_decoder_getstate_carriage(self):
        class C:
            def decode(self, input, final):
                return input

            def getstate(self):
                return (b"foo", 5)

        decoder = _io.IncrementalNewlineDecoder(decoder=C(), translate=1)
        decoder.decode("bar\r")
        # (5 << 1) | 1 == 11: the trailing "\r" sets the pending-CR bit.
        self.assertEqual(decoder.getstate(), (b"foo", 11))

    def test_setstate_with_decoder_calls_decoder_setstate(self):
        # Covers the path with no pending "\r". This body used to be an exact
        # copy of the *_carriage test below, leaving this path untested.
        set_state = None

        class C:
            def decode(self, input, final):
                return input

            def getstate(self):
                return b"bar", 1

            def setstate(self, state):
                nonlocal set_state
                set_state = state

        decoder = _io.IncrementalNewlineDecoder(decoder=C(), translate=1)
        self.assertIsNone(decoder.setstate((b"foo", 4)))
        # The inner decoder receives the flag with the pending-CR bit
        # shifted away: 4 >> 1 == 2.
        self.assertEqual(set_state, (b"foo", 2))
        # 1 << 1 == 2, pending-CR bit clear since nothing was decoded.
        self.assertEqual(decoder.getstate(), (b"bar", 2))

    def test_setstate_with_decoder_calls_decoder_setstate_carriage(self):
        set_state = None

        class C:
            def decode(self, input, final):
                return input

            def getstate(self):
                return b"bar", 1

            def setstate(self, state):
                nonlocal set_state
                set_state = state

        decoder = _io.IncrementalNewlineDecoder(decoder=C(), translate=1)
        self.assertIsNone(decoder.setstate((b"foo", 4)))
        decoder.decode("bar\r")
        self.assertEqual(set_state, (b"foo", 2))
        # (1 << 1) | 1 == 3: the decoded trailing "\r" sets the pending-CR bit.
        self.assertEqual(decoder.getstate(), (b"bar", 3))

    def test_reset_changes_newlines(self):
        decoder = _io.IncrementalNewlineDecoder(decoder=None, translate=1)
        decoder.decode("bar\rbaz")
        self.assertEqual(decoder.newlines, "\r")
        self.assertIsNone(decoder.reset())
        self.assertEqual(decoder.newlines, None)

    def test_reset_changes_pendingcr(self):
        decoder = _io.IncrementalNewlineDecoder(decoder=None, translate=1)
        decoder.decode("bar\r")
        self.assertEqual(decoder.getstate(), (b"", 1))
        self.assertIsNone(decoder.reset())
        self.assertEqual(decoder.getstate(), (b"", 0))

    def test_reset_calls_decoder_reset(self):
        class C:
            def reset(self):
                raise UserWarning("foo")

        decoder = _io.IncrementalNewlineDecoder(decoder=C(), translate=1)
        self.assertRaises(UserWarning, decoder.reset)
2502
2503
class UnderTextIOBaseTests(unittest.TestCase):
    # Each test checks that a member is present in _io._TextIOBase's own
    # namespace dictionary.

    def test_read_in_dict(self):
        self.assertIn("read", vars(_io._TextIOBase))

    def test_write_in_dict(self):
        self.assertIn("write", vars(_io._TextIOBase))

    def test_readline_in_dict(self):
        self.assertIn("readline", vars(_io._TextIOBase))

    def test_detach_in_dict(self):
        self.assertIn("detach", vars(_io._TextIOBase))

    def test_encoding_in_dict(self):
        self.assertIn("encoding", vars(_io._TextIOBase))

    def test_newlines_in_dict(self):
        self.assertIn("newlines", vars(_io._TextIOBase))

    def test_errors_in_dict(self):
        self.assertIn("errors", vars(_io._TextIOBase))
2525
2526
2527class BufferedReaderTests(unittest.TestCase):
2528 def test_dunder_init_with_non_readable_stream_raises_os_error(self):
2529 with _io.FileIO(_getfd(), mode="w") as file_reader:
2530 with self.assertRaises(OSError) as context:
2531 _io.BufferedReader(file_reader)
2532 self.assertEqual(str(context.exception), "File or stream is not readable.")
2533
2534 def test_dunder_init_allows_subclasses(self):
2535 class C(_io.BufferedReader):
2536 pass
2537
2538 instance = C(_io.BytesIO(b""))
2539 self.assertIsInstance(instance, _io.BufferedReader)
2540
2541 def test_peek_returns_buffered_data(self):
2542 with _io.BufferedReader(_io.BytesIO(b"hello"), buffer_size=3) as buffered_io:
2543 self.assertEqual(buffered_io.peek(-10), b"hel")
2544 self.assertEqual(buffered_io.peek(1), b"hel")
2545 self.assertEqual(buffered_io.peek(2), b"hel")
2546
2547 def test_raw_returns_raw(self):
2548 with _io.BytesIO(b"hello") as bytes_io:
2549 with _io.BufferedReader(bytes_io) as buffered_io:
2550 self.assertIs(buffered_io.raw, bytes_io)
2551
2552 def test_read_with_detached_stream_raises_value_error(self):
2553 with _io.FileIO(_getfd(), mode="r") as file_reader:
2554 buffered = _io.BufferedReader(file_reader)
2555 buffered.detach()
2556 with self.assertRaises(ValueError) as context:
2557 buffered.read()
2558 self.assertEqual(str(context.exception), "raw stream has been detached")
2559
2560 def test_read_with_closed_stream_raises_value_error(self):
2561 file_reader = _io.FileIO(_getfd(), mode="r")
2562 buffered = _io.BufferedReader(file_reader)
2563 file_reader.close()
2564 with self.assertRaises(ValueError) as context:
2565 buffered.read()
2566 self.assertIn("closed file", str(context.exception))
2567
2568 def test_read_with_negative_size_raises_value_error(self):
2569 with _io.FileIO(_getfd(), mode="r") as file_reader:
2570 buffered = _io.BufferedReader(file_reader)
2571 with self.assertRaises(ValueError) as context:
2572 buffered.read(-2)
2573 self.assertEqual(
2574 str(context.exception), "read length must be non-negative or -1"
2575 )
2576
2577 def test_read_with_none_size_calls_raw_readall(self):
2578 class C(_io.FileIO):
2579 def readall(self):
2580 raise UserWarning("foo")
2581
2582 with C(_getfd(), mode="r") as file_reader:
2583 buffered = _io.BufferedReader(file_reader)
2584 with self.assertRaises(UserWarning) as context:
2585 buffered.read(None)
2586 self.assertEqual(context.exception.args, ("foo",))
2587
2588 def test_read_reads_bytes(self):
2589 with _io.BytesIO(b"hello") as bytes_io:
2590 buffered = _io.BufferedReader(bytes_io, buffer_size=1)
2591 result = buffered.read()
2592 self.assertEqual(result, b"hello")
2593
2594 def test_read_reads_count_bytes(self):
2595 with _io.BytesIO(b"hello") as bytes_io:
2596 buffered = _io.BufferedReader(bytes_io, buffer_size=1)
2597 result = buffered.read(3)
2598 self.assertEqual(result, b"hel")
2599
2600 def test_readinto_writes_to_buffer(self):
2601 with _io.BytesIO(b"hello") as bytes_io:
2602 with _io.BufferedReader(bytes_io, buffer_size=4) as buffered:
2603 ba = bytearray(b"XXXXXXXXXXXX")
2604 buffered.readinto(ba)
2605 self.assertEqual(ba, bytearray(b"helloXXXXXXX"))
2606
2607 def test_read1_calls_read(self):
2608 with _io.BytesIO(b"hello") as bytes_io:
2609 buffered = _io.BufferedReader(bytes_io, buffer_size=10)
2610 result = buffered.read1(3)
2611 self.assertEqual(result, b"hel")
2612
2613 def test_read1_reads_from_buffer(self):
2614 with _io.BytesIO(b"hello") as bytes_io:
2615 buffered = _io.BufferedReader(bytes_io, buffer_size=4)
2616 buffered.read(1)
2617 result = buffered.read1(10)
2618 self.assertEqual(result, b"ell")
2619
2620 def test_read1_with_size_not_specified_returns_rest_in_buffer(self):
2621 with _io.BytesIO(b"hello") as bytes_io:
2622 buffered = _io.BufferedReader(bytes_io, buffer_size=4)
2623 buffered.read(1)
2624 result = buffered.read1()
2625 self.assertEqual(result, b"ell")
2626
2627 def test_read1_with_none_size_raises_type_error(self):
2628 with _io.BytesIO(b"hello") as bytes_io:
2629 buffered = _io.BufferedReader(bytes_io, buffer_size=4)
2630 with self.assertRaises(TypeError):
2631 buffered.read1(None)
2632
2633 def test_readable_calls_raw_readable(self):
2634 readable_calls = 0
2635
2636 class C:
2637 closed = False
2638
2639 def readable(self):
2640 nonlocal readable_calls
2641 readable_calls += 1
2642 return True
2643
2644 result = _io.BufferedReader(C())
2645 self.assertEqual(readable_calls, 1)
2646 self.assertTrue(result.readable())
2647 self.assertEqual(readable_calls, 2)
2648
2649 def test_tell_returns_current_position(self):
2650 with _io.BytesIO(b"hello") as bytes_io:
2651 buffered = _io.BufferedReader(bytes_io)
2652 self.assertEqual(buffered.tell(), 0)
2653 buffered.read(2)
2654 self.assertEqual(buffered.tell(), 2)
2655
2656 def test_seek_with_invalid_whence(self):
2657 with _io.BytesIO(b"hello") as bytes_io:
2658 buffered = _io.BufferedReader(bytes_io)
2659 self.assertRaises(ValueError, buffered.seek, 0, 4)
2660 self.assertRaises(ValueError, buffered.seek, 0, -1)
2661
2662 def test_seek_resets_position(self):
2663 with _io.BytesIO(b"hello") as bytes_io:
2664 buffered = _io.BufferedReader(bytes_io)
2665 buffered.read(2)
2666 self.assertEqual(buffered.tell(), 2)
2667 buffered.seek(0)
2668 self.assertEqual(buffered.tell(), 0)
2669
2670 def test_supports_arbitrary_attributes(self):
2671 with _io.BytesIO(b"hello") as bytes_io:
2672 buffered = _io.BufferedReader(bytes_io)
2673 buffered.buenos_dias = 1234
2674 self.assertEqual(buffered.buenos_dias, 1234)
2675
2676
2677class TextIOWrapperTests(unittest.TestCase):
2678 def _sample(self):
2679 return _io.TextIOWrapper(_io.BytesIO(b"hello++"), encoding="ascii")
2680
2681 def _sample_utf8(self):
2682 return _io.TextIOWrapper(_io.BytesIO(b"hello++"), encoding="UTF-8")
2683
2684 def _sample_utf16(self):
2685 return _io.TextIOWrapper(_io.BytesIO(b"hello++"), encoding="UTF-16")
2686
2687 def _sample_latin1(self):
2688 return _io.TextIOWrapper(_io.BytesIO(b"hello++"), encoding="Latin-1")
2689
2690 def _sample_buffered_writer(self):
2691 return _io.BufferedWriter(_io.BytesIO(b"hello++"), buffer_size=3)
2692
2693 def _sample_buffered_reader(self):
2694 return _io.BufferedReader(_io.BytesIO(b"hello++"), buffer_size=3)
2695
def test_dunder_init_with_none_buffer_raises_attribute_error(self):
    # Wrapping None must fail; the expected message names the "readable"
    # attribute lookup on NoneType.
    expected = "'NoneType' object has no attribute 'readable'"
    with self.assertRaises(AttributeError) as raised:
        _io.TextIOWrapper(None)
    self.assertEqual(str(raised.exception), expected)
2702
2703 def test_dunder_init_with_non_str_encoding_raises_type_error(self):
2704 with self.assertRaisesRegex(
2705 TypeError, r"TextIOWrapper\(\) argument .* must be str or None, not int"
2706 ):
2707 _io.TextIOWrapper("hello", encoding=5)
2708
2709 def test_dunder_init_with_non_str_errors_raises_type_error(self):
2710 with self.assertRaisesRegex(
2711 TypeError, r"TextIOWrapper\(\) argument .* must be str or None, not int"
2712 ):
2713 _io.TextIOWrapper("hello", errors=5)
2714
2715 def test_dunder_init_with_non_str_newline_raises_type_error(self):
2716 with self.assertRaisesRegex(
2717 TypeError, r"TextIOWrapper\(\) argument .* must be str or None, not int"
2718 ):
2719 _io.TextIOWrapper("hello", newline=5)
2720
def test_dunder_init_with_non_int_line_buffering_raises_type_error(self):
    # line_buffering is expected to be integer-like; a str is a TypeError.
    expected = "an integer is required (got type str)"
    with self.assertRaises(TypeError) as raised:
        _io.TextIOWrapper("hello", line_buffering="buf")
    self.assertEqual(str(raised.exception), expected)
2727
def test_dunder_init_with_non_int_write_through_raises_type_error(self):
    # write_through is expected to be integer-like; a str is a TypeError.
    expected = "an integer is required (got type str)"
    with self.assertRaises(TypeError) as raised:
        _io.TextIOWrapper("hello", write_through="True")
    self.assertEqual(str(raised.exception), expected)
2734
2735 def test_dunder_init_with_bad_str_newline_raises_value_error(self):
2736 with self.assertRaises(ValueError) as context:
2737 _io.TextIOWrapper("hello", newline="foo")
2738 self.assertEqual(str(context.exception), "illegal newline value: foo")
2739
2740 def test_dunder_init_returns_text_io_wrapper(self):
2741 with self._sample() as text_io:
2742 self.assertIsInstance(text_io, _io.TextIOWrapper)
2743
2744 def test_dunder_repr(self):
2745 with self._sample() as text_io:
2746 self.assertEqual(text_io.__repr__(), "<_io.TextIOWrapper encoding='ascii'>")
2747
def test_dunder_repr_with_name(self):
    # A "name" attribute (supplied here by a subclass) appears in the repr.
    class C(_io.TextIOWrapper):
        name = "foo"

    expected = "<_io.TextIOWrapper name='foo' encoding='ascii'>"
    with C(_io.BytesIO(b"hello"), encoding="ascii") as wrapper:
        self.assertEqual(wrapper.__repr__(), expected)
2756
2757 def test_dunder_repr_with_mode(self):
2758 # TODO(T54575279): remove workaround and use subclass again
2759 with _io.TextIOWrapper(_io.BytesIO(b"hello"), encoding="ascii") as text_io:
2760 text_io.mode = "rwx"
2761 self.assertEqual(
2762 repr(text_io), "<_io.TextIOWrapper mode='rwx' encoding='ascii'>"
2763 )
2764
2765 def test_buffer_returns_buffer(self):
2766 with _io.BytesIO() as bytes_io:
2767 with _io.TextIOWrapper(bytes_io) as text_io:
2768 self.assertIs(text_io.buffer, bytes_io)
2769
2770 def test_close_with_non_TextIOWrapper_raises_type_error(self):
2771 with self.assertRaises(TypeError):
2772 _io.TextIOWrapper.close(5)
2773
2774 def test_close_with_detached_buffer_raises_value_error(self):
2775 text_io = self._sample()
2776 text_io.detach()
2777 self.assertRaisesRegex(
2778 ValueError, "underlying buffer has been detached", text_io.close
2779 )
2780
2781 def test_closed_with_detached_buffer_raises_value_error(self):
2782 text_io = self._sample()
2783 text_io.detach()
2784 with self.assertRaisesRegex(ValueError, "underlying buffer has been detached"):
2785 text_io.closed
2786
2787 def test_detach_with_non_TextIOWrapper_raises_type_error(self):
2788 with self.assertRaises(TypeError):
2789 _io.TextIOWrapper.detach(5)
2790
2791 def test_detach_with_detached_buffer_raises_value_error(self):
2792 text_io = self._sample()
2793 text_io.detach()
2794 self.assertRaisesRegex(
2795 ValueError, "underlying buffer has been detached", text_io.detach
2796 )
2797
2798 def test_encoding_returns_encoding(self):
2799 with self._sample() as text_io:
2800 self.assertEqual(text_io.encoding, "ascii")
2801
def test_default_encoding_is_UTF8(self):
    # With no encoding argument the wrapper is expected to report "UTF-8".
    with _io.BytesIO() as backing, _io.TextIOWrapper(backing) as wrapper:
        self.assertEqual(wrapper.encoding, "UTF-8")
2806
2807 def test_errors_returns_default_errors(self):
2808 with self._sample() as text_io:
2809 self.assertEqual(text_io.errors, "strict")
2810
2811 def test_errors_returns_errors(self):
2812 with _io.TextIOWrapper(_io.BytesIO(b"hello"), errors="foobar") as text_io:
2813 self.assertEqual(text_io.errors, "foobar")
2814
2815 def test_line_buffering_returns_line_buffering(self):
2816 with _io.TextIOWrapper(_io.BytesIO(b"hello"), line_buffering=5) as text_io:
2817 self.assertEqual(text_io.line_buffering, True)
2818
2819 def test_name_with_bytes_io_raises_attribute_error(self):
2820 with self._sample() as text_io:
2821 with self.assertRaisesRegex(AttributeError, "name"):
2822 text_io.name
2823
2824 def test_name_with_detached_buffer_raises_value_error(self):
2825 text_io = self._sample()
2826 text_io.detach()
2827 with self.assertRaisesRegex(ValueError, "underlying buffer has been detached"):
2828 text_io.name
2829
2830 def test_name_returns_buffer_name(self):
2831 class C(_io.BytesIO):
2832 name = "foobar"
2833
2834 with _io.TextIOWrapper(C(b"hello")) as text_io:
2835 self.assertEqual(text_io.name, "foobar")
2836
2837 def test_fileno_with_non_TextIOWrapper_raises_type_error(self):
2838 with self.assertRaises(TypeError):
2839 _io.TextIOWrapper.fileno(5)
2840
2841 def test_fileno_with_bytes_io_raises_unsupported_operation(self):
2842 with self._sample() as text_io:
2843 with self.assertRaisesRegex(_io.UnsupportedOperation, "fileno"):
2844 text_io.fileno()
2845
2846 def test_fileno_with_detached_buffer_raises_value_error(self):
2847 text_io = self._sample()
2848 text_io.detach()
2849 self.assertRaisesRegex(
2850 ValueError, "underlying buffer has been detached", text_io.fileno
2851 )
2852
2853 def test_fileno_returns_buffer_fileno(self):
2854 class C(_io.BytesIO):
2855 def fileno(self):
2856 return 5
2857
2858 with _io.TextIOWrapper(C(b"hello")) as text_io:
2859 self.assertEqual(text_io.fileno(), 5)
2860
2861 def test_isatty_with_non_TextIOWrapper_raises_type_error(self):
2862 with self.assertRaises(TypeError):
2863 _io.TextIOWrapper.isatty(5)
2864
2865 def test_isatty_with_bytes_io_returns_false(self):
2866 with self._sample() as text_io:
2867 self.assertFalse(text_io.isatty())
2868
2869 def test_isatty_with_detached_buffer_raises_value_error(self):
2870 text_io = self._sample()
2871 text_io.detach()
2872 self.assertRaisesRegex(
2873 ValueError, "underlying buffer has been detached", text_io.isatty
2874 )
2875
2876 def test_isatty_returns_buffer_isatty(self):
2877 class C(_io.BytesIO):
2878 def isatty(self):
2879 return True
2880
2881 with _io.TextIOWrapper(C(b"hello")) as text_io:
2882 self.assertTrue(text_io.isatty())
2883
2884 def test_newlines_with_detached_buffer_raises_value_error(self):
2885 text_io = self._sample()
2886 text_io.detach()
2887 with self.assertRaisesRegex(ValueError, "underlying buffer has been detached"):
2888 text_io.newlines
2889
2890 def test_newlines_returns_newlines(self):
2891 with _io.TextIOWrapper(_io.BytesIO(b"\rhello\n")) as text_io:
2892 text_io.read()
2893 self.assertEqual(text_io.newlines, ("\r", "\n"))
2894
2895 def test_seek_with_non_TextIOWrapper_raises_type_error(self):
2896 with self.assertRaises(TypeError):
2897 _io.TextIOWrapper.seek(5, 5)
2898
2899 def test_seek_with_detached_buffer_raises_value_error(self):
2900 text_io = self._sample()
2901 text_io.detach()
2902 with self.assertRaisesRegex(ValueError, "underlying buffer has been detached"):
2903 text_io.seek(0)
2904
2905 def test_seek_with_detached_buffer_and_non_int_whence_raises_type_error(self):
2906 text_io = self._sample()
2907 text_io.detach()
2908 with self.assertRaises(TypeError):
2909 text_io.seek(5, 5.5)
2910
2911 def test_seekable_with_non_TextIOWrapper_raises_type_error(self):
2912 with self.assertRaises(TypeError):
2913 _io.TextIOWrapper.seekable(5)
2914
2915 def test_seekable_with_detached_buffer_raises_value_error(self):
2916 text_io = self._sample()
2917 text_io.detach()
2918 self.assertRaisesRegex(
2919 ValueError, "underlying buffer has been detached", text_io.seekable
2920 )
2921
2922 def test_seekable_with_closed_io_raises_value_error(self):
2923 text_io = self._sample()
2924 text_io.close()
2925 self.assertRaises(ValueError, text_io.seekable)
2926
2927 def test_readable_with_non_TextIOWrapper_raises_type_error(self):
2928 with self.assertRaises(TypeError):
2929 _io.TextIOWrapper.readable(5)
2930
2931 def test_readable_with_detached_buffer_raises_value_error(self):
2932 text_io = self._sample()
2933 text_io.detach()
2934 self.assertRaisesRegex(
2935 ValueError, "underlying buffer has been detached", text_io.readable
2936 )
2937
def test_readable_calls_buffer_readable(self):
    # The buffer's readable() is replaced with a Mock so calls can be
    # counted: one is expected from constructing the wrapper, a second from
    # the explicit readable() call.
    class C(_io.BytesIO):
        readable = Mock(name="readable")

    with C(b"hello") as backing, _io.TextIOWrapper(backing) as wrapper:
        backing.readable.assert_called_once()
        wrapper.readable()
        self.assertEqual(backing.readable.call_count, 2)
2947
2948 def test_supports_arbitrary_attributes(self):
2949 text_io = self._sample()
2950 text_io.heya = ()
2951 self.assertEqual(text_io.heya, ())
2952
2953 def test_writable_with_non_TextIOWrapper_raises_type_error(self):
2954 with self.assertRaises(TypeError):
2955 _io.TextIOWrapper.writable(5)
2956
2957 def test_writable_with_detached_buffer_raises_value_error(self):
2958 text_io = self._sample()
2959 text_io.detach()
2960 self.assertRaisesRegex(
2961 ValueError, "underlying buffer has been detached", text_io.writable
2962 )
2963
2964 def test_writable_calls_buffer_writable(self):
2965 class C(_io.BytesIO):
2966 writable = Mock(name="writable")
2967
2968 with C(b"hello") as bytes_io:
2969 with _io.TextIOWrapper(bytes_io) as text_io:
2970 bytes_io.writable.assert_called_once()
2971 text_io.writable()
2972 self.assertEqual(bytes_io.writable.call_count, 2)
2973
2974 def test_flush_with_non_TextIOWrapper_raises_type_error(self):
2975 with self.assertRaises(TypeError):
2976 _io.TextIOWrapper.flush(5)
2977
2978 def test_flush_with_closed_buffer_raises_value_error(self):
2979 text_io = self._sample()
2980 text_io.close()
2981 self.assertRaisesRegex(ValueError, "closed", text_io.flush)
2982
2983 def test_flush_with_detached_buffer_raises_value_error(self):
2984 text_io = self._sample()
2985 text_io.detach()
2986 self.assertRaisesRegex(
2987 ValueError, "underlying buffer has been detached", text_io.flush
2988 )
2989
2990 def test_flush_calls_buffer_flush(self):
2991 class C(_io.BytesIO):
2992 flush = Mock(name="flush", return_value=None)
2993
2994 with C(b"hello") as bytes_io:
2995 with _io.TextIOWrapper(bytes_io) as text_io:
2996 bytes_io.flush.assert_not_called()
2997 self.assertIsNone(text_io.flush())
2998 bytes_io.flush.assert_called_once()
2999
3000 def test_read_with_non_TextIOWrapper_raises_type_error(self):
3001 with self.assertRaises(TypeError):
3002 _io.TextIOWrapper.read(5)
3003
3004 def test_read_with_detached_buffer_raises_value_error(self):
3005 text_io = self._sample()
3006 text_io.detach()
3007 self.assertRaisesRegex(
3008 ValueError, "underlying buffer has been detached", text_io.read
3009 )
3010
3011 def test_read_with_detached_buffer_and_non_int_size_raises_type_error(self):
3012 text_io = self._sample()
3013 text_io.detach()
3014 with self.assertRaises(TypeError):
3015 text_io.read(5.5)
3016
3017 def test_read_reads_chars(self):
3018 with _io.TextIOWrapper(_io.BytesIO(b"foo bar")) as text_io:
3019 result = text_io.read(3)
3020 self.assertEqual(result, "foo")
3021 self.assertEqual(text_io.read(), " bar")
3022
3023 def test_readline_with_non_TextIOWrapper_raises_type_error(self):
3024 with self.assertRaises(TypeError):
3025 _io.TextIOWrapper.readline(5)
3026
3027 def test_readline_with_closed_file_raises_value_error(self):
3028 text_io = self._sample()
3029 text_io.close()
3030 self.assertRaisesRegex(ValueError, "closed file", text_io.readline)
3031
3032 def test_readline_with_detached_buffer_raises_value_error(self):
3033 text_io = self._sample()
3034 text_io.detach()
3035 self.assertRaisesRegex(
3036 ValueError, "underlying buffer has been detached", text_io.readline
3037 )
3038
3039 def test_readline_with_non_int_size_raises_type_error(self):
3040 with self._sample() as text_io:
3041 self.assertRaisesRegex(
3042 TypeError,
3043 "cannot be interpreted as an integer",
3044 text_io.readline,
3045 "not_int",
3046 )
3047
3048 def test_readline_reads_line_terminated_by_newline(self):
3049 with _io.TextIOWrapper(_io.BytesIO(b"foo\nbar")) as text_io:
3050 self.assertEqual(text_io.readline(), "foo\n")
3051 self.assertEqual(text_io.readline(), "bar")
3052 self.assertEqual(text_io.readline(), "")
3053
3054 def test_readline_reads_line_terminated_by_cr(self):
3055 with _io.TextIOWrapper(_io.BytesIO(b"foo\rbar")) as text_io:
3056 self.assertEqual(text_io.readline(), "foo\n")
3057 self.assertEqual(text_io.readline(), "bar")
3058 self.assertEqual(text_io.readline(), "")
3059
3060 def test_readline_reads_line_terminated_by_crlf(self):
3061 with _io.TextIOWrapper(_io.BytesIO(b"foo\r\nbar")) as text_io:
3062 self.assertEqual(text_io.readline(), "foo\n")
3063 self.assertEqual(text_io.readline(), "bar")
3064 self.assertEqual(text_io.readline(), "")
3065
3066 def test_readline_reads_line_with_only_cr(self):
3067 with _io.TextIOWrapper(_io.BytesIO(b"\r")) as text_io:
3068 self.assertEqual(text_io.readline(), "\n")
3069 self.assertEqual(text_io.readline(), "")
3070
3071 def test_tell_with_non_TextIOWrapper_raises_type_error(self):
3072 with self.assertRaises(TypeError):
3073 _io.TextIOWrapper.tell(5)
3074
3075 def test_tell_with_detached_buffer_raises_value_error(self):
3076 text_io = self._sample()
3077 text_io.detach()
3078 self.assertRaisesRegex(
3079 ValueError, "underlying buffer has been detached", text_io.tell
3080 )
3081
3082 def test_truncate_with_non_TextIOWrapper_raises_type_error(self):
3083 with self.assertRaises(TypeError):
3084 _io.TextIOWrapper.truncate(5, 5)
3085
3086 def test_truncate_with_closed_buffer_raises_value_error(self):
3087 text_io = self._sample()
3088 text_io.close()
3089 self.assertRaisesRegex(ValueError, "closed", text_io.truncate)
3090
3091 def test_truncate_with_detached_buffer_raises_value_error(self):
3092 text_io = self._sample()
3093 text_io.detach()
3094 self.assertRaisesRegex(
3095 ValueError, "underlying buffer has been detached", text_io.truncate
3096 )
3097
3098 def test_truncate_calls_buffer_flush(self):
3099 class C(_io.BytesIO):
3100 flush = Mock(name="flush")
3101
3102 with C(b"hello") as bytes_io:
3103 with _io.TextIOWrapper(C()) as text_io:
3104 bytes_io.flush.assert_not_called()
3105 text_io.truncate()
3106 bytes_io.flush.assert_called_once()
3107
3108 def test_truncate_calls_buffer_truncate(self):
3109 class C(_io.BytesIO):
3110 truncate = Mock(name="truncate")
3111
3112 with C(b"hello") as bytes_io:
3113 with _io.TextIOWrapper(C()) as text_io:
3114 bytes_io.truncate.assert_not_called()
3115 text_io.truncate()
3116 bytes_io.truncate.assert_called_once()
3117
3118 def test_write_with_non_TextIOWrapper_raises_type_error(self):
3119 with self.assertRaises(TypeError):
3120 _io.TextIOWrapper.write(5, "foo")
3121
3122 def test_write_with_detached_buffer_raises_value_error(self):
3123 text_io = self._sample()
3124 text_io.detach()
3125 with self.assertRaisesRegex(ValueError, "underlying buffer has been detached"):
3126 text_io.write("foo")
3127
3128 def test_write_with_detach_buffer_and_non_str_text_raises_type_error(self):
3129 text_io = self._sample()
3130 text_io.detach()
3131 with self.assertRaises(TypeError):
3132 text_io.write(5.5)
3133
3134 def test_write_with_ascii_encoding_and_letters_writes_chars(self):
3135 with self._sample() as text_io:
3136 self.assertEqual(text_io.write("foo"), 3)
3137 text_io.seek(0)
3138 self.assertEqual(text_io.read(), "foolo++")
3139
3140 def test_write_with_ascii_encoding_and_special_characters_writes_chars(self):
3141 with self._sample() as text_io:
3142 self.assertEqual(text_io.write("!?+-()"), 6)
3143 text_io.seek(0)
3144 self.assertEqual(text_io.read(), "!?+-()+")
3145
3146 def test_write_with_utf8_encoding_and_letters_writes_chars(self):
3147 with self._sample_utf8() as text_io:
3148 self.assertEqual(text_io.write("foo"), 3)
3149 text_io.seek(0)
3150 self.assertEqual(text_io.read(), "foolo++")
3151
3152 def test_write_with_utf8_encoding_and_special_characters_writes_chars(self):
3153 with self._sample_utf8() as text_io:
3154 self.assertEqual(text_io.write("(("), 2)
3155 text_io.seek(0)
3156 self.assertEqual(text_io.read(), "((llo++")
3157
3158 def test_write_with_utf8_encoding_and_strict_errors_and_surrogate_raises_unicodeencode_error(
3159 self,
3160 ):
3161 text_io = _io.TextIOWrapper(
3162 _io.BytesIO(b"hello"), encoding="UTF-8", errors="strict"
3163 )
3164 with self.assertRaises(ValueError):
3165 text_io.write("ab\uD800++")
3166
3167 def test_write_with_utf8_encoding_and_ignore_errors_removes_surrogates(self):
3168 text_io = _io.TextIOWrapper(
3169 _io.BytesIO(b"hello++"), encoding="UTF-8", errors="ignore"
3170 )
3171 self.assertEqual(text_io.write("ab\uD800a"), 4)
3172 text_io.seek(0)
3173 self.assertEqual(text_io.read(), "abalo++")
3174
3175 def test_write_with_utf8_encoding_and_replace_errors_replaces_surrogates_with_question_mark(
3176 self,
3177 ):
3178 text_io = _io.TextIOWrapper(
3179 _io.BytesIO(b"hello++"), encoding="UTF-8", errors="replace"
3180 )
3181 self.assertEqual(text_io.write("ab\uD800"), 3)
3182 text_io.seek(0)
3183 self.assertEqual(text_io.read(), "ab?lo++")
3184
3185 # TODO(T61927696): Test write with "surrogatepass" errors when "surrogatepass" is supported by decode
3186
3187 def test_write_with_latin1_encoding_and_letters_writes_chars(self):
3188 with self._sample_latin1() as text_io:
3189 self.assertEqual(text_io.write("foo"), 3)
3190 text_io.seek(0)
3191 self.assertEqual(text_io.read(), "foolo++")
3192
3193 def test_write_with_latin1_encoding_and_special_characters_writes_chars(self):
3194 with self._sample_latin1() as text_io:
3195 self.assertEqual(text_io.write("(("), 2)
3196 text_io.seek(0)
3197 self.assertEqual(text_io.read(), "((llo++")
3198
3199 def test_write_with_bufferedwriter_ascii_encoding_and_letters_returns_length_written(
3200 self,
3201 ):
3202 with _io.TextIOWrapper(
3203 self._sample_buffered_writer(), encoding="ascii"
3204 ) as text_io:
3205 self.assertEqual(text_io.write("foo"), 3)
3206
3207 def test_write_with_bufferedwriter_ascii_encoding_and_special_characters_returns_length_written(
3208 self,
3209 ):
3210 with _io.TextIOWrapper(
3211 self._sample_buffered_writer(), encoding="ascii"
3212 ) as text_io:
3213 self.assertEqual(text_io.write("!?+-()"), 6)
3214
3215 def test_write_with_bufferedwriter_utf8_encoding_and_letters_returns_length_written(
3216 self,
3217 ):
3218 with _io.TextIOWrapper(
3219 self._sample_buffered_writer(), encoding="UTF-8"
3220 ) as text_io:
3221 self.assertEqual(text_io.write("foo"), 3)
3222
3223 def test_write_with_bufferedwriter_utf8_encoding_and_special_characters_returns_length_written(
3224 self,
3225 ):
3226 with _io.TextIOWrapper(
3227 self._sample_buffered_writer(), encoding="UTF-8"
3228 ) as text_io:
3229 self.assertEqual(text_io.write("(("), 2)
3230
3231 def test_write_with_bufferedwriter_utf8_encoding_and_strict_errors_and_surrogate_raises_unicodeencode_error(
3232 self,
3233 ):
3234 with _io.TextIOWrapper(
3235 self._sample_buffered_writer(), encoding="UTF-8", errors="strict"
3236 ) as text_io:
3237 with self.assertRaises(ValueError):
3238 text_io.write("ab\uD800++")
3239
3240 def test_write_with_bufferedwriter_utf8_encoding_and_ignore_errors_returns_length_written(
3241 self,
3242 ):
3243 with _io.TextIOWrapper(
3244 self._sample_buffered_writer(), encoding="UTF-8", errors="ignore"
3245 ) as text_io:
3246 self.assertEqual(text_io.write("ab\uD800a"), 4)
3247
3248 def test_write_with_bufferedwriter_with_utf8_encoding_and_replace_errors_returns_length_written(
3249 self,
3250 ):
3251 with _io.TextIOWrapper(
3252 self._sample_buffered_writer(), encoding="UTF-8", errors="replace"
3253 ) as text_io:
3254 self.assertEqual(text_io.write("ab\uD800"), 3)
3255
3256 # TODO(T61927696): Test write with "surrogatepass" errors when "surrogatepass" is supported by decode
3257
3258 def test_write_with_bufferedwriter_latin1_encoding_and_letters_returns_length_written(
3259 self,
3260 ):
3261 with _io.TextIOWrapper(
3262 self._sample_buffered_writer(), encoding="Latin-1"
3263 ) as text_io:
3264 self.assertEqual(text_io.write("foo"), 3)
3265
3266 def test_write_with_bufferedwriter_latin1_encoding_and_special_characters_returns_length_written(
3267 self,
3268 ):
3269 with _io.TextIOWrapper(
3270 self._sample_buffered_writer(), encoding="Latin-1"
3271 ) as text_io:
3272 self.assertEqual(text_io.write("(("), 2)
3273
3274
3275class StringIOTests(unittest.TestCase):
3276 def test_dunder_init_with_non_str_initial_value_raises_type_error(self):
3277 self.assertRaisesRegex(
3278 TypeError,
3279 "initial_value must be str or None",
3280 _io.StringIO,
3281 initial_value=b"foo",
3282 )
3283
3284 def test_dunder_init_sets_initial_value(self):
3285 with _io.StringIO(initial_value="foo") as string_io:
3286 self.assertEqual(string_io.getvalue(), "foo")
3287
3288 def test_dunder_init_with_non_str_or_none_newline_raises_value_error(self):
3289 with self.assertRaises(TypeError) as context:
3290 _io.StringIO(newline=1)
3291 self.assertRegex(str(context.exception), "newline must be str or None, not int")
3292
3293 def test_dunder_init_with_illegal_newline_raises_value_error(self):
3294 with self.assertRaises(ValueError) as context:
3295 _io.StringIO(newline="n")
3296 self.assertRegex(str(context.exception), "illegal newline value")
3297
3298 def test_dunder_repr(self):
3299 with _io.StringIO(initial_value="foo") as string_io:
3300 self.assertRegex(string_io.__repr__(), r"<_io.StringIO object at 0x\w+>")
3301
3302 def test_close_with_non_StringIO_self_raises_type_error(self):
3303 with self.assertRaises(TypeError):
3304 _io.StringIO.close(0.5)
3305
3306 def test_close_with_StringIO_subclass_sets_closed_true(self):
3307 class C(_io.StringIO):
3308 pass
3309
3310 sio = C("foo")
3311
3312 self.assertFalse(sio.closed)
3313 sio.close()
3314 self.assertTrue(sio.closed)
3315
3316 def test_detach_raises_unsupported_operation(self):
3317 with _io.StringIO() as string_io:
3318 self.assertRaisesRegex(_io.UnsupportedOperation, "detach", string_io.detach)
3319
3320 def test_encoding_always_returns_none(self):
3321 with _io.StringIO() as string_io:
3322 self.assertIsNone(string_io.encoding)
3323
3324 def test_errors_always_returns_none(self):
3325 with _io.StringIO() as string_io:
3326 self.assertIsNone(string_io.errors)
3327
3328 def test_line_buffering_with_open_returns_false(self):
3329 with _io.StringIO() as string_io:
3330 self.assertFalse(string_io.line_buffering)
3331
3332 def test_line_buffering_with_closed_returns_raises_value_error(self):
3333 string_io = _io.StringIO()
3334 string_io.close()
3335 with self.assertRaises(ValueError) as context:
3336 string_io.line_buffering
3337 self.assertRegex(str(context.exception), "I/O operation on closed file")
3338
3339 def test_getvalue_with_non_stringio_raises_type_error(self):
3340 self.assertRaisesRegex(
3341 TypeError,
3342 r"'getvalue' .* '(_io\.)?StringIO' object.* a 'int'",
3343 _io.StringIO.getvalue,
3344 1,
3345 )
3346
3347 def test_getvalue_with_open_returns_copy_of_value(self):
3348 strio = _io.StringIO("foobarbaz")
3349 first_value = strio.getvalue()
3350 strio.write("temp")
3351 second_value = strio.getvalue()
3352 self.assertEqual(first_value, "foobarbaz")
3353 self.assertEqual(second_value, "temparbaz")
3354
3355 def test_getvalue_with_closed_raises_value_error(self):
3356 string_io = _io.StringIO()
3357 string_io.close()
3358 with self.assertRaises(ValueError) as context:
3359 string_io.getvalue()
3360 self.assertRegex(str(context.exception), "I/O operation on closed file")
3361
3362 def test_getvalue_subclass_and_closed_attr_returns_value(self):
3363 class Closed(_io.StringIO):
3364 closed = True
3365
3366 closed = Closed("foobar")
3367 self.assertEqual(closed.getvalue(), "foobar")
3368
3369 def test_iter_with_open_returns_self(self):
3370 string_io = _io.StringIO()
3371 self.assertEqual(string_io, string_io.__iter__())
3372
3373 def test_iter_with_closed_raises_value_error(self):
3374 string_io = _io.StringIO()
3375 string_io.close()
3376 with self.assertRaises(ValueError) as context:
3377 string_io.__iter__()
3378 self.assertRegex(str(context.exception), "I/O operation on closed file")
3379
3380 def test_iter_subclass_and_closed_attr_raises_value_error(self):
3381 class Closed(_io.StringIO):
3382 closed = True
3383
3384 closed = Closed()
3385 with self.assertRaises(ValueError) as context:
3386 closed.__iter__()
3387 self.assertRegex(str(context.exception), "I/O operation on closed file")
3388
3389 def test_next_with_non_stringio_raises_type_error(self):
3390 self.assertRaisesRegex(
3391 TypeError,
3392 r"'__next__' .* '(_io\.)?StringIO' object.* a 'int'",
3393 _io.StringIO.__next__,
3394 1,
3395 )
3396
3397 def test_next_with_open_reads_line(self):
3398 string_io = _io.StringIO("foo\nbar")
3399 self.assertEqual(string_io.__next__(), "foo\n")
3400 self.assertEqual(string_io.__next__(), "bar")
3401 with self.assertRaises(StopIteration):
3402 string_io.__next__()
3403
3404 def test_next_with_closed_raises_value_error(self):
3405 string_io = _io.StringIO()
3406 string_io.close()
3407 with self.assertRaises(ValueError) as context:
3408 string_io.__next__()
3409 self.assertRegex(str(context.exception), "I/O operation on closed file")
3410
3411 def test_next_subclass_and_closed_attr_reads_line(self):
3412 class Closed(_io.StringIO):
3413 closed = True
3414
3415 closed = Closed("foo\nbar")
3416 self.assertEqual(closed.__next__(), "foo\n")
3417 self.assertEqual(closed.__next__(), "bar")
3418 with self.assertRaises(StopIteration):
3419 closed.__next__()
3420
3421 def test_newline_default(self):
3422 strio = _io.StringIO("a\nb\r\nc\rd")
3423 self.assertEqual(list(strio), ["a\n", "b\r\n", "c\rd"])
3424 self.assertEqual(strio.getvalue(), "a\nb\r\nc\rd")
3425
3426 strio = _io.StringIO()
3427 self.assertEqual(strio.write("a\nb\r\nc\rd"), 8)
3428 strio.seek(0)
3429 self.assertEqual(list(strio), ["a\n", "b\r\n", "c\rd"])
3430 self.assertEqual(strio.getvalue(), "a\nb\r\nc\rd")
3431
3432 def test_newline_none(self):
3433 strio = _io.StringIO("a\nb\r\nc\rd", newline=None)
3434 self.assertEqual(list(strio), ["a\n", "b\n", "c\n", "d"])
3435 strio.seek(0)
3436 self.assertEqual(strio.read(1), "a")
3437 self.assertEqual(strio.read(2), "\nb")
3438 self.assertEqual(strio.read(2), "\nc")
3439 self.assertEqual(strio.read(1), "\n")
3440 self.assertEqual(strio.getvalue(), "a\nb\nc\nd")
3441
3442 strio = _io.StringIO(newline=None)
3443 self.assertEqual(2, strio.write("a\n"))
3444 self.assertEqual(3, strio.write("b\r\n"))
3445 self.assertEqual(3, strio.write("c\rd"))
3446 strio.seek(0)
3447 self.assertEqual(strio.read(), "a\nb\nc\nd")
3448 self.assertEqual(strio.getvalue(), "a\nb\nc\nd")
3449
3450 strio = _io.StringIO("a\r\nb", newline=None)
3451 self.assertEqual(strio.read(3), "a\nb")
3452
3453 def test_newline_none_with_r_end_not_raise_exception(self):
3454 strio = _io.StringIO("a\nb\r\nc\r", newline=None)
3455 self.assertEqual(list(strio), ["a\n", "b\n", "c\n"])
3456
3457 def test_newline_empty(self):
3458 strio = _io.StringIO("a\nb\r\nc\rd", newline="")
3459 self.assertEqual(list(strio), ["a\n", "b\r\n", "c\r", "d"])
3460 strio.seek(0)
3461 self.assertEqual(strio.read(4), "a\nb\r")
3462 self.assertEqual(strio.read(2), "\nc")
3463 self.assertEqual(strio.read(1), "\r")
3464 self.assertEqual(strio.getvalue(), "a\nb\r\nc\rd")
3465
3466 strio = _io.StringIO(newline="")
3467 self.assertEqual(2, strio.write("a\n"))
3468 self.assertEqual(2, strio.write("b\r"))
3469 self.assertEqual(2, strio.write("\nc"))
3470 self.assertEqual(2, strio.write("\rd"))
3471 strio.seek(0)
3472 self.assertEqual(list(strio), ["a\n", "b\r\n", "c\r", "d"])
3473 self.assertEqual(strio.getvalue(), "a\nb\r\nc\rd")
3474
3475 def test_newline_lf(self):
3476 strio = _io.StringIO("a\nb\r\nc\rd", newline="\n")
3477 self.assertEqual(list(strio), ["a\n", "b\r\n", "c\rd"])
3478 self.assertEqual(strio.getvalue(), "a\nb\r\nc\rd")
3479
3480 strio = _io.StringIO(newline="\n")
3481 self.assertEqual(strio.write("a\nb\r\nc\rd"), 8)
3482 strio.seek(0)
3483 self.assertEqual(list(strio), ["a\n", "b\r\n", "c\rd"])
3484 self.assertEqual(strio.getvalue(), "a\nb\r\nc\rd")
3485
3486 def test_newline_cr(self):
3487 strio = _io.StringIO("a\nb\r\nc\rd", newline="\r")
3488 self.assertEqual(strio.read(), "a\rb\r\rc\rd")
3489 strio.seek(0)
3490 self.assertEqual(list(strio), ["a\r", "b\r", "\r", "c\r", "d"])
3491 self.assertEqual(strio.getvalue(), "a\rb\r\rc\rd")
3492
3493 strio = _io.StringIO(newline="\r")
3494 self.assertEqual(strio.write("a\nb\r\nc\rd"), 8)
3495 strio.seek(0)
3496 self.assertEqual(list(strio), ["a\r", "b\r", "\r", "c\r", "d"])
3497 strio.seek(0)
3498 self.assertEqual(strio.readlines(), ["a\r", "b\r", "\r", "c\r", "d"])
3499 self.assertEqual(strio.getvalue(), "a\rb\r\rc\rd")
3500
3501 def test_newline_crlf(self):
3502 strio = _io.StringIO("a\nb\r\nc\rd", newline="\r\n")
3503 self.assertEqual(strio.read(), "a\r\nb\r\r\nc\rd")
3504 strio.seek(0)
3505 self.assertEqual(list(strio), ["a\r\n", "b\r\r\n", "c\rd"])
3506 strio.seek(0)
3507 self.assertEqual(strio.readlines(), ["a\r\n", "b\r\r\n", "c\rd"])
3508 self.assertEqual(strio.getvalue(), "a\r\nb\r\r\nc\rd")
3509
3510 strio = _io.StringIO(newline="\r\n")
3511 self.assertEqual(strio.write("a\nb\r\nc\rd"), 8)
3512 strio.seek(0)
3513 self.assertEqual(list(strio), ["a\r\n", "b\r\r\n", "c\rd"])
3514 self.assertEqual(strio.getvalue(), "a\r\nb\r\r\nc\rd")
3515
3516 def test_read_with_open_returns_string(self):
3517 string_io = _io.StringIO("foo\n")
3518 self.assertEqual(string_io.read(), "foo\n")
3519
3520 def test_read_with_closed_raises_value_error(self):
3521 string_io = _io.StringIO()
3522 string_io.close()
3523 with self.assertRaises(ValueError) as context:
3524 string_io.read()
3525 self.assertRegex(str(context.exception), "I/O operation on closed file")
3526
3527 def test_read_with_subclass_and_closed_attr_returns_string(self):
3528 class Closed(_io.StringIO):
3529 closed = True
3530
3531 closed = Closed("foo\n")
3532 self.assertEqual(closed.read(), "foo\n")
3533
3534 def test_read_with_non_int_raises_type_error(self):
3535 string_io = _io.StringIO("hello world")
3536 with self.assertRaises(TypeError):
3537 string_io.read("not-int")
3538
3539 # CPython 3.6 does not do the __index__ call, whereas >= 3.7 does. Since we've
3540 # already implemented this functionality, we should test it.
3541 @pyro_only
3542 def test_read_with_intlike_calls_dunder_index(self):
3543 class IntLike:
3544 def __index__(self):
3545 return 5
3546
3547 string_io = _io.StringIO("hello world")
3548 self.assertEqual(string_io.read(IntLike()), "hello")
3549
3550 def test_read_starts_at_internal_pos(self):
3551 string_io = _io.StringIO("hello world")
3552 string_io.seek(6)
3553 self.assertEqual(string_io.read(), "world")
3554
3555 def test_read_stops_after_passed_in_number_of_bytes(self):
3556 string_io = _io.StringIO("hello world")
3557 self.assertEqual(string_io.read(5), "hello")
3558
3559 def test_read_with_negative_size_returns_remaining_string(self):
3560 string_io = _io.StringIO("hello world")
3561 self.assertEqual(string_io.read(-5), "hello world")
3562
3563 def test_read_with_overseek_returns_empty_string(self):
3564 string_io = _io.StringIO("hello world")
3565 string_io.seek(20)
3566 self.assertEqual(string_io.read(), "")
3567
3568 def test_read_translates_all_newlines_if_newline_is_none(self):
3569 string_io = _io.StringIO("foo\nbar\rbaz\r\n", None)
3570 self.assertEqual(string_io.read(), "foo\nbar\nbaz\n")
3571
3572 def test_read_doesnt_translate_if_newline_is_empty(self):
3573 string_io = _io.StringIO("foo\nbar\rbaz\r\n", "")
3574 self.assertEqual(string_io.read(), "foo\nbar\rbaz\r\n")
3575
3576 def test_read_translates_line_feeds_if_newline_is_string(self):
3577 string_io = _io.StringIO("foo\nbar\rbaz\r\n", "\r\n")
3578 self.assertEqual(string_io.read(), "foo\r\nbar\rbaz\r\r\n")
3579
3580 def test_readline_with_open_returns_string(self):
3581 string_io = _io.StringIO("foo\nbar")
3582 self.assertEqual(string_io.readline(), "foo\n")
3583
3584 def test_readline_with_closed_raises_value_error(self):
3585 string_io = _io.StringIO()
3586 string_io.close()
3587 with self.assertRaises(ValueError) as context:
3588 string_io.readline()
3589 self.assertRegex(str(context.exception), "I/O operation on closed file")
3590
3591 def test_readline_with_subclass_and_closed_attr_returns_string(self):
3592 class Closed(_io.StringIO):
3593 closed = True
3594
3595 closed = Closed("foo\nbar")
3596 self.assertEqual(closed.readline(), "foo\n")
3597
3598 def test_readline_with_non_int_raises_type_error(self):
3599 string_io = _io.StringIO("hello world")
3600 with self.assertRaises(TypeError):
3601 string_io.readline("not-int")
3602
3603 # CPython 3.6 does not do the __index__ call, whereas >= 3.7 does. Since we've
3604 # already implemented this functionality, we should test it.
3605 @pyro_only
3606 def test_readline_with_intlike_calls_dunder_index(self):
3607 class IntLike:
3608 def __index__(self):
3609 return 5
3610
3611 string_io = _io.StringIO("hello world")
3612 self.assertEqual(string_io.readline(IntLike()), "hello")
3613
3614 def test_readline_starts_at_internal_pos(self):
3615 string_io = _io.StringIO("hello world")
3616 string_io.seek(6)
3617 self.assertEqual(string_io.readline(), "world")
3618
3619 def test_readline_stops_after_passed_in_number_of_bytes(self):
3620 string_io = _io.StringIO("hello world\n")
3621 self.assertEqual(string_io.readline(5), "hello")
3622
3623 def test_readline_with_negative_size_full_line(self):
3624 string_io = _io.StringIO("hello world\n")
3625 self.assertEqual(string_io.readline(-5), "hello world\n")
3626
3627 def test_readline_with_overseek_returns_empty_string(self):
3628 string_io = _io.StringIO("hello world")
3629 string_io.seek(20)
3630 self.assertEqual(string_io.readline(), "")
3631
3632 def test_readline_translates_all_newlines_if_newline_is_none(self):
3633 string_io = _io.StringIO("foo\nbar\rbaz\r\n", None)
3634 self.assertEqual(string_io.readline(), "foo\n")
3635 self.assertEqual(string_io.readline(), "bar\n")
3636 self.assertEqual(string_io.readline(), "baz\n")
3637
3638 def test_readline_doesnt_translate_if_newline_is_empty(self):
3639 string_io = _io.StringIO("foo\nbar\rbaz\r\n", "")
3640 self.assertEqual(string_io.readline(), "foo\n")
3641 self.assertEqual(string_io.readline(), "bar\r")
3642 self.assertEqual(string_io.readline(), "baz\r\n")
3643
3644 def test_readline_translates_line_feeds_if_newline_is_string(self):
3645 string_io = _io.StringIO("foo\nbar\rbaz\r\n", "\r\n")
3646 self.assertEqual(string_io.readline(), "foo\r\n")
3647 self.assertEqual(string_io.readline(), "bar\rbaz\r\r\n")
3648
3649 def test_readable_with_open_StringIO_returns_true(self):
3650 string_io = _io.StringIO()
3651 self.assertTrue(string_io.readable())
3652
3653 def test_readable_with_closed_StringIO_raises_value_error(self):
3654 string_io = _io.StringIO()
3655 string_io.close()
3656 with self.assertRaises(ValueError) as context:
3657 string_io.readable()
3658 self.assertRegex(str(context.exception), "I/O operation on closed file")
3659
3660 def test_seek_with_open_sets_position(self):
3661 string_io = _io.StringIO("foo\n")
3662 self.assertEqual(string_io.seek(2), 2)
3663 self.assertEqual(string_io.tell(), 2)
3664
3665 def test_seek_with_closed_raises_value_error(self):
3666 string_io = _io.StringIO()
3667 string_io.close()
3668 with self.assertRaises(ValueError) as context:
3669 string_io.seek(0)
3670 self.assertRegex(str(context.exception), "I/O operation on closed file")
3671
3672 def test_seek_with_subclass_and_closed_attr_sets_position(self):
3673 class Closed(_io.StringIO):
3674 closed = True
3675
3676 closed = Closed("foo\n")
3677 self.assertEqual(closed.seek(2), 2)
3678 self.assertEqual(closed.tell(), 2)
3679
3680 def test_seek_with_non_int_cookie_raises_type_error(self):
3681 string_io = _io.StringIO("hello world")
3682 with self.assertRaises(TypeError):
3683 string_io.seek("not-int", 0)
3684
3685 def test_seek_with_intlike_cookie_calls_dunder_index(self):
3686 class IntLike:
3687 def __index__(self):
3688 return 5
3689
3690 string_io = _io.StringIO("hello world")
3691 self.assertEqual(string_io.seek(IntLike()), 5)
3692 self.assertEqual(string_io.tell(), 5)
3693
3694 def test_seek_with_non_int_whence_raises_type_error(self):
3695 string_io = _io.StringIO("hello world")
3696 with self.assertRaises(TypeError):
3697 string_io.seek(0, "not-int")
3698
3699 def test_seek_accepts_index_covertible_whence(self):
3700 class IndexLike:
3701 def __index__(self):
3702 return 0
3703
3704 string_io = _io.StringIO("hello world")
3705 self.assertEqual(string_io.seek(1, IndexLike()), 1)
3706
3707 def test_seek_can_overseek(self):
3708 string_io = _io.StringIO("foo\n")
3709 self.assertEqual(string_io.seek(10), 10)
3710 self.assertEqual(string_io.tell(), 10)
3711
3712 def test_seek_with_whence_zero_and_negative_cookie_raises_value_error(self):
3713 string_io = _io.StringIO("foo\n")
3714 with self.assertRaises(ValueError) as context:
3715 string_io.seek(-10)
3716 self.assertRegex(str(context.exception), "Negative seek position -10")
3717
3718 def test_seek_with_whence_one_and_non_zero_cookie_raises_os_error(self):
3719 string_io = _io.StringIO("foo\n")
3720 with self.assertRaises(OSError) as context:
3721 string_io.seek(1, 1)
3722 self.assertRegex(str(context.exception), "Can't do nonzero cur-relative seeks")
3723
3724 def test_seek_with_whence_one_doesnt_change_pos(self):
3725 string_io = _io.StringIO("foo\n")
3726 self.assertEqual(string_io.seek(2), 2)
3727 self.assertEqual(string_io.seek(0, 1), 2)
3728 self.assertEqual(string_io.tell(), 2)
3729
3730 def test_seek_with_whence_two_and_non_zero_cookie_raises_os_error(self):
3731 string_io = _io.StringIO("foo\n")
3732 with self.assertRaises(OSError) as context:
3733 string_io.seek(1, 1)
3734 self.assertRegex(str(context.exception), "Can't do nonzero cur-relative seeks")
3735
3736 def test_seek_with_whence_two_sets_pos_to_end(self):
3737 string_io = _io.StringIO("foo\n")
3738 self.assertEqual(string_io.seek(2), 2)
3739 self.assertEqual(string_io.seek(0, 2), 4)
3740 self.assertEqual(string_io.tell(), 4)
3741
3742 def test_seek_with_invalid_whence_raises_value_error(self):
3743 string_io = _io.StringIO("foo\n")
3744 with self.assertRaises(ValueError) as context:
3745 string_io.seek(1, 3)
3746 self.assertEqual(
3747 str(context.exception), "Invalid whence (3, should be 0, 1 or 2)"
3748 )
3749
3750 def test_seek_with_bigint_whence_raises_overflow_error(self):
3751 string_io = _io.StringIO("foo\n")
3752 with self.assertRaisesRegex(
3753 OverflowError, r"Python int too large to convert to C .*"
3754 ):
3755 string_io.seek(1, 2 ** 65)
3756
3757 def test_seekable_with_open_StringIO_returns_true(self):
3758 string_io = _io.StringIO()
3759 self.assertTrue(string_io.seekable())
3760
3761 def test_seekable_with_closed_StringIO_raises_value_error(self):
3762 string_io = _io.StringIO()
3763 string_io.close()
3764 with self.assertRaises(ValueError) as context:
3765 string_io.seekable()
3766 self.assertRegex(str(context.exception), "I/O operation on closed file")
3767
3768 def test_subclass_with_closed_attribute_is_not_closed_for_StringIO(self):
3769 class Closed(_io.StringIO):
3770 closed = True
3771
3772 c = Closed()
3773 self.assertEqual(c.write("foobar"), 6)
3774 self.assertEqual(c.seek(0), 0)
3775 with self.assertRaises(ValueError) as context:
3776 # Since readlines is inherited from `_IOBase which checks for the
3777 # closed property, this method call should raise a ValueError
3778 c.readlines()
3779 self.assertRegex(str(context.exception), "I/O operation on closed file")
3780
3781 def test_supports_arbitrary_attributes(self):
3782 string_io = _io.StringIO()
3783 string_io.here_comes_the_sun = "and I say it's alright"
3784 self.assertEqual(string_io.here_comes_the_sun, "and I say it's alright")
3785
3786 def test_tell_with_open_returns_position(self):
3787 string_io = _io.StringIO()
3788 self.assertEqual(string_io.seek(2), 2)
3789 self.assertEqual(string_io.tell(), 2)
3790
3791 def test_tell_with_closed_raises_value_error(self):
3792 string_io = _io.StringIO()
3793 string_io.close()
3794 with self.assertRaises(ValueError) as context:
3795 string_io.tell()
3796 self.assertRegex(str(context.exception), "I/O operation on closed file")
3797
3798 def test_tell_with_subclass_and_closed_attr_returns_string(self):
3799 class Closed(_io.StringIO):
3800 closed = True
3801
3802 closed = Closed("foo\nbar")
3803 self.assertEqual(closed.tell(), 0)
3804
3805 def test_truncate_with_open_truncates_from_pos(self):
3806 string_io = _io.StringIO("hello world")
3807 string_io.seek(5)
3808 self.assertEqual(string_io.truncate(), 5)
3809 self.assertEqual(string_io.getvalue(), "hello")
3810
3811 def test_truncate_with_closed_raises_value_error(self):
3812 string_io = _io.StringIO()
3813 string_io.close()
3814 with self.assertRaises(ValueError) as context:
3815 string_io.truncate()
3816 self.assertRegex(str(context.exception), "I/O operation on closed file")
3817
3818 def test_truncate_with_subclass_and_closed_attr_returns_string(self):
3819 class Closed(_io.StringIO):
3820 closed = True
3821
3822 closed = Closed("hello world")
3823 closed.seek(5)
3824 self.assertEqual(closed.truncate(), 5)
3825 self.assertEqual(closed.getvalue(), "hello")
3826
3827 def test_truncate_with_non_int_raises_type_error(self):
3828 string_io = _io.StringIO("hello world")
3829 with self.assertRaises(TypeError):
3830 string_io.truncate("not-int")
3831
3832 # CPython 3.6 does not do the __index__ call, whereas >= 3.7 does. Since we've
3833 # already implemented this functionality, we should test it.
3834 @pyro_only
3835 def test_truncate_with_intlike_calls_dunder_index(self):
3836 class IntLike:
3837 def __index__(self):
3838 return 5
3839
3840 string_io = _io.StringIO("hello world")
3841 self.assertEqual(string_io.truncate(IntLike()), 5)
3842 self.assertEqual(string_io.getvalue(), "hello")
3843
3844 def test_truncate_with_negative_int_raises_value_error(self):
3845 string_io = _io.StringIO("hello world")
3846 with self.assertRaises(ValueError) as context:
3847 string_io.truncate(-1)
3848 self.assertRegex(str(context.exception), "Negative size value -1")
3849
3850 def test_truncate_with_size_past_end_of_buffer_returns_size(self):
3851 string_io = _io.StringIO("hello")
3852 self.assertEqual(string_io.truncate(10), 10)
3853 self.assertEqual(string_io.getvalue(), "hello")
3854
3855 def test_write_with_open_writes_to_object(self):
3856 string_io = _io.StringIO()
3857 self.assertEqual(string_io.write("foobar"), 6)
3858 self.assertEqual(string_io.getvalue(), "foobar")
3859
3860 def test_write_with_closed_raises_value_error(self):
3861 strio = _io.StringIO()
3862 self.assertEqual(strio.write("foobar"), 6)
3863 strio.close()
3864 with self.assertRaises(ValueError) as context:
3865 strio.write("foo")
3866 self.assertRegex(str(context.exception), "I/O operation on closed file")
3867
3868 def test_write_with_subclass_and_closed_attr_sets_position(self):
3869 class Closed(_io.StringIO):
3870 closed = True
3871
3872 closed = Closed()
3873 self.assertEqual(closed.write("foobar"), 6)
3874 self.assertEqual(closed.getvalue(), "foobar")
3875
3876 def test_write_with_non_string_raises_type_error(self):
3877 strio = _io.StringIO()
3878 with self.assertRaises(TypeError):
3879 strio.write(1)
3880
3881 def test_write_starts_writing_at_current_position(self):
3882 string_io = _io.StringIO("hello world")
3883 string_io.seek(5)
3884 self.assertEqual(string_io.write("\n"), 1)
3885 self.assertEqual(string_io.getvalue(), "hello\nworld")
3886
3887 def test_write_with_empty_string_does_nothing(self):
3888 string_io = _io.StringIO("hello world")
3889 string_io.seek(5)
3890 self.assertEqual(string_io.write(""), 0)
3891 self.assertEqual(string_io.getvalue(), "hello world")
3892
3893 def test_write_with_larger_string_than_buffer_extends_buffer(self):
3894 string_io = _io.StringIO("hello")
3895 self.assertEqual(string_io.write("hello world"), 11)
3896 self.assertEqual(string_io.getvalue(), "hello world")
3897
3898 def test_write_with_overseek_pads_end_of_buffer_to_position_with_zeros(self):
3899 string_io = _io.StringIO("hello")
3900 string_io.seek(7)
3901 self.assertEqual(string_io.write("world"), 5)
3902 self.assertEqual(string_io.getvalue(), "hello\0\0world")
3903
3904 def test_write_with_write_translate_returns_original_size_of_string(self):
3905 string_io = _io.StringIO(newline=None)
3906 self.assertEqual(string_io.write("baz\r\n"), 5)
3907 self.assertEqual(string_io.getvalue(), "baz\n")
3908
3909 def test_write_with_newline_none_translates_newlines_to_line_feed(self):
3910 string_io = _io.StringIO(newline=None)
3911 self.assertEqual(string_io.write("foo\n"), 4)
3912 self.assertEqual(string_io.write("bar\r"), 4)
3913 self.assertEqual(string_io.write("baz\r\n"), 5)
3914 self.assertEqual(string_io.getvalue(), "foo\nbar\nbaz\n")
3915
3916 def test_write_with_newline_empty_doesnt_do_any_translations(self):
3917 string_io = _io.StringIO(newline="")
3918 self.assertEqual(string_io.write("foo\n"), 4)
3919 self.assertEqual(string_io.write("bar\r"), 4)
3920 self.assertEqual(string_io.write("baz\r\n"), 5)
3921 self.assertEqual(string_io.getvalue(), "foo\nbar\rbaz\r\n")
3922
3923 def test_write_with_string_newline_translates_line_feed_to_string(self):
3924 string_io = _io.StringIO(newline="\r\n")
3925 self.assertEqual(string_io.write("foo\n"), 4)
3926 self.assertEqual(string_io.write("bar\r"), 4)
3927 self.assertEqual(string_io.write("baz\r\n"), 5)
3928 self.assertEqual(string_io.getvalue(), "foo\r\nbar\rbaz\r\r\n")
3929
3930 def test_write_with_newline_none_stores_seen_newlines(self):
3931 string_io = _io.StringIO(newline=None)
3932 self.assertEqual(string_io.newlines, None)
3933 self.assertEqual(string_io.write("foo\n"), 4)
3934 self.assertEqual(string_io.newlines, "\n")
3935 self.assertEqual(string_io.write("bar\r"), 4)
3936 self.assertEqual(string_io.newlines, ("\r", "\n"))
3937 self.assertEqual(string_io.write("baz\r\n"), 5)
3938 self.assertEqual(string_io.newlines, ("\r", "\n", "\r\n"))
3939 self.assertEqual(string_io.getvalue(), "foo\nbar\nbaz\n")
3940
3941 def test_writable_with_open_StringIO_returns_true(self):
3942 string_io = _io.StringIO()
3943 self.assertTrue(string_io.writable())
3944
3945 def test_writable_with_closed_StringIO_raises_value_error(self):
3946 string_io = _io.StringIO()
3947 string_io.close()
3948 with self.assertRaises(ValueError) as context:
3949 string_io.writable()
3950 self.assertRegex(str(context.exception), "I/O operation on closed file")
3951
3952
# Script entry point: hand control to unittest's command-line runner only when
# this file is executed directly, not when it is imported.
if __name__ == "__main__":
    unittest.main()