summaryrefslogtreecommitdiffstats
path: root/llvm/utils
diff options
context:
space:
mode:
authorReid Kleckner <rnk@google.com>2017-04-05 16:44:56 +0000
committerReid Kleckner <rnk@google.com>2017-04-05 16:44:56 +0000
commit1b3c146acbc9a23bf96f40e533ae27fb910ae675 (patch)
tree8dbfc93b184c62c572f66d1e1c899de10e31527f /llvm/utils
parent42e0bb5e4f3ff0ce577ac027be8407dba7229887 (diff)
downloadbcm5719-llvm-1b3c146acbc9a23bf96f40e533ae27fb910ae675.tar.gz
bcm5719-llvm-1b3c146acbc9a23bf96f40e533ae27fb910ae675.zip
[lit] Use process pools for test execution by default
Summary: This drastically reduces lit test execution startup time on Windows. Our previous strategy was to manually create one Process per job and manage the worker pool ourselves. Instead, let's use the worker pool provided by multiprocessing. multiprocessing.Pool(jobs) returns almost immediately, and initializes the appropriate number of workers, so they can all start executing tests immediately. This avoids the ramp-up period that the old implementation suffers from. This appears to speed up small test runs. Here are some timings of the llvm-readobj tests on Windows using the various execution strategies: # multiprocessing.Pool: $ for i in `seq 1 3`; do tim python ./bin/llvm-lit.py -sv ../llvm/test/tools/llvm-readobj/ --use-process-pool |& grep real: ; done real: 0m1.156s real: 0m1.078s real: 0m1.094s # multiprocessing.Process: $ for i in `seq 1 3`; do tim python ./bin/llvm-lit.py -sv ../llvm/test/tools/llvm-readobj/ --use-processes |& grep real: ; done real: 0m6.062s real: 0m5.860s real: 0m5.984s # threading.Thread: $ for i in `seq 1 3`; do tim python ./bin/llvm-lit.py -sv ../llvm/test/tools/llvm-readobj/ --use-threads |& grep real: ; done real: 0m9.438s real: 0m10.765s real: 0m11.079s I kept the old code to launch processes in case this change doesn't work on all platforms that LLVM supports, but at some point I would like to remove both the threading and old multiprocessing execution strategies. Reviewers: modocache, rafael Subscribers: llvm-commits Differential Revision: https://reviews.llvm.org/D31677 llvm-svn: 299560
Diffstat (limited to 'llvm/utils')
-rwxr-xr-xllvm/utils/lit/lit/main.py16
-rw-r--r--llvm/utils/lit/lit/run.py199
2 files changed, 173 insertions, 42 deletions
diff --git a/llvm/utils/lit/lit/main.py b/llvm/utils/lit/lit/main.py
index 95032c6931a..689a2d55bce 100755
--- a/llvm/utils/lit/lit/main.py
+++ b/llvm/utils/lit/lit/main.py
@@ -278,12 +278,15 @@ def main_with_tmp(builtinParameters):
debug_group.add_argument("--show-tests", dest="showTests",
help="Show all discovered tests",
action="store_true", default=False)
- debug_group.add_argument("--use-processes", dest="useProcesses",
+ debug_group.add_argument("--use-process-pool", dest="executionStrategy",
+ help="Run tests in parallel with a process pool",
+ action="store_const", const="PROCESS_POOL")
+ debug_group.add_argument("--use-processes", dest="executionStrategy",
help="Run tests in parallel with processes (not threads)",
- action="store_true", default=True)
- debug_group.add_argument("--use-threads", dest="useProcesses",
+ action="store_const", const="PROCESSES")
+ debug_group.add_argument("--use-threads", dest="executionStrategy",
help="Run tests in parallel with threads (not processes)",
- action="store_false", default=True)
+ action="store_const", const="THREADS")
opts = parser.parse_args()
args = opts.test_paths
@@ -298,6 +301,9 @@ def main_with_tmp(builtinParameters):
if opts.numThreads is None:
opts.numThreads = lit.util.detectCPUs()
+ if opts.executionStrategy is None:
+ opts.executionStrategy = 'PROCESS_POOL'
+
if opts.maxFailures == 0:
parser.error("Setting --max-failures to 0 does not have any effect.")
@@ -481,7 +487,7 @@ def main_with_tmp(builtinParameters):
display = TestingProgressDisplay(opts, len(run.tests), progressBar)
try:
run.execute_tests(display, opts.numThreads, opts.maxTime,
- opts.useProcesses)
+ opts.executionStrategy)
except KeyboardInterrupt:
sys.exit(2)
display.finish()
diff --git a/llvm/utils/lit/lit/run.py b/llvm/utils/lit/lit/run.py
index 2be8a1133b9..10f21c7b2e8 100644
--- a/llvm/utils/lit/lit/run.py
+++ b/llvm/utils/lit/lit/run.py
@@ -1,4 +1,5 @@
import os
+import sys
import threading
import time
import traceback
@@ -84,11 +85,13 @@ class Tester(object):
def run_test(self, test_index):
test = self.run_instance.tests[test_index]
try:
- self.run_instance.execute_test(test)
+ execute_test(test, self.run_instance.lit_config,
+ self.run_instance.parallelism_semaphores)
except KeyboardInterrupt:
# This is a sad hack. Unfortunately subprocess goes
# bonkers with ctrl-c and we start forking merrily.
print('\nCtrl-C detected, goodbye.')
+ sys.stdout.flush()
os.kill(0,9)
self.consumer.update(test_index, test)
@@ -167,6 +170,44 @@ class _Display(object):
def handleFailures(provider, consumer, maxFailures):
consumer.display = _Display(consumer.display, provider, maxFailures)
+def execute_test(test, lit_config, parallelism_semaphores):
+ """Execute one test"""
+ pg = test.config.parallelism_group
+ if callable(pg):
+ pg = pg(test)
+
+ result = None
+ semaphore = None
+ try:
+ if pg:
+ semaphore = parallelism_semaphores[pg]
+ if semaphore:
+ semaphore.acquire()
+ start_time = time.time()
+ result = test.config.test_format.execute(test, lit_config)
+ # Support deprecated result from execute() which returned the result
+ # code and additional output as a tuple.
+ if isinstance(result, tuple):
+ code, output = result
+ result = lit.Test.Result(code, output)
+ elif not isinstance(result, lit.Test.Result):
+ raise ValueError("unexpected result from test execution")
+ result.elapsed = time.time() - start_time
+ except KeyboardInterrupt:
+ raise
+ except:
+ if lit_config.debug:
+ raise
+ output = 'Exception during script execution:\n'
+ output += traceback.format_exc()
+ output += '\n'
+ result = lit.Test.Result(lit.Test.UNRESOLVED, output)
+ finally:
+ if semaphore:
+ semaphore.release()
+
+ test.setResult(result)
+
class Run(object):
"""
This class represents a concrete, configured testing run.
@@ -177,42 +218,10 @@ class Run(object):
self.tests = tests
def execute_test(self, test):
- pg = test.config.parallelism_group
- if callable(pg): pg = pg(test)
-
- result = None
- semaphore = None
- try:
- if pg: semaphore = self.parallelism_semaphores[pg]
- if semaphore: semaphore.acquire()
- start_time = time.time()
- result = test.config.test_format.execute(test, self.lit_config)
-
- # Support deprecated result from execute() which returned the result
- # code and additional output as a tuple.
- if isinstance(result, tuple):
- code, output = result
- result = lit.Test.Result(code, output)
- elif not isinstance(result, lit.Test.Result):
- raise ValueError("unexpected result from test execution")
-
- result.elapsed = time.time() - start_time
- except KeyboardInterrupt:
- raise
- except:
- if self.lit_config.debug:
- raise
- output = 'Exception during script execution:\n'
- output += traceback.format_exc()
- output += '\n'
- result = lit.Test.Result(lit.Test.UNRESOLVED, output)
- finally:
- if semaphore: semaphore.release()
-
- test.setResult(result)
+ return execute_test(test, self.lit_config, self.parallelism_semaphores)
def execute_tests(self, display, jobs, max_time=None,
- use_processes=False):
+ execution_strategy=None):
"""
execute_tests(display, jobs, [max_time])
@@ -234,6 +243,14 @@ class Run(object):
be given an UNRESOLVED result.
"""
+ if execution_strategy == 'PROCESS_POOL':
+ self.execute_tests_with_mp_pool(display, jobs, max_time)
+ return
+ # FIXME: Standardize on the PROCESS_POOL execution strategy and remove
+ # the other two strategies.
+
+ use_processes = execution_strategy == 'PROCESSES'
+
# Choose the appropriate parallel execution implementation.
consumer = None
if jobs != 1 and use_processes and multiprocessing:
@@ -263,8 +280,8 @@ class Run(object):
provider = TestProvider(queue_impl, canceled_flag)
handleFailures(provider, consumer, self.lit_config.maxFailures)
- # Queue the tests outside the main thread because we can't guarantee
- # that we can put() all the tests without blocking:
+ # Putting tasks into the threading or multiprocessing Queue may block,
+ # so do it in a separate thread.
# https://docs.python.org/2/library/multiprocessing.html
# e.g: On Mac OS X, we will hang if we put 2^15 elements in the queue
# without taking any out.
@@ -317,3 +334,111 @@ class Run(object):
# Wait for all the tasks to complete.
for t in tasks:
t.join()
+
+ def execute_tests_with_mp_pool(self, display, jobs, max_time=None):
+ # Don't do anything if we aren't going to run any tests.
+ if not self.tests or jobs == 0:
+ return
+
+ # Set up semaphores to limit parallelism of certain classes of tests.
+ # For example, some ASan tests require lots of virtual memory and run
+ # faster with less parallelism on OS X.
+ self.parallelism_semaphores = \
+ {k: multiprocessing.Semaphore(v) for k, v in
+ self.lit_config.parallelism_groups.items()}
+
+ # Save the display object on the runner so that we can update it from
+ # our task completion callback.
+ self.display = display
+
+ # Start a process pool. Copy over the data shared between all test runs.
+ pool = multiprocessing.Pool(jobs, worker_initializer,
+ (self.lit_config,
+ self.parallelism_semaphores))
+
+ # Install a console-control signal handler on Windows.
+ if win32api is not None:
+ def console_ctrl_handler(type):
+ print "Ctr-C received, terminating"
+ pool.terminate()
+ pool.join()
+ os.kill(0,9)
+ return True
+ win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
+
+ # FIXME: Implement max_time using .wait() timeout argument and a
+ # deadline.
+
+ try:
+ async_results = [pool.apply_async(worker_run_one_test,
+ args=(test_index, test),
+ callback=self.consume_test_result)
+ for test_index, test in enumerate(self.tests)]
+
+ # Wait for all results to come in. The callback that runs in the
+ # parent process will update the display.
+ for a in async_results:
+ a.wait()
+ if not a.successful():
+ a.get() # Exceptions raised here come from the worker.
+ finally:
+ pool.terminate()
+ pool.join()
+
+ # Mark any tests that weren't run as UNRESOLVED.
+ for test in self.tests:
+ if test.result is None:
+ test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
+
+ def consume_test_result(self, pool_result):
+ """Test completion callback for worker_run_one_test
+
+ Updates the test result status in the parent process. Each task in the
+ pool returns the test index and the result, and we use the index to look
+ up the original test object. Also updates the progress bar as tasks
+ complete.
+ """
+ (test_index, test_with_result) = pool_result
+ # Update the parent process copy of the test. This includes the result,
+ # XFAILS, REQUIRES, and UNSUPPORTED statuses.
+ assert self.tests[test_index].file_path == test_with_result.file_path, \
+ "parent and child disagree on test path"
+ self.tests[test_index] = test_with_result
+ self.display.update(test_with_result)
+
+child_lit_config = None
+child_parallelism_semaphores = None
+
+def worker_initializer(lit_config, parallelism_semaphores):
+ """Copy expensive repeated data into worker processes"""
+ global child_lit_config
+ child_lit_config = lit_config
+ global child_parallelism_semaphores
+ child_parallelism_semaphores = parallelism_semaphores
+
+def worker_run_one_test(test_index, test):
+ """Run one test in a multiprocessing.Pool
+
+ Side effects in this function and functions it calls are not visible in the
+ main lit process.
+
+ Arguments and results of this function are pickled, so they should be cheap
+ to copy. For efficiency, we copy all data needed to execute all tests into
+ each worker and store it in the child_* global variables. This reduces the
+ cost of each task.
+
+ Returns an index and a Result, which the parent process uses to update
+ the display.
+ """
+ try:
+ execute_test(test, child_lit_config, child_parallelism_semaphores)
+ return (test_index, test)
+ except KeyboardInterrupt as e:
+ # This is a sad hack. Unfortunately subprocess goes
+ # bonkers with ctrl-c and we start forking merrily.
+ print('\nCtrl-C detected, goodbye.')
+ traceback.print_exc()
+ sys.stdout.flush()
+ os.kill(0,9)
+ except:
+ traceback.print_exc()
OpenPOWER on IntegriCloud