1717import sys
1818import unittest .mock as mock
1919
20+ from contextlib import contextmanager
2021from tempfile import TemporaryFile
2122
2223from test .support import os_helper
@@ -107,6 +108,16 @@ def setUp(self):
107108 # large file into memory.
108109 self .allowed_memory = 10 * 1024 ** 2
109110
111+ @contextmanager
112+ def assert_memory_usage (self , threshold ):
113+ tracemalloc .start ()
114+ try :
115+ yield
116+ finally :
117+ current , peak = tracemalloc .get_traced_memory ()
118+ tracemalloc .stop ()
119+ self .assertLess (peak , threshold )
120+
110121 def _write_large_file (self , fh ):
111122 next_time = time .monotonic () + _PRINT_WORKING_MSG_INTERVAL
112123 for num in range (self .datacount ):
@@ -125,80 +136,65 @@ def test_strip_removed_large_file(self):
125136 # Try the temp file. If we do TESTFN2, then it hogs
126137 # gigabytes of disk space for the duration of the test.
127138 with TemporaryFile () as f :
128- tracemalloc .start ()
129- self ._test_strip_removed_large_file (f )
139+ file = 'file.txt'
140+ file1 = 'largefile.txt'
141+ data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
142+ with zipfile .ZipFile (f , 'w' ) as zh :
143+ with zh .open (file1 , 'w' , force_zip64 = True ) as fh :
144+ self ._write_large_file (fh )
145+ zh .writestr (file , data )
146+
147+ with self .assert_memory_usage (self .allowed_memory ), \
148+ zipfile .ZipFile (f , 'a' ) as zh :
149+ zh .remove (file1 )
150+ zh .repack ()
151+ self .assertIsNone (zh .testzip ())
152+
130153 self .assertFalse (f .closed )
131- current , peak = tracemalloc .get_traced_memory ()
132- tracemalloc .stop ()
133- self .assertLess (peak , self .allowed_memory )
134-
135- def _test_strip_removed_large_file (self , f ):
136- file = 'file.txt'
137- file1 = 'largefile.txt'
138- data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
139- with zipfile .ZipFile (f , 'w' ) as zh :
140- with zh .open (file1 , 'w' , force_zip64 = True ) as fh :
141- self ._write_large_file (fh )
142- zh .writestr (file , data )
143-
144- with zipfile .ZipFile (f , 'a' ) as zh :
145- zh .remove (file1 )
146- zh .repack ()
147- self .assertIsNone (zh .testzip ())
148154
149155 def test_strip_removed_file_before_large_file (self ):
150156 """Should move the physical data of a large file positioned after a
151157 removed file without causing a memory issue."""
152158 # Try the temp file. If we do TESTFN2, then it hogs
153159 # gigabytes of disk space for the duration of the test.
154160 with TemporaryFile () as f :
155- tracemalloc .start ()
156- self ._test_strip_removed_file_before_large_file (f )
161+ file = 'file.txt'
162+ file1 = 'largefile.txt'
163+ data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
164+ with zipfile .ZipFile (f , 'w' ) as zh :
165+ zh .writestr (file , data )
166+ with zh .open (file1 , 'w' , force_zip64 = True ) as fh :
167+ self ._write_large_file (fh )
168+
169+ with self .assert_memory_usage (self .allowed_memory ), \
170+ zipfile .ZipFile (f , 'a' ) as zh :
171+ zh .remove (file )
172+ zh .repack ()
173+ self .assertIsNone (zh .testzip ())
174+
157175 self .assertFalse (f .closed )
158- current , peak = tracemalloc .get_traced_memory ()
159- tracemalloc .stop ()
160- self .assertLess (peak , self .allowed_memory )
161-
162- def _test_strip_removed_file_before_large_file (self , f ):
163- file = 'file.txt'
164- file1 = 'largefile.txt'
165- data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
166- with zipfile .ZipFile (f , 'w' ) as zh :
167- zh .writestr (file , data )
168- with zh .open (file1 , 'w' , force_zip64 = True ) as fh :
169- self ._write_large_file (fh )
170-
171- with zipfile .ZipFile (f , 'a' ) as zh :
172- zh .remove (file )
173- zh .repack ()
174- self .assertIsNone (zh .testzip ())
175176
176177 def test_strip_removed_large_file_with_dd (self ):
177178 """Should scan for the data descriptor of a removed large file without
178179 causing a memory issue."""
179180 # Try the temp file. If we do TESTFN2, then it hogs
180181 # gigabytes of disk space for the duration of the test.
181182 with TemporaryFile () as f :
182- tracemalloc .start ()
183- self ._test_strip_removed_large_file_with_dd (f )
183+ file = 'file.txt'
184+ file1 = 'largefile.txt'
185+ data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
186+ with zipfile .ZipFile (Unseekable (f ), 'w' ) as zh :
187+ with zh .open (file1 , 'w' , force_zip64 = True ) as fh :
188+ self ._write_large_file (fh )
189+ zh .writestr (file , data )
190+
191+ with self .assert_memory_usage (self .allowed_memory ), \
192+ zipfile .ZipFile (f , 'a' ) as zh :
193+ zh .remove (file1 )
194+ zh .repack ()
195+ self .assertIsNone (zh .testzip ())
196+
184197 self .assertFalse (f .closed )
185- current , peak = tracemalloc .get_traced_memory ()
186- tracemalloc .stop ()
187- self .assertLess (peak , self .allowed_memory )
188-
189- def _test_strip_removed_large_file_with_dd (self , f ):
190- file = 'file.txt'
191- file1 = 'largefile.txt'
192- data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
193- with zipfile .ZipFile (Unseekable (f ), 'w' ) as zh :
194- with zh .open (file1 , 'w' , force_zip64 = True ) as fh :
195- self ._write_large_file (fh )
196- zh .writestr (file , data )
197-
198- with zipfile .ZipFile (f , 'a' ) as zh :
199- zh .remove (file1 )
200- zh .repack ()
201- self .assertIsNone (zh .testzip ())
202198
203199 def test_strip_removed_large_file_with_dd_no_sig (self ):
204200 """Should scan for the data descriptor (without signature) of a removed
@@ -209,29 +205,24 @@ def test_strip_removed_large_file_with_dd_no_sig(self):
209205 # Try the temp file. If we do TESTFN2, then it hogs
210206 # gigabytes of disk space for the duration of the test.
211207 with TemporaryFile () as f :
212- tracemalloc . start ()
213- self . _test_strip_removed_large_file_with_dd_no_sig ( f )
214- self . assertFalse ( f . closed )
215- current , peak = tracemalloc . get_traced_memory ()
216- tracemalloc . stop ()
217- self . assertLess ( peak , self . allowed_memory )
218-
219- def _test_strip_removed_large_file_with_dd_no_sig ( self , f ):
220- file = 'file.txt'
221- file1 = 'largefile.txt'
222- data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
223- with mock . patch ( 'zipfile.struct.pack' , side_effect = struct_pack_no_dd_sig ):
224- with zipfile . ZipFile ( Unseekable ( f ), 'w' ) as zh :
225- with zh . open ( file1 , 'w' , force_zip64 = True ) as fh :
226- self . _write_large_file ( fh )
227- zh .writestr ( file , data )
208+ file = 'file.txt'
209+ file1 = 'largefile.txt'
210+ data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
211+ with mock . patch ( 'zipfile.struct.pack' , side_effect = struct_pack_no_dd_sig ):
212+ with zipfile . ZipFile ( Unseekable ( f ), 'w' ) as zh :
213+ with zh . open ( file1 , 'w' , force_zip64 = True ) as fh :
214+ self . _write_large_file ( fh )
215+ zh . writestr ( file , data )
216+
217+ with self . assert_memory_usage ( self . allowed_memory ), \
218+ zipfile . ZipFile ( f , 'a' ) as zh :
219+ zh . remove ( file1 )
220+ # strict_descriptor=False to scan the unsigned data descriptor
221+ # (scanning is disabled under the strict_descriptor =True default)
222+ zh . repack ( strict_descriptor = False )
223+ self . assertIsNone ( zh .testzip () )
228224
229- with zipfile .ZipFile (f , 'a' ) as zh :
230- zh .remove (file1 )
231- # strict_descriptor=False to scan the unsigned data descriptor
232- # (scanning is disabled under the strict_descriptor=True default)
233- zh .repack (strict_descriptor = False )
234- self .assertIsNone (zh .testzip ())
225+ self .assertFalse (f .closed )
235226
236227 @requires_zlib ()
237228 def test_strip_removed_large_file_with_dd_no_sig_by_decompression (self ):
@@ -240,30 +231,25 @@ def test_strip_removed_large_file_with_dd_no_sig_by_decompression(self):
240231 # Try the temp file. If we do TESTFN2, then it hogs
241232 # gigabytes of disk space for the duration of the test.
242233 with TemporaryFile () as f :
243- tracemalloc . start ()
244- self . _test_strip_removed_large_file_with_dd_no_sig_by_decompression (
245- f , zipfile . ZIP_DEFLATED )
246- self . assertFalse ( f . closed )
247- current , peak = tracemalloc . get_traced_memory ()
248- tracemalloc . stop ()
249- self . assertLess ( peak , self . allowed_memory )
250-
251- def _test_strip_removed_large_file_with_dd_no_sig_by_decompression ( self , f , method ):
252- file = 'file.txt'
253- file1 = 'largefile.txt'
254- data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
255- with mock . patch ( 'zipfile.struct.pack' , side_effect = struct_pack_no_dd_sig ):
256- with zipfile . ZipFile ( Unseekable ( f ), 'w' , compression = method ) as zh :
257- with zh . open ( file1 , 'w' , force_zip64 = True ) as fh :
258- self . _write_large_file ( fh )
259- zh .writestr ( file , data )
234+ file = 'file.txt'
235+ file1 = 'largefile.txt'
236+ data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
237+ with mock . patch ( 'zipfile.struct.pack' , side_effect = struct_pack_no_dd_sig ):
238+ with zipfile . ZipFile ( Unseekable ( f ), 'w' ,
239+ compression = zipfile . ZIP_DEFLATED ) as zh :
240+ with zh . open ( file1 , 'w' , force_zip64 = True ) as fh :
241+ self . _write_large_file ( fh )
242+ zh . writestr ( file , data )
243+
244+ with self . assert_memory_usage ( self . allowed_memory ), \
245+ zipfile . ZipFile ( f , 'a' ) as zh :
246+ zh . remove ( file1 )
247+ # strict_descriptor=False to detect the unsigned data descriptor
248+ # (scanning is disabled under the strict_descriptor =True default)
249+ zh . repack ( strict_descriptor = False )
250+ self . assertIsNone ( zh .testzip () )
260251
261- with zipfile .ZipFile (f , 'a' ) as zh :
262- zh .remove (file1 )
263- # strict_descriptor=False to detect the unsigned data descriptor
264- # (scanning is disabled under the strict_descriptor=True default)
265- zh .repack (strict_descriptor = False )
266- self .assertIsNone (zh .testzip ())
252+ self .assertFalse (f .closed )
267253
268254
269255class OtherTests (unittest .TestCase ):
0 commit comments