Skip to content

Commit 1b3c146

Browse files
committed Apr 5, 2017
[lit] Use process pools for test execution by default
Summary:
This drastically reduces lit test execution startup time on Windows. Our
previous strategy was to manually create one Process per job and manage
the worker pool ourselves. Instead, let's use the worker pool provided
by multiprocessing. multiprocessing.Pool(jobs) returns almost
immediately, and initializes the appropriate number of workers, so they
can all start executing tests immediately. This avoids the ramp-up
period that the old implementation suffers from. This appears to speed
up small test runs.

Here are some timings of the llvm-readobj tests on Windows using the
various execution strategies:

# multiprocessing.Pool:
$ for i in `seq 1 3`; do tim python ./bin/llvm-lit.py -sv ../llvm/test/tools/llvm-readobj/ --use-process-pool |& grep real: ; done
real: 0m1.156s
real: 0m1.078s
real: 0m1.094s

# multiprocessing.Process:
$ for i in `seq 1 3`; do tim python ./bin/llvm-lit.py -sv ../llvm/test/tools/llvm-readobj/ --use-processes |& grep real: ; done
real: 0m6.062s
real: 0m5.860s
real: 0m5.984s

# threading.Thread:
$ for i in `seq 1 3`; do tim python ./bin/llvm-lit.py -sv ../llvm/test/tools/llvm-readobj/ --use-threads |& grep real: ; done
real: 0m9.438s
real: 0m10.765s
real: 0m11.079s

I kept the old code to launch processes in case this change doesn't work
on all platforms that LLVM supports, but at some point I would like to
remove both the threading and old multiprocessing execution strategies.

Reviewers: modocache, rafael

Subscribers: llvm-commits

Differential Revision: https://reviews.llvm.org/D31677

llvm-svn: 299560
1 parent 42e0bb5 commit 1b3c146

File tree

2 files changed

+173
-42
lines changed

2 files changed

+173
-42
lines changed
 

‎llvm/utils/lit/lit/main.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -278,12 +278,15 @@ def main_with_tmp(builtinParameters):
278278
debug_group.add_argument("--show-tests", dest="showTests",
279279
help="Show all discovered tests",
280280
action="store_true", default=False)
281-
debug_group.add_argument("--use-processes", dest="useProcesses",
281+
debug_group.add_argument("--use-process-pool", dest="executionStrategy",
282+
help="Run tests in parallel with a process pool",
283+
action="store_const", const="PROCESS_POOL")
284+
debug_group.add_argument("--use-processes", dest="executionStrategy",
282285
help="Run tests in parallel with processes (not threads)",
283-
action="store_true", default=True)
284-
debug_group.add_argument("--use-threads", dest="useProcesses",
286+
action="store_const", const="PROCESSES")
287+
debug_group.add_argument("--use-threads", dest="executionStrategy",
285288
help="Run tests in parallel with threads (not processes)",
286-
action="store_false", default=True)
289+
action="store_const", const="THREADS")
287290

288291
opts = parser.parse_args()
289292
args = opts.test_paths
@@ -298,6 +301,9 @@ def main_with_tmp(builtinParameters):
298301
if opts.numThreads is None:
299302
opts.numThreads = lit.util.detectCPUs()
300303

304+
if opts.executionStrategy is None:
305+
opts.executionStrategy = 'PROCESS_POOL'
306+
301307
if opts.maxFailures == 0:
302308
parser.error("Setting --max-failures to 0 does not have any effect.")
303309

@@ -481,7 +487,7 @@ def main_with_tmp(builtinParameters):
481487
display = TestingProgressDisplay(opts, len(run.tests), progressBar)
482488
try:
483489
run.execute_tests(display, opts.numThreads, opts.maxTime,
484-
opts.useProcesses)
490+
opts.executionStrategy)
485491
except KeyboardInterrupt:
486492
sys.exit(2)
487493
display.finish()

‎llvm/utils/lit/lit/run.py

Lines changed: 162 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
import sys
23
import threading
34
import time
45
import traceback
@@ -84,11 +85,13 @@ def run(self):
8485
def run_test(self, test_index):
8586
test = self.run_instance.tests[test_index]
8687
try:
87-
self.run_instance.execute_test(test)
88+
execute_test(test, self.run_instance.lit_config,
89+
self.run_instance.parallelism_semaphores)
8890
except KeyboardInterrupt:
8991
# This is a sad hack. Unfortunately subprocess goes
9092
# bonkers with ctrl-c and we start forking merrily.
9193
print('\nCtrl-C detected, goodbye.')
94+
sys.stdout.flush()
9295
os.kill(0,9)
9396
self.consumer.update(test_index, test)
9497

@@ -167,6 +170,44 @@ def update(self, test):
167170
def handleFailures(provider, consumer, maxFailures):
168171
consumer.display = _Display(consumer.display, provider, maxFailures)
169172

