diff options
| author | Daniel Dunbar <daniel@zuster.org> | 2013-08-29 00:54:23 +0000 | 
|---|---|---|
| committer | Daniel Dunbar <daniel@zuster.org> | 2013-08-29 00:54:23 +0000 | 
| commit | 9a83a76b1b83e74143599717a930f27360355a7f (patch) | |
| tree | 1a469513f43979cc8bae49293f0076fe617a8a2d | |
| parent | b3954fa8898616220a7c3d5ec88663a4395c5ce1 (diff) | |
| download | bcm5719-llvm-9a83a76b1b83e74143599717a930f27360355a7f.tar.gz bcm5719-llvm-9a83a76b1b83e74143599717a930f27360355a7f.zip  | |
[lit] Add support for multiprocessing, under --use-processes for now.
llvm-svn: 189556
| -rw-r--r-- | llvm/utils/lit/TODO | 6 | ||||
| -rwxr-xr-x | llvm/utils/lit/lit/main.py | 9 | ||||
| -rw-r--r-- | llvm/utils/lit/lit/run.py | 139 | 
3 files changed, 115 insertions, 39 deletions
diff --git a/llvm/utils/lit/TODO b/llvm/utils/lit/TODO index e6aeb3d9339..c1a60c6f4f0 100644 --- a/llvm/utils/lit/TODO +++ b/llvm/utils/lit/TODO @@ -113,8 +113,8 @@ Infrastructure    module. This is currently blocked on:    * The external execution mode is faster in some situations, because it avoids -    being bottlenecked on the GIL. We could fix this by moving to a good -    multiprocessing model. +    being bottlenecked on the GIL. This can hopefully be obviated simply by +    using --use-processes.    * Some tests in LLVM/Clang are explicitly disabled with the internal shell      (because they use features specific to bash). We would need to rewrite these @@ -158,8 +158,6 @@ Miscellaneous  * Add --show-unsupported, don't show by default? -* Optionally use multiprocessing. -  * Support valgrind in all configs, and LLVM style valgrind.  * Support a timeout / ulimit. diff --git a/llvm/utils/lit/lit/main.py b/llvm/utils/lit/lit/main.py index 5c40f1ca533..50c9a66c8d3 100755 --- a/llvm/utils/lit/lit/main.py +++ b/llvm/utils/lit/lit/main.py @@ -142,6 +142,12 @@ def main(builtinParameters = {}):      group.add_option("", "--show-tests", dest="showTests",                        help="Show all discovered tests",                        action="store_true", default=False) +    group.add_option("", "--use-processes", dest="useProcesses", +                      help="Run tests in parallel with processes (not threads)", +                      action="store_true", default=False) +    group.add_option("", "--use-threads", dest="useProcesses", +                      help="Run tests in parallel with threads (not processes)", +                      action="store_false", default=False)      parser.add_option_group(group)      (opts, args) = parser.parse_args() @@ -264,7 +270,8 @@ def main(builtinParameters = {}):      startTime = time.time()      display = TestingProgressDisplay(opts, len(run.tests), progressBar)      try: -        run.execute_tests(display, opts.numThreads, opts.maxTime) +        run.execute_tests(display, opts.numThreads, opts.maxTime, +                          opts.useProcesses)      except KeyboardInterrupt:          sys.exit(2)      display.finish() diff --git a/llvm/utils/lit/lit/run.py b/llvm/utils/lit/lit/run.py index 617c3b988f0..8642ff18927 100644 --- a/llvm/utils/lit/lit/run.py +++ b/llvm/utils/lit/lit/run.py @@ -2,42 +2,68 @@ import os  import threading  import time  import traceback +try: +    import Queue as queue +except ImportError: +    import queue  try:      import win32api  except ImportError:      win32api = None +try: +    import multiprocessing +except ImportError: +    multiprocessing = None +  import lit.Test  ###  # Test Execution Implementation -class TestProvider(object): -    def __init__(self, tests): -        self.iter = iter(range(len(tests))) +class LockedValue(object): +    def __init__(self, value):          self.lock = threading.Lock() -        self.canceled = False +        self._value = value -    def cancel(self): +    def _get_value(self):          self.lock.acquire() -        self.canceled = True -        self.lock.release() +        try: +            return self._value +        finally: +            self.lock.release() -    def get(self): -        # Check if we are cancelled. +    def _set_value(self, value):          self.lock.acquire() -        if self.canceled: -          self.lock.release() +        try: +            self._value = value +        finally: +            self.lock.release() + +    value = property(_get_value, _set_value) + +class TestProvider(object): +    def __init__(self, tests, num_jobs, queue_impl, canceled_flag): +        self.canceled_flag = canceled_flag + +        # Create a shared queue to provide the test indices. +        self.queue = queue_impl() +        for i in range(len(tests)): +            self.queue.put(i) +        for i in range(num_jobs): +            self.queue.put(None) + +    def cancel(self): +        self.canceled_flag.value = 1 + +    def get(self): +        # Check if we are canceled. +        if self.canceled_flag.value:            return None          # Otherwise take the next test. -        for item in self.iter: -            break -        else: -            item = None -        self.lock.release() -        return item +        return self.queue.get()  class Tester(object):      def __init__(self, run_instance, provider, consumer): @@ -46,7 +72,7 @@ class Tester(object):          self.consumer = consumer      def run(self): -        while 1: +        while True:              item = self.provider.get()              if item is None:                  break @@ -82,6 +108,42 @@ class ThreadResultsConsumer(object):      def handle_results(self):          pass +class MultiprocessResultsConsumer(object): +    def __init__(self, run, display, num_jobs): +        self.run = run +        self.display = display +        self.num_jobs = num_jobs +        self.queue = multiprocessing.Queue() + +    def update(self, test_index, test): +        # This method is called in the child processes, and communicates the +        # results to the actual display implementation via an output queue. +        self.queue.put((test_index, test.result)) + +    def task_finished(self): +        # This method is called in the child processes, and communicates that +        # individual tasks are complete. +        self.queue.put(None) + +    def handle_results(self): +        # This method is called in the parent, and consumes the results from the +        # output queue and dispatches to the actual display. The method will +        # complete after each of num_jobs tasks has signalled completion. +        completed = 0 +        while completed != self.num_jobs: +            # Wait for a result item. +            item = self.queue.get() +            if item is None: +                completed += 1 +                continue + +            # Update the test result in the parent process. +            index,result = item +            test = self.run.tests[index] +            test.result = result + +            self.display.update(test) +  def run_one_tester(run, provider, display):      tester = Tester(run, provider, display)      tester.run() @@ -123,7 +185,8 @@ class Run(object):          test.setResult(result) -    def execute_tests(self, display, jobs, max_time=None): +    def execute_tests(self, display, jobs, max_time=None, +                      use_processes=False):          """          execute_tests(display, jobs, [max_time]) @@ -145,8 +208,20 @@ class Run(object):          be given an UNRESOLVED result.          """ -        # Create the test provider object. -        provider = TestProvider(self.tests) +        # Choose the appropriate parallel execution implementation. +        if jobs == 1 or not use_processes or multiprocessing is None: +            task_impl = threading.Thread +            queue_impl = queue.Queue +            canceled_flag = LockedValue(0) +            consumer = ThreadResultsConsumer(display) +        else: +            task_impl = multiprocessing.Process +            queue_impl = multiprocessing.Queue +            canceled_flag =  multiprocessing.Value('i', 0) +            consumer = MultiprocessResultsConsumer(self, display, jobs) + +        # Create the test provider. +        provider = TestProvider(self.tests, jobs, queue_impl, canceled_flag)          # Install a console-control signal handler on Windows.          if win32api is not None: @@ -162,8 +237,12 @@ class Run(object):              timeout_timer = threading.Timer(max_time, timeout_handler)              timeout_timer.start() -        # Actually execute the tests. -        self._execute_tests_with_provider(provider, display, jobs) +        # If not using multiple tasks, just run the tests directly. +        if jobs == 1: +            run_one_tester(self, provider, consumer) +        else: +            # Otherwise, execute the tests in parallel +            self._execute_tests_in_parallel(task_impl, provider, consumer, jobs)          # Cancel the timeout handler.          if max_time is not None: @@ -174,18 +253,10 @@ class Run(object):              if test.result is None:                  test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0)) -    def _execute_tests_with_provider(self, provider, display, jobs): -        consumer = ThreadResultsConsumer(display) - -        # If only using one testing thread, don't use tasks at all; this lets us -        # profile, among other things. -        if jobs == 1: -            run_one_tester(self, provider, consumer) -            return - +    def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs):          # Start all of the tasks. -        tasks = [threading.Thread(target=run_one_tester, -                                  args=(self, provider, consumer)) +        tasks = [task_impl(target=run_one_tester, +                           args=(self, provider, consumer))                   for i in range(jobs)]          for t in tasks:              t.start()  | 

