summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--llvm/utils/lit/lit/run.py119
-rw-r--r--llvm/utils/lit/lit/util.py14
-rw-r--r--llvm/utils/lit/lit/worker.py82
3 files changed, 109 insertions, 106 deletions
diff --git a/llvm/utils/lit/lit/run.py b/llvm/utils/lit/lit/run.py
index a6de83eb3c2..5483f7ffecb 100644
--- a/llvm/utils/lit/lit/run.py
+++ b/llvm/utils/lit/lit/run.py
@@ -1,28 +1,9 @@
-import os
-import sys
-import threading
+import multiprocessing
import time
-import traceback
-try:
- import Queue as queue
-except ImportError:
- import queue
-
-try:
- import win32api
-except ImportError:
- win32api = None
-import multiprocessing
import lit.Test
-
-def abort_now():
- """Abort the current process without doing any exception teardown"""
- sys.stdout.flush()
- if win32api:
- win32api.TerminateProcess(win32api.GetCurrentProcess(), 3)
- else:
- os.kill(0, 9)
+import lit.util
+import lit.worker
class _Display(object):
def __init__(self, display, provider, maxFailures):
@@ -48,12 +29,11 @@ class Run(object):
# For example, some ASan tests require lots of virtual memory and run
# faster with less parallelism on OS X.
self.parallelism_semaphores = \
- {k: multiprocessing.Semaphore(v) for k, v in
+ {k: multiprocessing.BoundedSemaphore(v) for k, v in
self.lit_config.parallelism_groups.items()}
def execute_test(self, test):
- return _execute_test_impl(test, self.lit_config,
- self.parallelism_semaphores)
+ return lit.worker._execute_test(test, self.lit_config)
def execute_tests_in_pool(self, jobs, max_time):
# We need to issue many wait calls, so compute the final deadline and
@@ -67,22 +47,22 @@ class Run(object):
# interrupts the workers before we make it into our task callback, they
# will each raise a KeyboardInterrupt exception and print to stderr at
# the same time.
- pool = multiprocessing.Pool(jobs, worker_initializer,
+ pool = multiprocessing.Pool(jobs, lit.worker.initializer,
(self.lit_config,
self.parallelism_semaphores))
# Install a console-control signal handler on Windows.
- if win32api is not None:
+ if lit.util.win32api is not None:
def console_ctrl_handler(type):
print('\nCtrl-C detected, terminating.')
pool.terminate()
pool.join()
- abort_now()
+ lit.util.abort_now()
return True
- win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
+ lit.util.win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
try:
- async_results = [pool.apply_async(worker_run_one_test,
+ async_results = [pool.apply_async(lit.worker.run_one_test,
args=(test_index, test),
callback=self.consume_test_result)
for test_index, test in enumerate(self.tests)]
@@ -143,11 +123,9 @@ class Run(object):
self.failure_count = 0
self.hit_max_failures = False
if jobs == 1:
- global child_lit_config
- child_lit_config = self.lit_config
for test_index, test in enumerate(self.tests):
- result = worker_run_one_test(test_index, test)
- self.consume_test_result(result)
+ lit.worker._execute_test(test, self.lit_config)
+ self.consume_test_result((test_index, test))
if self.hit_max_failures:
break
else:
@@ -159,7 +137,7 @@ class Run(object):
test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
def consume_test_result(self, pool_result):
- """Test completion callback for worker_run_one_test
+ """Test completion callback for lit.worker.run_one_test
Updates the test result status in the parent process. Each task in the
pool returns the test index and the result, and we use the index to look
@@ -186,74 +164,3 @@ class Run(object):
if self.lit_config.maxFailures and \
self.failure_count == self.lit_config.maxFailures:
self.hit_max_failures = True
-
-def _execute_test_impl(test, lit_config, parallelism_semaphores):
- """Execute one test"""
- pg = test.config.parallelism_group
- if callable(pg):
- pg = pg(test)
-
- result = None
- semaphore = None
- try:
- if pg:
- semaphore = parallelism_semaphores[pg]
- if semaphore:
- semaphore.acquire()
- start_time = time.time()
- result = test.config.test_format.execute(test, lit_config)
- # Support deprecated result from execute() which returned the result
- # code and additional output as a tuple.
- if isinstance(result, tuple):
- code, output = result
- result = lit.Test.Result(code, output)
- elif not isinstance(result, lit.Test.Result):
- raise ValueError("unexpected result from test execution")
- result.elapsed = time.time() - start_time
- except KeyboardInterrupt:
- raise
- except:
- if lit_config.debug:
- raise
- output = 'Exception during script execution:\n'
- output += traceback.format_exc()
- output += '\n'
- result = lit.Test.Result(lit.Test.UNRESOLVED, output)
- finally:
- if semaphore:
- semaphore.release()
-
- test.setResult(result)
-
-child_lit_config = None
-child_parallelism_semaphores = None
-
-def worker_initializer(lit_config, parallelism_semaphores):
- """Copy expensive repeated data into worker processes"""
- global child_lit_config
- child_lit_config = lit_config
- global child_parallelism_semaphores
- child_parallelism_semaphores = parallelism_semaphores
-
-def worker_run_one_test(test_index, test):
- """Run one test in a multiprocessing.Pool
-
- Side effects in this function and functions it calls are not visible in the
- main lit process.
-
- Arguments and results of this function are pickled, so they should be cheap
- to copy. For efficiency, we copy all data needed to execute all tests into
- each worker and store it in the child_* global variables. This reduces the
- cost of each task.
-
- Returns an index and a Result, which the parent process uses to update
- the display.
- """
- try:
- _execute_test_impl(test, child_lit_config, child_parallelism_semaphores)
- return (test_index, test)
- except KeyboardInterrupt as e:
- # If a worker process gets an interrupt, abort it immediately.
- abort_now()
- except:
- traceback.print_exc()
diff --git a/llvm/utils/lit/lit/util.py b/llvm/utils/lit/lit/util.py
index dee70b3c542..58b5563bae5 100644
--- a/llvm/utils/lit/lit/util.py
+++ b/llvm/utils/lit/lit/util.py
@@ -424,3 +424,17 @@ def killProcessAndChildren(pid):
psutilProc.kill()
except psutil.NoSuchProcess:
pass
+
+
+try:
+ import win32api
+except ImportError:
+ win32api = None
+
+def abort_now():
+ """Abort the current process without doing any exception teardown"""
+ sys.stdout.flush()
+ if win32api:
+ win32api.TerminateProcess(win32api.GetCurrentProcess(), 3)
+ else:
+ os.kill(0, 9)
diff --git a/llvm/utils/lit/lit/worker.py b/llvm/utils/lit/lit/worker.py
new file mode 100644
index 00000000000..f97b6a04306
--- /dev/null
+++ b/llvm/utils/lit/lit/worker.py
@@ -0,0 +1,82 @@
+# The functions in this module are meant to run on a separate worker process.
+# Exception: in single process mode _execute_test is called directly.
+import time
+import traceback
+
+import lit.Test
+import lit.util
+
+_lit_config = None
+_parallelism_semaphores = None
+
+def initializer(lit_config, parallelism_semaphores):
+ """Copy expensive repeated data into worker processes"""
+ global _lit_config
+ global _parallelism_semaphores
+ _lit_config = lit_config
+ _parallelism_semaphores = parallelism_semaphores
+
+def run_one_test(test_index, test):
+ """Run one test in a multiprocessing.Pool
+
+ Side effects in this function and functions it calls are not visible in the
+ main lit process.
+
+ Arguments and results of this function are pickled, so they should be cheap
+ to copy. For efficiency, we copy all data needed to execute all tests into
+ each worker and store it in the worker_* global variables. This reduces the
+ cost of each task.
+
+ Returns an index and a Result, which the parent process uses to update
+ the display.
+ """
+ try:
+ _execute_test_in_parallelism_group(test, _lit_config,
+ _parallelism_semaphores)
+ return (test_index, test)
+ except KeyboardInterrupt:
+ # If a worker process gets an interrupt, abort it immediately.
+ lit.util.abort_now()
+ except:
+ traceback.print_exc()
+
+def _execute_test_in_parallelism_group(test, lit_config, parallelism_semaphores):
+ """Execute one test inside the appropriate parallelism group"""
+ pg = test.config.parallelism_group
+ if callable(pg):
+ pg = pg(test)
+
+ if pg:
+ semaphore = parallelism_semaphores[pg]
+ try:
+ semaphore.acquire()
+ _execute_test(test, lit_config)
+ finally:
+ semaphore.release()
+ else:
+ _execute_test(test, lit_config)
+
+def _execute_test(test, lit_config):
+ """Execute one test"""
+ try:
+ start_time = time.time()
+ result = test.config.test_format.execute(test, lit_config)
+ # Support deprecated result from execute() which returned the result
+ # code and additional output as a tuple.
+ if isinstance(result, tuple):
+ code, output = result
+ result = lit.Test.Result(code, output)
+ elif not isinstance(result, lit.Test.Result):
+ raise ValueError("unexpected result from test execution")
+ result.elapsed = time.time() - start_time
+ except KeyboardInterrupt:
+ raise
+ except:
+ if lit_config.debug:
+ raise
+ output = 'Exception during script execution:\n'
+ output += traceback.format_exc()
+ output += '\n'
+ result = lit.Test.Result(lit.Test.UNRESOLVED, output)
+
+ test.setResult(result)
OpenPOWER on IntegriCloud