173+
def execute_test(test, lit_config, parallelism_semaphores):
    """Run a single test and store the outcome on the test object.

    The test's parallelism group (either a value or a callable taking the
    test) is used to look up a semaphore in ``parallelism_semaphores``; when
    one is found it is held while the test format executes the test.

    Any exception other than KeyboardInterrupt is converted into an
    UNRESOLVED result carrying the traceback text, unless
    ``lit_config.debug`` is set, in which case the exception propagates.
    KeyboardInterrupt always propagates.  The semaphore, if acquired, is
    released on every path.
    """
    group = test.config.parallelism_group
    if callable(group):
        group = group(test)

    result = None
    semaphore = None
    try:
        if group:
            semaphore = parallelism_semaphores[group]
        if semaphore:
            semaphore.acquire()

        began = time.time()
        result = test.config.test_format.execute(test, lit_config)

        # Deprecated protocol: execute() may still hand back a
        # (code, output) pair instead of a Result object.
        if isinstance(result, tuple):
            status, text = result
            result = lit.Test.Result(status, text)
        elif not isinstance(result, lit.Test.Result):
            raise ValueError("unexpected result from test execution")

        result.elapsed = time.time() - began
    except KeyboardInterrupt:
        raise
    except:
        if lit_config.debug:
            raise
        message = ('Exception during script execution:\n' +
                   traceback.format_exc() +
                   '\n')
        result = lit.Test.Result(lit.Test.UNRESOLVED, message)
    finally:
        if semaphore:
            semaphore.release()

    test.setResult(result)
