1
1
import os
2
+ import sys
2
3
import threading
3
4
import time
4
5
import traceback
@@ -84,11 +85,13 @@ def run(self):
84
85
def run_test (self , test_index ):
85
86
test = self .run_instance .tests [test_index ]
86
87
try :
87
- self .run_instance .execute_test (test )
88
+ execute_test (test , self .run_instance .lit_config ,
89
+ self .run_instance .parallelism_semaphores )
88
90
except KeyboardInterrupt :
89
91
# This is a sad hack. Unfortunately subprocess goes
90
92
# bonkers with ctrl-c and we start forking merrily.
91
93
print ('\n Ctrl-C detected, goodbye.' )
94
+ sys .stdout .flush ()
92
95
os .kill (0 ,9 )
93
96
self .consumer .update (test_index , test )
94
97
@@ -167,6 +170,44 @@ def update(self, test):
167
170
def handleFailures (provider , consumer , maxFailures ):
168
171
consumer .display = _Display (consumer .display , provider , maxFailures )
169
172
173
+ def execute_test (test , lit_config , parallelism_semaphores ):
174
+ """Execute one test"""
175
+ pg = test .config .parallelism_group
176
+ if callable (pg ):
177
+ pg = pg (test )
178
+
179
+ result = None
180
+ semaphore = None
181
+ try :
182
+ if pg :
183
+ semaphore = parallelism_semaphores [pg ]
184
+ if semaphore :
185
+ semaphore .acquire ()
186
+ start_time = time .time ()
187
+ result = test .config .test_format .execute (test , lit_config )
188
+ # Support deprecated result from execute() which returned the result
189
+ # code and additional output as a tuple.
190
+ if isinstance (result , tuple ):
191
+ code , output = result
192
+ result = lit .Test .Result (code , output )
193
+ elif not isinstance (result , lit .Test .Result ):
194
+ raise ValueError ("unexpected result from test execution" )
195
+ result .elapsed = time .time () - start_time
196
+ except KeyboardInterrupt :
197
+ raise
198
+ except :
199
+ if lit_config .debug :
200
+ raise
201
+ output = 'Exception during script execution:\n '
202
+ output += traceback .format_exc ()
203
+ output += '\n '
204
+ result = lit .Test .Result (lit .Test .UNRESOLVED , output )
205
+ finally :
206
+ if semaphore :
207
+ semaphore .release ()
208
+
209
+ test .setResult (result )
210
+
170
211
class Run (object ):
171
212
"""
172
213
This class represents a concrete, configured testing run.
@@ -177,42 +218,10 @@ def __init__(self, lit_config, tests):
177
218
self .tests = tests
178
219
179
220
def execute_test (self , test ):
180
- pg = test .config .parallelism_group
181
- if callable (pg ): pg = pg (test )
182
-
183
- result = None
184
- semaphore = None
185
- try :
186
- if pg : semaphore = self .parallelism_semaphores [pg ]
187
- if semaphore : semaphore .acquire ()
188
- start_time = time .time ()
189
- result = test .config .test_format .execute (test , self .lit_config )
190
-
191
- # Support deprecated result from execute() which returned the result
192
- # code and additional output as a tuple.
193
- if isinstance (result , tuple ):
194
- code , output = result
195
- result = lit .Test .Result (code , output )
196
- elif not isinstance (result , lit .Test .Result ):
197
- raise ValueError ("unexpected result from test execution" )
198
-
199
- result .elapsed = time .time () - start_time
200
- except KeyboardInterrupt :
201
- raise
202
- except :
203
- if self .lit_config .debug :
204
- raise
205
- output = 'Exception during script execution:\n '
206
- output += traceback .format_exc ()
207
- output += '\n '
208
- result = lit .Test .Result (lit .Test .UNRESOLVED , output )
209
- finally :
210
- if semaphore : semaphore .release ()
211
-
212
- test .setResult (result )
221
+ return execute_test (test , self .lit_config , self .parallelism_semaphores )
213
222
214
223
def execute_tests (self , display , jobs , max_time = None ,
215
- use_processes = False ):
224
+ execution_strategy = None ):
216
225
"""
217
226
execute_tests(display, jobs, [max_time])
218
227
@@ -234,6 +243,14 @@ def execute_tests(self, display, jobs, max_time=None,
234
243
be given an UNRESOLVED result.
235
244
"""
236
245
246
+ if execution_strategy == 'PROCESS_POOL' :
247
+ self .execute_tests_with_mp_pool (display , jobs , max_time )
248
+ return
249
+ # FIXME: Standardize on the PROCESS_POOL execution strategy and remove
250
+ # the other two strategies.
251
+
252
+ use_processes = execution_strategy == 'PROCESSES'
253
+
237
254
# Choose the appropriate parallel execution implementation.
238
255
consumer = None
239
256
if jobs != 1 and use_processes and multiprocessing :
@@ -263,8 +280,8 @@ def execute_tests(self, display, jobs, max_time=None,
263
280
provider = TestProvider (queue_impl , canceled_flag )
264
281
handleFailures (provider , consumer , self .lit_config .maxFailures )
265
282
266
- # Queue the tests outside the main thread because we can't guarantee
267
- # that we can put() all the tests without blocking:
283
+ # Putting tasks into the threading or multiprocessing Queue may block,
284
+ # so do it in a separate thread.
268
285
# https://docs.python.org/2/library/multiprocessing.html
269
286
# e.g: On Mac OS X, we will hang if we put 2^15 elements in the queue
270
287
# without taking any out.
@@ -317,3 +334,111 @@ def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs):
317
334
# Wait for all the tasks to complete.
318
335
for t in tasks :
319
336
t .join ()
337
+
338
+ def execute_tests_with_mp_pool (self , display , jobs , max_time = None ):
339
+ # Don't do anything if we aren't going to run any tests.
340
+ if not self .tests or jobs == 0 :
341
+ return
342
+
343
+ # Set up semaphores to limit parallelism of certain classes of tests.
344
+ # For example, some ASan tests require lots of virtual memory and run
345
+ # faster with less parallelism on OS X.
346
+ self .parallelism_semaphores = \
347
+ {k : multiprocessing .Semaphore (v ) for k , v in
348
+ self .lit_config .parallelism_groups .items ()}
349
+
350
+ # Save the display object on the runner so that we can update it from
351
+ # our task completion callback.
352
+ self .display = display
353
+
354
+ # Start a process pool. Copy over the data shared between all test runs.
355
+ pool = multiprocessing .Pool (jobs , worker_initializer ,
356
+ (self .lit_config ,
357
+ self .parallelism_semaphores ))
358
+
359
+ # Install a console-control signal handler on Windows.
360
+ if win32api is not None :
361
+ def console_ctrl_handler (type ):
362
+ print "Ctr-C received, terminating"
363
+ pool .terminate ()
364
+ pool .join ()
365
+ os .kill (0 ,9 )
366
+ return True
367
+ win32api .SetConsoleCtrlHandler (console_ctrl_handler , True )
368
+
369
+ # FIXME: Implement max_time using .wait() timeout argument and a
370
+ # deadline.
371
+
372
+ try :
373
+ async_results = [pool .apply_async (worker_run_one_test ,
374
+ args = (test_index , test ),
375
+ callback = self .consume_test_result )
376
+ for test_index , test in enumerate (self .tests )]
377
+
378
+ # Wait for all results to come in. The callback that runs in the
379
+ # parent process will update the display.
380
+ for a in async_results :
381
+ a .wait ()
382
+ if not a .successful ():
383
+ a .get () # Exceptions raised here come from the worker.
384
+ finally :
385
+ pool .terminate ()
386
+ pool .join ()
387
+
388
+ # Mark any tests that weren't run as UNRESOLVED.
389
+ for test in self .tests :
390
+ if test .result is None :
391
+ test .setResult (lit .Test .Result (lit .Test .UNRESOLVED , '' , 0.0 ))
392
+
393
+ def consume_test_result (self , pool_result ):
394
+ """Test completion callback for worker_run_one_test
395
+
396
+ Updates the test result status in the parent process. Each task in the
397
+ pool returns the test index and the result, and we use the index to look
398
+ up the original test object. Also updates the progress bar as tasks
399
+ complete.
400
+ """
401
+ (test_index , test_with_result ) = pool_result
402
+ # Update the parent process copy of the test. This includes the result,
403
+ # XFAILS, REQUIRES, and UNSUPPORTED statuses.
404
+ assert self .tests [test_index ].file_path == test_with_result .file_path , \
405
+ "parent and child disagree on test path"
406
+ self .tests [test_index ] = test_with_result
407
+ self .display .update (test_with_result )
408
+
409
+ child_lit_config = None
410
+ child_parallelism_semaphores = None
411
+
412
+ def worker_initializer (lit_config , parallelism_semaphores ):
413
+ """Copy expensive repeated data into worker processes"""
414
+ global child_lit_config
415
+ child_lit_config = lit_config
416
+ global child_parallelism_semaphores
417
+ child_parallelism_semaphores = parallelism_semaphores
418
+
419
+ def worker_run_one_test (test_index , test ):
420
+ """Run one test in a multiprocessing.Pool
421
+
422
+ Side effects in this function and functions it calls are not visible in the
423
+ main lit process.
424
+
425
+ Arguments and results of this function are pickled, so they should be cheap
426
+ to copy. For efficiency, we copy all data needed to execute all tests into
427
+ each worker and store it in the child_* global variables. This reduces the
428
+ cost of each task.
429
+
430
+ Returns an index and a Result, which the parent process uses to update
431
+ the display.
432
+ """
433
+ try :
434
+ execute_test (test , child_lit_config , child_parallelism_semaphores )
435
+ return (test_index , test )
436
+ except KeyboardInterrupt as e :
437
+ # This is a sad hack. Unfortunately subprocess goes
438
+ # bonkers with ctrl-c and we start forking merrily.
439
+ print ('\n Ctrl-C detected, goodbye.' )
440
+ traceback .print_exc ()
441
+ sys .stdout .flush ()
442
+ os .kill (0 ,9 )
443
+ except :
444
+ traceback .print_exc ()
0 commit comments