diff --git a/Lib/test/test_zipfile64.py b/Lib/test/test_zipfile64.py index 7d802d59849ce1f..93664902e186fbd 100644 --- a/Lib/test/test_zipfile64.py +++ b/Lib/test/test_zipfile64.py @@ -17,6 +17,7 @@ import sys import unittest.mock as mock +from contextlib import contextmanager from tempfile import TemporaryFile from test.support import os_helper @@ -100,9 +101,23 @@ def setUp(self): # It will contain enough copies of self.data to reach about 8 GiB. self.datacount = 8*1024**3 // len(self.data) - # memory usage should not exceed 10 MiB + # Memory usage should not exceed 10 MiB during repacking. + # This empirical threshold ensures that the internal processing + # like signature scanning, compressed block end tracing, and + # data copying are properly buffered without loading the entire + # large file into memory. self.allowed_memory = 10*1024**2 + @contextmanager + def assert_memory_usage(self, threshold): + tracemalloc.start() + try: + yield + finally: + current, peak = tracemalloc.get_traced_memory() + tracemalloc.stop() + self.assertLess(peak, threshold) + def _write_large_file(self, fh): next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL for num in range(self.datacount): @@ -121,26 +136,21 @@ def test_strip_removed_large_file(self): # Try the temp file. If we do TESTFN2, then it hogs # gigabytes of disk space for the duration of the test. with TemporaryFile() as f: - tracemalloc.start() - self._test_strip_removed_large_file(f) + file = 'file.txt' + file1 = 'largefile.txt' + data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' + with zipfile.ZipFile(f, 'w') as zh: + with zh.open(file1, 'w', force_zip64=True) as fh: + self._write_large_file(fh) + zh.writestr(file, data) + + with self.assert_memory_usage(self.allowed_memory), \ + zipfile.ZipFile(f, 'a') as zh: + zh.remove(file1) + zh.repack() + self.assertIsNone(zh.testzip()) + self.assertFalse(f.closed) - current, peak = tracemalloc.get_traced_memory() - tracemalloc.stop() - self.assertLess(peak, self.allowed_memory) - - def _test_strip_removed_large_file(self, f): - file = 'file.txt' - file1 = 'largefile.txt' - data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' - with zipfile.ZipFile(f, 'w') as zh: - with zh.open(file1, 'w', force_zip64=True) as fh: - self._write_large_file(fh) - zh.writestr(file, data) - - with zipfile.ZipFile(f, 'a') as zh: - zh.remove(file1) - zh.repack() - self.assertIsNone(zh.testzip()) def test_strip_removed_file_before_large_file(self): """Should move the physical data of a large file positioned after a @@ -148,26 +158,21 @@ def test_strip_removed_file_before_large_file(self): # Try the temp file. If we do TESTFN2, then it hogs # gigabytes of disk space for the duration of the test. with TemporaryFile() as f: - tracemalloc.start() - self._test_strip_removed_file_before_large_file(f) + file = 'file.txt' + file1 = 'largefile.txt' + data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' + with zipfile.ZipFile(f, 'w') as zh: + zh.writestr(file, data) + with zh.open(file1, 'w', force_zip64=True) as fh: + self._write_large_file(fh) + + with self.assert_memory_usage(self.allowed_memory), \ + zipfile.ZipFile(f, 'a') as zh: + zh.remove(file) + zh.repack() + self.assertIsNone(zh.testzip()) + self.assertFalse(f.closed) - current, peak = tracemalloc.get_traced_memory() - tracemalloc.stop() - self.assertLess(peak, self.allowed_memory) - - def _test_strip_removed_file_before_large_file(self, f): - file = 'file.txt' - file1 = 'largefile.txt' - data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' - with zipfile.ZipFile(f, 'w') as zh: - zh.writestr(file, data) - with zh.open(file1, 'w', force_zip64=True) as fh: - self._write_large_file(fh) - - with zipfile.ZipFile(f, 'a') as zh: - zh.remove(file) - zh.repack() - self.assertIsNone(zh.testzip()) def test_strip_removed_large_file_with_dd(self): """Should scan for the data descriptor of a removed large file without @@ -175,60 +180,49 @@ def test_strip_removed_large_file_with_dd(self): # Try the temp file. If we do TESTFN2, then it hogs # gigabytes of disk space for the duration of the test. with TemporaryFile() as f: - tracemalloc.start() - self._test_strip_removed_large_file_with_dd(f) + file = 'file.txt' + file1 = 'largefile.txt' + data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' + with zipfile.ZipFile(Unseekable(f), 'w') as zh: + with zh.open(file1, 'w', force_zip64=True) as fh: + self._write_large_file(fh) + zh.writestr(file, data) + + with self.assert_memory_usage(self.allowed_memory), \ + zipfile.ZipFile(f, 'a') as zh: + zh.remove(file1) + zh.repack() + self.assertIsNone(zh.testzip()) + self.assertFalse(f.closed) - current, peak = tracemalloc.get_traced_memory() - tracemalloc.stop() - self.assertLess(peak, self.allowed_memory) - - def _test_strip_removed_large_file_with_dd(self, f): - file = 'file.txt' - file1 = 'largefile.txt' - data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' - with zipfile.ZipFile(Unseekable(f), 'w') as zh: - with zh.open(file1, 'w', force_zip64=True) as fh: - self._write_large_file(fh) - zh.writestr(file, data) - - with zipfile.ZipFile(f, 'a') as zh: - zh.remove(file1) - zh.repack() - self.assertIsNone(zh.testzip()) def test_strip_removed_large_file_with_dd_no_sig(self): """Should scan for the data descriptor (without signature) of a removed large file without causing a memory issue.""" # Reduce data scale for this test, as it's especially slow... self.datacount = 30*1024**2 // len(self.data) - self.allowed_memory = 200*1024 # Try the temp file. If we do TESTFN2, then it hogs # gigabytes of disk space for the duration of the test. with TemporaryFile() as f: - tracemalloc.start() - self._test_strip_removed_large_file_with_dd_no_sig(f) - self.assertFalse(f.closed) - current, peak = tracemalloc.get_traced_memory() - tracemalloc.stop() - self.assertLess(peak, self.allowed_memory) - - def _test_strip_removed_large_file_with_dd_no_sig(self, f): - file = 'file.txt' - file1 = 'largefile.txt' - data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' - with mock.patch('zipfile.struct.pack', side_effect=struct_pack_no_dd_sig): - with zipfile.ZipFile(Unseekable(f), 'w') as zh: - with zh.open(file1, 'w', force_zip64=True) as fh: - self._write_large_file(fh) - zh.writestr(file, data) + file = 'file.txt' + file1 = 'largefile.txt' + data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' + with mock.patch('zipfile.struct.pack', side_effect=struct_pack_no_dd_sig): + with zipfile.ZipFile(Unseekable(f), 'w') as zh: + with zh.open(file1, 'w', force_zip64=True) as fh: + self._write_large_file(fh) + zh.writestr(file, data) + + with self.assert_memory_usage(self.allowed_memory), \ + zipfile.ZipFile(f, 'a') as zh: + zh.remove(file1) + # strict_descriptor=False to scan the unsigned data descriptor + # (scanning is disabled under the strict_descriptor=True default) + zh.repack(strict_descriptor=False) + self.assertIsNone(zh.testzip()) - with zipfile.ZipFile(f, 'a') as zh: - zh.remove(file1) - # strict_descriptor=False to scan the unsigned data descriptor - # (scanning is disabled under the strict_descriptor=True default) - zh.repack(strict_descriptor=False) - self.assertIsNone(zh.testzip()) + self.assertFalse(f.closed) @requires_zlib() def test_strip_removed_large_file_with_dd_no_sig_by_decompression(self): @@ -237,30 +231,25 @@ def test_strip_removed_large_file_with_dd_no_sig_by_decompression(self): # Try the temp file. If we do TESTFN2, then it hogs # gigabytes of disk space for the duration of the test. with TemporaryFile() as f: - tracemalloc.start() - self._test_strip_removed_large_file_with_dd_no_sig_by_decompression( - f, zipfile.ZIP_DEFLATED) - self.assertFalse(f.closed) - current, peak = tracemalloc.get_traced_memory() - tracemalloc.stop() - self.assertLess(peak, self.allowed_memory) - - def _test_strip_removed_large_file_with_dd_no_sig_by_decompression(self, f, method): - file = 'file.txt' - file1 = 'largefile.txt' - data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' - with mock.patch('zipfile.struct.pack', side_effect=struct_pack_no_dd_sig): - with zipfile.ZipFile(Unseekable(f), 'w', compression=method) as zh: - with zh.open(file1, 'w', force_zip64=True) as fh: - self._write_large_file(fh) - zh.writestr(file, data) + file = 'file.txt' + file1 = 'largefile.txt' + data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' + with mock.patch('zipfile.struct.pack', side_effect=struct_pack_no_dd_sig): + with zipfile.ZipFile(Unseekable(f), 'w', + compression=zipfile.ZIP_DEFLATED) as zh: + with zh.open(file1, 'w', force_zip64=True) as fh: + self._write_large_file(fh) + zh.writestr(file, data) + + with self.assert_memory_usage(self.allowed_memory), \ + zipfile.ZipFile(f, 'a') as zh: + zh.remove(file1) + # strict_descriptor=False to detect the unsigned data descriptor + # (scanning is disabled under the strict_descriptor=True default) + zh.repack(strict_descriptor=False) + self.assertIsNone(zh.testzip()) - with zipfile.ZipFile(f, 'a') as zh: - zh.remove(file1) - # strict_descriptor=False to detect the unsigned data descriptor - # (scanning is disabled under the strict_descriptor=True default) - zh.repack(strict_descriptor=False) - self.assertIsNone(zh.testzip()) + self.assertFalse(f.closed) class OtherTests(unittest.TestCase):