210+
170211
class Run(object):
171212
"""
172213
This class represents a concrete, configured testing run.
@@ -177,42 +218,10 @@ def __init__(self, lit_config, tests):
177218
self.tests = tests
178219

179220
def execute_test(self, test):
180-
pg = test.config.parallelism_group
181-
if callable(pg): pg = pg(test)
182-
183-
result = None
184-
semaphore = None
185-
try:
186-
if pg: semaphore = self.parallelism_semaphores[pg]
187-
if semaphore: semaphore.acquire()
188-
start_time = time.time()
189-
result = test.config.test_format.execute(test, self.lit_config)
190-
191-
# Support deprecated result from execute() which returned the result
192-
# code and additional output as a tuple.
193-
if isinstance(result, tuple):
194-
code, output = result
195-
result = lit.Test.Result(code, output)
196-
elif not isinstance(result, lit.Test.Result):
197-
raise ValueError("unexpected result from test execution")
198-
199-
result.elapsed = time.time() - start_time
200-
except KeyboardInterrupt:
201-
raise
202-
except:
203-
if self.lit_config.debug:
204-
raise
205-
output = 'Exception during script execution:\n'
206-
output += traceback.format_exc()
207-
output += '\n'
208-
result = lit.Test.Result(lit.Test.UNRESOLVED, output)
209-
finally:
210-
if semaphore: semaphore.release()
211-
212-
test.setResult(result)
221+
return execute_test(test, self.lit_config, self.parallelism_semaphores)
213222

214223
def execute_tests(self, display, jobs, max_time=None,
215-
use_processes=False):
224+
execution_strategy=None):
216225
"""
217226
execute_tests(display, jobs, [max_time])
218227
@@ -234,6 +243,14 @@ def execute_tests(self, display, jobs, max_time=None,
234243
be given an UNRESOLVED result.
235244
"""
236245

246+
if execution_strategy == 'PROCESS_POOL':
247+
self.execute_tests_with_mp_pool(display, jobs, max_time)
248+
return
249+
# FIXME: Standardize on the PROCESS_POOL execution strategy and remove
250+
# the other two strategies.
251+
252+
use_processes = execution_strategy == 'PROCESSES'
253+
237254
# Choose the appropriate parallel execution implementation.
238255
consumer = None
239256
if jobs != 1 and use_processes and multiprocessing:
@@ -263,8 +280,8 @@ def execute_tests(self, display, jobs, max_time=None,
263280
provider = TestProvider(queue_impl, canceled_flag)
264281
handleFailures(provider, consumer, self.lit_config.maxFailures)
265282

266-
# Queue the tests outside the main thread because we can't guarantee
267-
# that we can put() all the tests without blocking:
283+
# Putting tasks into the threading or multiprocessing Queue may block,
284+
# so do it in a separate thread.
268285
# https://docs.python.org/2/library/multiprocessing.html
269286
# e.g: On Mac OS X, we will hang if we put 2^15 elements in the queue
270287
# without taking any out.
@@ -317,3 +334,111 @@ def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs):
317334
# Wait for all the tasks to complete.
318335
for t in tasks:
319336
t.join()
337+
338+
def execute_tests_with_mp_pool(self, display, jobs, max_time=None):
    """Execute every test in self.tests on a multiprocessing.Pool.

    Arguments:
      display  -- progress display; saved on the runner and updated from the
                  task completion callback (consume_test_result).
      jobs     -- number of worker processes; 0 means run nothing.
      max_time -- accepted for interface compatibility but not implemented
                  yet (see the FIXME below).

    Exceptions raised by a worker task are re-raised here via
    AsyncResult.get().  The pool is always terminated and joined before
    returning, and any test left without a result is marked UNRESOLVED.
    """
    # Don't do anything if we aren't going to run any tests.
    if not self.tests or jobs == 0:
        return

    # Set up semaphores to limit parallelism of certain classes of tests.
    # For example, some ASan tests require lots of virtual memory and run
    # faster with less parallelism on OS X.
    self.parallelism_semaphores = \
        {k: multiprocessing.Semaphore(v) for k, v in
         self.lit_config.parallelism_groups.items()}

    # Save the display object on the runner so that we can update it from
    # our task completion callback.
    self.display = display

    # Start a process pool. Copy over the data shared between all test runs.
    pool = multiprocessing.Pool(jobs, worker_initializer,
                                (self.lit_config,
                                 self.parallelism_semaphores))

    # Install a console-control signal handler on Windows.
    if win32api is not None:
        def console_ctrl_handler(ctrl_type):
            # Use the print() function form, like the other interrupt
            # handlers in this file, so the module also parses on Python 3.
            print('Ctrl-C received, terminating')
            # os.kill below ends the process abruptly; flush first so the
            # message is not lost in a buffer.
            sys.stdout.flush()
            pool.terminate()
            pool.join()
            os.kill(0,9)
            return True
        win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)

    # FIXME: Implement max_time using .wait() timeout argument and a
    # deadline.

    try:
        async_results = [pool.apply_async(worker_run_one_test,
                                          args=(test_index, test),
                                          callback=self.consume_test_result)
                         for test_index, test in enumerate(self.tests)]

        # Wait for all results to come in. The callback that runs in the
        # parent process will update the display.
        for a in async_results:
            a.wait()
            if not a.successful():
                a.get() # Exceptions raised here come from the worker.
    finally:
        pool.terminate()
        pool.join()

    # Mark any tests that weren't run as UNRESOLVED.
    for test in self.tests:
        if test.result is None:
            test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
392+
393+
def consume_test_result(self, pool_result):
    """Completion callback for worker_run_one_test tasks.

    ``pool_result`` is the (test index, test) pair returned by a pool task.
    The index locates the parent's original test object, which is replaced
    by the copy that came back from the worker, and the progress display is
    then updated with that copy.
    """
    index, finished_test = pool_result

    # Sanity check: the worker must have run the test we think it ran.
    parent_path = self.tests[index].file_path
    assert parent_path == finished_test.file_path, \
        "parent and child disagree on test path"

    # Adopt the worker's copy of the test. This includes the result,
    # XFAILS, REQUIRES, and UNSUPPORTED statuses.
    self.tests[index] = finished_test
    self.display.update(finished_test)
408+
409+
# Per-process slots filled in by worker_initializer(); they stay None until
# that function has been called in the current process.
child_lit_config = child_parallelism_semaphores = None


def worker_initializer(lit_config, parallelism_semaphores):
    """Store the shared run data in this process's module globals.

    Passed to multiprocessing.Pool as the worker initializer so the lit
    config and semaphores are copied once per worker rather than per task.
    """
    global child_lit_config, child_parallelism_semaphores
    child_lit_config, child_parallelism_semaphores = \
        lit_config, parallelism_semaphores
418+
419+
def worker_run_one_test(test_index, test):
    """Run one test in a multiprocessing.Pool

    Side effects in this function and functions it calls are not visible in the
    main lit process.

    Arguments and results of this function are pickled, so they should be cheap
    to copy. For efficiency, we copy all data needed to execute all tests into
    each worker and store it in the child_* global variables. This reduces the
    cost of each task.

    Returns a (test_index, test) tuple; the returned test carries the result
    set by execute_test, and the parent process uses the pair to update its
    own copy of the test and the display.

    Any exception other than KeyboardInterrupt is printed and then re-raised,
    so the pool reports the task as failed instead of delivering a None
    result to the parent's completion callback.
    """
    try:
        execute_test(test, child_lit_config, child_parallelism_semaphores)
        return (test_index, test)
    except KeyboardInterrupt:
        # This is a sad hack. Unfortunately subprocess goes
        # bonkers with ctrl-c and we start forking merrily.
        print('\nCtrl-C detected, goodbye.')
        traceback.print_exc()
        sys.stdout.flush()
        os.kill(0,9)
    except:
        # Print here because the traceback object itself does not survive
        # pickling back to the parent process.
        traceback.print_exc()
        # Do not fall through and return None: the parent's callback unpacks
        # the task result as a pair, and its wait loop relies on
        # AsyncResult.get() to surface worker exceptions.
        raise

0 commit comments

Comments
 (0)