diff options
Diffstat (limited to 'import-layers/yocto-poky/meta/lib/oeqa/core')
41 files changed, 0 insertions, 2692 deletions
diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/README b/import-layers/yocto-poky/meta/lib/oeqa/core/README deleted file mode 100644 index d4fcda41f..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/README +++ /dev/null @@ -1,76 +0,0 @@ -= OEQA (v2) Framework = - -== Introduction == - -This is version 2 of the OEQA framework. Base clases are located in the -'oeqa/core' directory and subsequent components must extend from these. - -The main design consideration was to implement the needed functionality on -top of the Python unittest framework. To achieve this goal, the following -modules are used: - - * oeqa/core/runner.py: Provides OETestResult and OETestRunner base - classes extending the unittest class. These classes support exporting - results to different formats; currently RAW and XML support exist. - - * oeqa/core/loader.py: Provides OETestLoader extending the unittest class. - It also features a unified implementation of decorator support and - filtering test cases. - - * oeqa/core/case.py: Provides OETestCase base class extending - unittest.TestCase and provides access to the Test data (td), Test context - and Logger functionality. - - * oeqa/core/decorator: Provides OETestDecorator, a new class to implement - decorators for Test cases. - - * oeqa/core/context: Provides OETestContext, a high-level API for - loadTests and runTests of certain Test component and - OETestContextExecutor a base class to enable oe-test to discover/use - the Test component. - -Also, a new 'oe-test' runner is located under 'scripts', allowing scans for components -that supports OETestContextExecutor (see below). - -== Terminology == - - * Test component: The area of testing in the Project, for example: runtime, SDK, eSDK, selftest. - - * Test data: Data associated with the Test component. Currently we use bitbake datastore as - a Test data input. - - * Test context: A context of what tests needs to be run and how to do it; this additionally - provides access to the Test data and could have custom methods and/or attrs. - -== oe-test == - -The new tool, oe-test, has the ability to scan the code base for test components and provide -a unified way to run test cases. Internally it scans folders inside oeqa module in order to find -specific classes that implement a test component. - -== Usage == - -Executing the example test component - - $ source oe-init-build-env - $ oe-test core - -Getting help - - $ oe-test -h - -== Creating new Test Component == - -Adding a new test component the developer needs to extend OETestContext/OETestContextExecutor -(from context.py) and OETestCase (from case.py) - -== Selftesting the framework == - -Run all tests: - - $ PATH=$PATH:../../ python3 -m unittest discover -s tests - -Run some test: - - $ cd tests/ - $ ./test_data.py diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/__init__.py b/import-layers/yocto-poky/meta/lib/oeqa/core/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/__init__.py +++ /dev/null diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/case.py b/import-layers/yocto-poky/meta/lib/oeqa/core/case.py deleted file mode 100644 index 917a2aa3f..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/case.py +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -import unittest - -from oeqa.core.exception import OEQAMissingVariable - -def _validate_td_vars(td, td_vars, type_msg): - if td_vars: - for v in td_vars: - if not v in td: - raise OEQAMissingVariable("Test %s need %s variable but"\ - " isn't into td" % (type_msg, v)) - -class OETestCase(unittest.TestCase): - # TestContext and Logger instance set by OETestLoader. - tc = None - logger = None - - # td has all the variables needed by the test cases - # is the same across all the test cases. - td = None - - # td_vars has the variables needed by a test class - # or test case instance, if some var isn't into td a - # OEQAMissingVariable exception is raised - td_vars = None - - @classmethod - def _oeSetUpClass(clss): - _validate_td_vars(clss.td, clss.td_vars, "class") - clss.setUpClassMethod() - - @classmethod - def _oeTearDownClass(clss): - clss.tearDownClassMethod() - - def _oeSetUp(self): - for d in self.decorators: - d.setUpDecorator() - self.setUpMethod() - - def _oeTearDown(self): - for d in self.decorators: - d.tearDownDecorator() - self.tearDownMethod() diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/cases/__init__.py b/import-layers/yocto-poky/meta/lib/oeqa/core/cases/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/cases/__init__.py +++ /dev/null diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/cases/example/data.json b/import-layers/yocto-poky/meta/lib/oeqa/core/cases/example/data.json deleted file mode 100644 index 21d6b16d1..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/cases/example/data.json +++ /dev/null @@ -1 +0,0 @@ -{"ARCH": "x86", "IMAGE": "core-image-minimal"}
\ No newline at end of file diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/cases/example/test_basic.py b/import-layers/yocto-poky/meta/lib/oeqa/core/cases/example/test_basic.py deleted file mode 100644 index 11cf3800c..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/cases/example/test_basic.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -from oeqa.core.case import OETestCase -from oeqa.core.decorator.depends import OETestDepends - -class OETestExample(OETestCase): - def test_example(self): - self.logger.info('IMAGE: %s' % self.td.get('IMAGE')) - self.assertEqual('core-image-minimal', self.td.get('IMAGE')) - self.logger.info('ARCH: %s' % self.td.get('ARCH')) - self.assertEqual('x86', self.td.get('ARCH')) - -class OETestExampleDepend(OETestCase): - @OETestDepends(['OETestExample.test_example']) - def test_example_depends(self): - pass - - def test_example_no_depends(self): - pass diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/context.py b/import-layers/yocto-poky/meta/lib/oeqa/core/context.py deleted file mode 100644 index acd547416..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/context.py +++ /dev/null @@ -1,191 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -import os -import sys -import json -import time -import logging -import collections - -from oeqa.core.loader import OETestLoader -from oeqa.core.runner import OETestRunner -from oeqa.core.exception import OEQAMissingManifest, OEQATestNotFound - -class OETestContext(object): - loaderClass = OETestLoader - runnerClass = OETestRunner - - files_dir = os.path.abspath(os.path.join(os.path.dirname( - os.path.abspath(__file__)), "../files")) - - def __init__(self, td=None, logger=None): - if not type(td) is dict: - raise TypeError("td isn't dictionary type") - - self.td = td - self.logger = logger - self._registry = {} - self._registry['cases'] = collections.OrderedDict() - self._results = {} - - def _read_modules_from_manifest(self, manifest): - if not os.path.exists(manifest): - raise OEQAMissingManifest("Manifest does not exist on %s" % manifest) - - modules = [] - for line in open(manifest).readlines(): - line = line.strip() - if line and not line.startswith("#"): - modules.append(line) - - return modules - - def skipTests(self, skips): - if not skips: - return - for test in self.suites: - for skip in skips: - if test.id().startswith(skip): - setattr(test, 'setUp', lambda: test.skipTest('Skip by the command line argument "%s"' % skip)) - - def loadTests(self, module_paths, modules=[], tests=[], - modules_manifest="", modules_required=[], filters={}): - if modules_manifest: - modules = self._read_modules_from_manifest(modules_manifest) - - self.loader = self.loaderClass(self, module_paths, modules, tests, - modules_required, filters) - self.suites = self.loader.discover() - - def runTests(self, skips=[]): - self.runner = self.runnerClass(self, descriptions=False, verbosity=2) - - # Dinamically skip those tests specified though arguments - self.skipTests(skips) - - self._run_start_time = time.time() - result = self.runner.run(self.suites) - self._run_end_time = time.time() - - return result - - def listTests(self, display_type): - self.runner = self.runnerClass(self, verbosity=2) - return self.runner.list_tests(self.suites, display_type) - -class OETestContextExecutor(object): - _context_class = OETestContext - _script_executor = 'oe-test' - - name = 'core' - help = 'core test component example' - description = 'executes core test suite example' - - default_cases = [os.path.join(os.path.abspath(os.path.dirname(__file__)), - 'cases/example')] - default_test_data = os.path.join(default_cases[0], 'data.json') - default_tests = None - - def register_commands(self, logger, subparsers): - self.parser = subparsers.add_parser(self.name, help=self.help, - description=self.description, group='components') - - self.default_output_log = '%s-results-%s.log' % (self.name, - time.strftime("%Y%m%d%H%M%S")) - self.parser.add_argument('--output-log', action='store', - default=self.default_output_log, - help="results output log, default: %s" % self.default_output_log) - - group = self.parser.add_mutually_exclusive_group() - group.add_argument('--run-tests', action='store', nargs='+', - default=self.default_tests, - help="tests to run in <module>[.<class>[.<name>]]") - group.add_argument('--list-tests', action='store', - choices=('module', 'class', 'name'), - help="lists available tests") - - if self.default_test_data: - self.parser.add_argument('--test-data-file', action='store', - default=self.default_test_data, - help="data file to load, default: %s" % self.default_test_data) - else: - self.parser.add_argument('--test-data-file', action='store', - help="data file to load") - - if self.default_cases: - self.parser.add_argument('CASES_PATHS', action='store', - default=self.default_cases, nargs='*', - help="paths to directories with test cases, default: %s"\ - % self.default_cases) - else: - self.parser.add_argument('CASES_PATHS', action='store', - nargs='+', help="paths to directories with test cases") - - self.parser.set_defaults(func=self.run) - - def _setup_logger(self, logger, args): - formatter = logging.Formatter('%(asctime)s - ' + self.name + \ - ' - %(levelname)s - %(message)s') - sh = logger.handlers[0] - sh.setFormatter(formatter) - fh = logging.FileHandler(args.output_log) - fh.setFormatter(formatter) - logger.addHandler(fh) - - return logger - - def _process_args(self, logger, args): - self.tc_kwargs = {} - self.tc_kwargs['init'] = {} - self.tc_kwargs['load'] = {} - self.tc_kwargs['list'] = {} - self.tc_kwargs['run'] = {} - - self.tc_kwargs['init']['logger'] = self._setup_logger(logger, args) - if args.test_data_file: - self.tc_kwargs['init']['td'] = json.load( - open(args.test_data_file, "r")) - else: - self.tc_kwargs['init']['td'] = {} - - if args.run_tests: - self.tc_kwargs['load']['modules'] = args.run_tests - self.tc_kwargs['load']['modules_required'] = args.run_tests - else: - self.tc_kwargs['load']['modules'] = [] - - self.tc_kwargs['run']['skips'] = [] - - self.module_paths = args.CASES_PATHS - - def _pre_run(self): - pass - - def run(self, logger, args): - self._process_args(logger, args) - - self.tc = self._context_class(**self.tc_kwargs['init']) - try: - self.tc.loadTests(self.module_paths, **self.tc_kwargs['load']) - except OEQATestNotFound as ex: - logger.error(ex) - sys.exit(1) - - if args.list_tests: - rc = self.tc.listTests(args.list_tests, **self.tc_kwargs['list']) - else: - self._pre_run() - rc = self.tc.runTests(**self.tc_kwargs['run']) - rc.logDetails() - rc.logSummary(self.name) - - output_link = os.path.join(os.path.dirname(args.output_log), - "%s-results.log" % self.name) - if os.path.exists(output_link): - os.remove(output_link) - os.symlink(args.output_log, output_link) - - return rc - -_executor_class = OETestContextExecutor diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/__init__.py b/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/__init__.py deleted file mode 100644 index 855b6b9d2..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/__init__.py +++ /dev/null @@ -1,71 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -from functools import wraps -from abc import abstractmethod - -decoratorClasses = set() - -def registerDecorator(obj): - decoratorClasses.add(obj) - return obj - -class OETestDecorator(object): - case = None # Reference of OETestCase decorated - attrs = None # Attributes to be loaded by decorator implementation - - def __init__(self, *args, **kwargs): - if not self.attrs: - return - - for idx, attr in enumerate(self.attrs): - if attr in kwargs: - value = kwargs[attr] - else: - value = args[idx] - setattr(self, attr, value) - - def __call__(self, func): - @wraps(func) - def wrapped_f(*args, **kwargs): - self.attrs = self.attrs # XXX: Enables OETestLoader discover - return func(*args, **kwargs) - return wrapped_f - - # OETestLoader call it when is loading test cases. - # XXX: Most methods would change the registry for later - # processing; be aware that filtrate method needs to - # run later than bind, so there could be data (in the - # registry) of a cases that were filtered. - def bind(self, registry, case): - self.case = case - self.logger = case.tc.logger - self.case.decorators.append(self) - - # OETestRunner call this method when tries to run - # the test case. - def setUpDecorator(self): - pass - - # OETestRunner call it after a test method has been - # called even if the method raised an exception. - def tearDownDecorator(self): - pass - -class OETestDiscover(OETestDecorator): - - # OETestLoader call it after discover test cases - # needs to return the cases to be run. - @staticmethod - def discover(registry): - return registry['cases'] - -class OETestFilter(OETestDecorator): - - # OETestLoader call it while loading the tests - # in loadTestsFromTestCase method, it needs to - # return a bool, True if needs to be filtered. - # This method must consume the filter used. - @abstractmethod - def filtrate(self, filters): - return False diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/data.py b/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/data.py deleted file mode 100644 index ff7bdd98b..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/data.py +++ /dev/null @@ -1,98 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -from oeqa.core.exception import OEQAMissingVariable - -from . import OETestDecorator, registerDecorator - -def has_feature(td, feature): - """ - Checks for feature in DISTRO_FEATURES or IMAGE_FEATURES. - """ - - if (feature in td.get('DISTRO_FEATURES', '') or - feature in td.get('IMAGE_FEATURES', '')): - return True - return False - -@registerDecorator -class skipIfDataVar(OETestDecorator): - """ - Skip test based on value of a data store's variable. - - It will get the info of var from the data store and will - check it against value; if are equal it will skip the test - with msg as the reason. - """ - - attrs = ('var', 'value', 'msg') - - def setUpDecorator(self): - msg = ('Checking if %r value is %r to skip test' % - (self.var, self.value)) - self.logger.debug(msg) - if self.case.td.get(self.var) == self.value: - self.case.skipTest(self.msg) - -@registerDecorator -class skipIfNotDataVar(OETestDecorator): - """ - Skip test based on value of a data store's variable. - - It will get the info of var from the data store and will - check it against value; if are not equal it will skip the - test with msg as the reason. - """ - - attrs = ('var', 'value', 'msg') - - def setUpDecorator(self): - msg = ('Checking if %r value is not %r to skip test' % - (self.var, self.value)) - self.logger.debug(msg) - if not self.case.td.get(self.var) == self.value: - self.case.skipTest(self.msg) - -@registerDecorator -class skipIfNotInDataVar(OETestDecorator): - """ - Skip test if value is not in data store's variable. - """ - - attrs = ('var', 'value', 'msg') - def setUpDecorator(self): - msg = ('Checking if %r value is in %r to run ' - 'the test' % (self.var, self.value)) - self.logger.debug(msg) - if not self.value in self.case.td.get(self.var): - self.case.skipTest(self.msg) - -@registerDecorator -class OETestDataDepends(OETestDecorator): - attrs = ('td_depends',) - - def setUpDecorator(self): - for v in self.td_depends: - try: - value = self.case.td[v] - except KeyError: - raise OEQAMissingVariable("Test case need %s variable but"\ - " isn't into td" % v) - -@registerDecorator -class skipIfNotFeature(OETestDecorator): - """ - Skip test based on DISTRO_FEATURES. - - value must be in distro features or it will skip the test - with msg as the reason. - """ - - attrs = ('value', 'msg') - - def setUpDecorator(self): - msg = ('Checking if %s is in DISTRO_FEATURES ' - 'or IMAGE_FEATURES' % (self.value)) - self.logger.debug(msg) - if not has_feature(self.case.td, self.value): - self.case.skipTest(self.msg) diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/depends.py b/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/depends.py deleted file mode 100644 index baa04341c..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/depends.py +++ /dev/null @@ -1,100 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -from unittest import SkipTest - -from oeqa.core.threaded import OETestRunnerThreaded -from oeqa.core.exception import OEQADependency - -from . import OETestDiscover, registerDecorator - -def _add_depends(registry, case, depends): - module_name = case.__module__ - class_name = case.__class__.__name__ - - case_id = case.id() - - for depend in depends: - dparts = depend.split('.') - - if len(dparts) == 1: - depend_id = ".".join((module_name, class_name, dparts[0])) - elif len(dparts) == 2: - depend_id = ".".join((module_name, dparts[0], dparts[1])) - else: - depend_id = depend - - if not case_id in registry: - registry[case_id] = [] - if not depend_id in registry[case_id]: - registry[case_id].append(depend_id) - -def _validate_test_case_depends(cases, depends): - for case in depends: - if not case in cases: - continue - for dep in depends[case]: - if not dep in cases: - raise OEQADependency("TestCase %s depends on %s and isn't available"\ - ", cases available %s." % (case, dep, str(cases.keys()))) - -def _order_test_case_by_depends(cases, depends): - def _dep_resolve(graph, node, resolved, seen): - seen.append(node) - for edge in graph[node]: - if edge not in resolved: - if edge in seen: - raise OEQADependency("Test cases %s and %s have a circular" \ - " dependency." % (node, edge)) - _dep_resolve(graph, edge, resolved, seen) - resolved.append(node) - - dep_graph = {} - dep_graph['__root__'] = cases.keys() - for case in cases: - if case in depends: - dep_graph[case] = depends[case] - else: - dep_graph[case] = [] - - cases_ordered = [] - _dep_resolve(dep_graph, '__root__', cases_ordered, []) - cases_ordered.remove('__root__') - - return [cases[case_id] for case_id in cases_ordered] - -def _skipTestDependency(case, depends): - if isinstance(case.tc.runner, OETestRunnerThreaded): - import threading - results = case.tc._results[threading.get_ident()] - else: - results = case.tc._results - - skipReasons = ['errors', 'failures', 'skipped'] - - for reason in skipReasons: - for test, _ in results[reason]: - if test.id() in depends: - raise SkipTest("Test case %s depends on %s and was in %s." \ - % (case.id(), test.id(), reason)) - -@registerDecorator -class OETestDepends(OETestDiscover): - attrs = ('depends',) - - def bind(self, registry, case): - super(OETestDepends, self).bind(registry, case) - if not registry.get('depends'): - registry['depends'] = {} - _add_depends(registry['depends'], case, self.depends) - - @staticmethod - def discover(registry): - if registry.get('depends'): - _validate_test_case_depends(registry['cases'], registry['depends']) - return _order_test_case_by_depends(registry['cases'], registry['depends']) - else: - return [registry['cases'][case_id] for case_id in registry['cases']] - - def setUpDecorator(self): - _skipTestDependency(self.case, self.depends) diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/oeid.py b/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/oeid.py deleted file mode 100644 index ea8017a55..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/oeid.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -from . import OETestFilter, registerDecorator -from oeqa.core.utils.misc import intToList - -def _idFilter(oeid, filters): - return False if oeid in filters else True - -@registerDecorator -class OETestID(OETestFilter): - attrs = ('oeid',) - - def bind(self, registry, case): - super(OETestID, self).bind(registry, case) - - def filtrate(self, filters): - if filters.get('oeid'): - filterx = intToList(filters['oeid'], 'oeid') - del filters['oeid'] - if _idFilter(self.oeid, filterx): - return True - return False diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/oetag.py b/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/oetag.py deleted file mode 100644 index ad38ab78a..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/oetag.py +++ /dev/null @@ -1,24 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -from . import OETestFilter, registerDecorator -from oeqa.core.utils.misc import strToList - -def _tagFilter(tags, filters): - return False if set(tags) & set(filters) else True - -@registerDecorator -class OETestTag(OETestFilter): - attrs = ('oetag',) - - def bind(self, registry, case): - super(OETestTag, self).bind(registry, case) - self.oetag = strToList(self.oetag, 'oetag') - - def filtrate(self, filters): - if filters.get('oetag'): - filterx = strToList(filters['oetag'], 'oetag') - del filters['oetag'] - if _tagFilter(self.oetag, filterx): - return True - return False diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/oetimeout.py b/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/oetimeout.py deleted file mode 100644 index f85e7d979..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/oetimeout.py +++ /dev/null @@ -1,45 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -from . import OETestDecorator, registerDecorator - -import signal -from threading import Timer - -from oeqa.core.threaded import OETestRunnerThreaded -from oeqa.core.exception import OEQATimeoutError - -@registerDecorator -class OETimeout(OETestDecorator): - attrs = ('oetimeout',) - - def setUpDecorator(self): - self.logger.debug("Setting up a %d second(s) timeout" % self.oetimeout) - - if isinstance(self.case.tc.runner, OETestRunnerThreaded): - self.timeouted = False - def _timeoutHandler(): - self.timeouted = True - - self.timer = Timer(self.oetimeout, _timeoutHandler) - self.timer.start() - else: - timeout = self.oetimeout - def _timeoutHandler(signum, frame): - raise OEQATimeoutError("Timed out after %s " - "seconds of execution" % timeout) - - self.alarmSignal = signal.signal(signal.SIGALRM, _timeoutHandler) - signal.alarm(self.oetimeout) - - def tearDownDecorator(self): - if isinstance(self.case.tc.runner, OETestRunnerThreaded): - self.timer.cancel() - self.logger.debug("Removed Timer handler") - if self.timeouted: - raise OEQATimeoutError("Timed out after %s " - "seconds of execution" % self.oetimeout) - else: - signal.alarm(0) - signal.signal(signal.SIGALRM, self.alarmSignal) - self.logger.debug("Removed SIGALRM handler") diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/exception.py b/import-layers/yocto-poky/meta/lib/oeqa/core/exception.py deleted file mode 100644 index 732f2efde..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/exception.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -class OEQAException(Exception): - pass - -class OEQATimeoutError(OEQAException): - pass - -class OEQAMissingVariable(OEQAException): - pass - -class OEQADependency(OEQAException): - pass - -class OEQAMissingManifest(OEQAException): - pass - -class OEQAPreRun(OEQAException): - pass - -class OEQATestNotFound(OEQAException): - pass diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/loader.py b/import-layers/yocto-poky/meta/lib/oeqa/core/loader.py deleted file mode 100644 index a4744dee0..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/loader.py +++ /dev/null @@ -1,355 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -import os -import re -import sys -import unittest -import inspect - -from oeqa.core.utils.path import findFile -from oeqa.core.utils.test import getSuiteModules, getCaseID - -from oeqa.core.exception import OEQATestNotFound -from oeqa.core.case import OETestCase -from oeqa.core.decorator import decoratorClasses, OETestDecorator, \ - OETestFilter, OETestDiscover - -# When loading tests, the unittest framework stores any exceptions and -# displays them only when the run method is called. -# -# For our purposes, it is better to raise the exceptions in the loading -# step rather than waiting to run the test suite. -# -# Generate the function definition because this differ across python versions -# Python >= 3.4.4 uses tree parameters instead four but for example Python 3.5.3 -# ueses four parameters so isn't incremental. -_failed_test_args = inspect.getargspec(unittest.loader._make_failed_test).args -exec("""def _make_failed_test(%s): raise exception""" % ', '.join(_failed_test_args)) -unittest.loader._make_failed_test = _make_failed_test - -def _find_duplicated_modules(suite, directory): - for module in getSuiteModules(suite): - path = findFile('%s.py' % module, directory) - if path: - raise ImportError("Duplicated %s module found in %s" % (module, path)) - -def _built_modules_dict(modules): - modules_dict = {} - - if modules == None: - return modules_dict - - for module in modules: - # Assumption: package and module names do not contain upper case - # characters, whereas class names do - m = re.match(r'^(\w+)(?:\.(\w[^.]*)(?:\.([^.]+))?)?$', module, flags=re.ASCII) - - module_name, class_name, test_name = m.groups() - - if module_name and module_name not in modules_dict: - modules_dict[module_name] = {} - if class_name and class_name not in modules_dict[module_name]: - modules_dict[module_name][class_name] = [] - if test_name and test_name not in modules_dict[module_name][class_name]: - modules_dict[module_name][class_name].append(test_name) - - return modules_dict - -class OETestLoader(unittest.TestLoader): - caseClass = OETestCase - - kwargs_names = ['testMethodPrefix', 'sortTestMethodUsing', 'suiteClass', - '_top_level_dir'] - - def __init__(self, tc, module_paths, modules, tests, modules_required, - filters, *args, **kwargs): - self.tc = tc - - self.modules = _built_modules_dict(modules) - - self.tests = tests - self.modules_required = modules_required - - self.filters = filters - self.decorator_filters = [d for d in decoratorClasses if \ - issubclass(d, OETestFilter)] - self._validateFilters(self.filters, self.decorator_filters) - self.used_filters = [d for d in self.decorator_filters - for f in self.filters - if f in d.attrs] - - if isinstance(module_paths, str): - module_paths = [module_paths] - elif not isinstance(module_paths, list): - raise TypeError('module_paths must be a str or a list of str') - self.module_paths = module_paths - - for kwname in self.kwargs_names: - if kwname in kwargs: - setattr(self, kwname, kwargs[kwname]) - - self._patchCaseClass(self.caseClass) - - super(OETestLoader, self).__init__() - - def _patchCaseClass(self, testCaseClass): - # Adds custom attributes to the OETestCase class - setattr(testCaseClass, 'tc', self.tc) - setattr(testCaseClass, 'td', self.tc.td) - setattr(testCaseClass, 'logger', self.tc.logger) - - def _validateFilters(self, filters, decorator_filters): - # Validate if filter isn't empty - for key,value in filters.items(): - if not value: - raise TypeError("Filter %s specified is empty" % key) - - # Validate unique attributes - attr_filters = [attr for clss in decorator_filters \ - for attr in clss.attrs] - dup_attr = [attr for attr in attr_filters - if attr_filters.count(attr) > 1] - if dup_attr: - raise TypeError('Detected duplicated attribute(s) %s in filter' - ' decorators' % ' ,'.join(dup_attr)) - - # Validate if filter is supported - for f in filters: - if f not in attr_filters: - classes = ', '.join([d.__name__ for d in decorator_filters]) - raise TypeError('Found "%s" filter but not declared in any of ' - '%s decorators' % (f, classes)) - - def _registerTestCase(self, case): - case_id = case.id() - self.tc._registry['cases'][case_id] = case - - def _handleTestCaseDecorators(self, case): - def _handle(obj): - if isinstance(obj, OETestDecorator): - if not obj.__class__ in decoratorClasses: - raise Exception("Decorator %s isn't registered" \ - " in decoratorClasses." % obj.__name__) - obj.bind(self.tc._registry, case) - - def _walk_closure(obj): - if hasattr(obj, '__closure__') and obj.__closure__: - for f in obj.__closure__: - obj = f.cell_contents - _handle(obj) - _walk_closure(obj) - method = getattr(case, case._testMethodName, None) - _walk_closure(method) - - def _filterTest(self, case): - """ - Returns True if test case must be filtered, False otherwise. - """ - # XXX; If the module has more than one namespace only use - # the first to support run the whole module specifying the - # <module_name>.[test_class].[test_name] - module_name_small = case.__module__.split('.')[0] - module_name = case.__module__ - - class_name = case.__class__.__name__ - test_name = case._testMethodName - - if self.modules: - module = None - try: - module = self.modules[module_name_small] - except KeyError: - try: - module = self.modules[module_name] - except KeyError: - return True - - if module: - if not class_name in module: - return True - - if module[class_name]: - if test_name not in module[class_name]: - return True - - # Decorator filters - if self.filters and isinstance(case, OETestCase): - filters = self.filters.copy() - case_decorators = [cd for cd in case.decorators - if cd.__class__ in self.used_filters] - - # Iterate over case decorators to check if needs to be filtered. - for cd in case_decorators: - if cd.filtrate(filters): - return True - - # Case is missing one or more decorators for all the filters - # being used, so filter test case. - if filters: - return True - - return False - - def _getTestCase(self, testCaseClass, tcName): - if not hasattr(testCaseClass, '__oeqa_loader') and \ - issubclass(testCaseClass, OETestCase): - # In order to support data_vars validation - # monkey patch the default setUp/tearDown{Class} to use - # the ones provided by OETestCase - setattr(testCaseClass, 'setUpClassMethod', - getattr(testCaseClass, 'setUpClass')) - setattr(testCaseClass, 'tearDownClassMethod', - getattr(testCaseClass, 'tearDownClass')) - setattr(testCaseClass, 'setUpClass', - testCaseClass._oeSetUpClass) - setattr(testCaseClass, 'tearDownClass', - testCaseClass._oeTearDownClass) - - # In order to support decorators initialization - # monkey patch the default setUp/tearDown to use - # a setUpDecorators/tearDownDecorators that methods - # will call setUp/tearDown original methods. - setattr(testCaseClass, 'setUpMethod', - getattr(testCaseClass, 'setUp')) - setattr(testCaseClass, 'tearDownMethod', - getattr(testCaseClass, 'tearDown')) - setattr(testCaseClass, 'setUp', testCaseClass._oeSetUp) - setattr(testCaseClass, 'tearDown', testCaseClass._oeTearDown) - - setattr(testCaseClass, '__oeqa_loader', True) - - case = testCaseClass(tcName) - if isinstance(case, OETestCase): - setattr(case, 'decorators', []) - - return case - - def loadTestsFromTestCase(self, testCaseClass): - """ - Returns a suite of all tests cases contained in testCaseClass. - """ - if issubclass(testCaseClass, unittest.suite.TestSuite): - raise TypeError("Test cases should not be derived from TestSuite." \ - " Maybe you meant to derive %s from TestCase?" \ - % testCaseClass.__name__) - if not issubclass(testCaseClass, unittest.case.TestCase): - raise TypeError("Test %s is not derived from %s" % \ - (testCaseClass.__name__, unittest.case.TestCase.__name__)) - - testCaseNames = self.getTestCaseNames(testCaseClass) - if not testCaseNames and hasattr(testCaseClass, 'runTest'): - testCaseNames = ['runTest'] - - suite = [] - for tcName in testCaseNames: - case = self._getTestCase(testCaseClass, tcName) - # Filer by case id - if not (self.tests and not 'all' in self.tests - and not getCaseID(case) in self.tests): - self._handleTestCaseDecorators(case) - - # Filter by decorators - if not self._filterTest(case): - self._registerTestCase(case) - suite.append(case) - - return self.suiteClass(suite) - - def _required_modules_validation(self): - """ - Search in Test context registry if a required - test is found, raise an exception when not found. - """ - - for module in self.modules_required: - found = False - - # The module name is splitted to only compare the - # first part of a test case id. - comp_len = len(module.split('.')) - for case in self.tc._registry['cases']: - case_comp = '.'.join(case.split('.')[0:comp_len]) - if module == case_comp: - found = True - break - - if not found: - raise OEQATestNotFound("Not found %s in loaded test cases" % \ - module) - - def discover(self): - big_suite = self.suiteClass() - for path in self.module_paths: - _find_duplicated_modules(big_suite, path) - suite = super(OETestLoader, self).discover(path, - pattern='*.py', top_level_dir=path) - big_suite.addTests(suite) - - cases = None - discover_classes = [clss for clss in decoratorClasses - if issubclass(clss, OETestDiscover)] - for clss in discover_classes: - cases = clss.discover(self.tc._registry) - - if self.modules_required: - self._required_modules_validation() - - return self.suiteClass(cases) if cases else big_suite - - def _filterModule(self, module): - if module.__name__ in sys.builtin_module_names: - msg = 'Tried to import %s test module but is a built-in' - raise ImportError(msg % module.__name__) - - # XXX; If the module has more than one namespace only use - # the first to support run the whole module specifying the - # <module_name>.[test_class].[test_name] - module_name_small = module.__name__.split('.')[0] - module_name = module.__name__ - - # Normal test modules are loaded if no modules were specified, - # if module is in the specified module list or if 'all' is in - # module list. - # Underscore modules are loaded only if specified in module list. - load_module = True if not module_name.startswith('_') \ - and (not self.modules \ - or module_name in self.modules \ - or module_name_small in self.modules \ - or 'all' in self.modules) \ - else False - - load_underscore = True if module_name.startswith('_') \ - and (module_name in self.modules or \ - module_name_small in self.modules) \ - else False - - return (load_module, load_underscore) - - - # XXX After Python 3.5, remove backward compatibility hacks for - # use_load_tests deprecation via *args and **kws. See issue 16662. - if sys.version_info >= (3,5): - def loadTestsFromModule(self, module, *args, pattern=None, **kws): - """ - Returns a suite of all tests cases contained in module. - """ - load_module, load_underscore = self._filterModule(module) - - if load_module or load_underscore: - return super(OETestLoader, self).loadTestsFromModule( - module, *args, pattern=pattern, **kws) - else: - return self.suiteClass() - else: - def loadTestsFromModule(self, module, use_load_tests=True): - """ - Returns a suite of all tests cases contained in module. - """ - load_module, load_underscore = self._filterModule(module) - - if load_module or load_underscore: - return super(OETestLoader, self).loadTestsFromModule( - module, use_load_tests) - else: - return self.suiteClass() diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/runner.py b/import-layers/yocto-poky/meta/lib/oeqa/core/runner.py deleted file mode 100644 index 13cdf5ba5..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/runner.py +++ /dev/null @@ -1,277 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -import os -import time -import unittest -import logging -import re - -xmlEnabled = False -try: - import xmlrunner - from xmlrunner.result import _XMLTestResult as _TestResult - from xmlrunner.runner import XMLTestRunner as _TestRunner - xmlEnabled = True -except ImportError: - # use the base runner instead - from unittest import TextTestResult as _TestResult - from unittest import TextTestRunner as _TestRunner - -class OEStreamLogger(object): - def __init__(self, logger): - self.logger = logger - self.buffer = "" - - def write(self, msg): - if len(msg) > 1 and msg[0] != '\n': - if '...' in msg: - self.buffer += msg - elif self.buffer: - self.buffer += msg - self.logger.log(logging.INFO, self.buffer) - self.buffer = "" - else: - self.logger.log(logging.INFO, msg) - - def flush(self): - for handler in self.logger.handlers: - handler.flush() - -class OETestResult(_TestResult): - def __init__(self, tc, *args, **kwargs): - super(OETestResult, self).__init__(*args, **kwargs) - - self.tc = tc - self._tc_map_results() - - def startTest(self, test): - # Allow us to trigger the testcase buffer mode on a per test basis - # so stdout/stderr are only printed upon failure. Enables debugging - # but clean output - if hasattr(test, "buffer"): - self.buffer = test.buffer - super(OETestResult, self).startTest(test) - - def _tc_map_results(self): - self.tc._results['failures'] = self.failures - self.tc._results['errors'] = self.errors - self.tc._results['skipped'] = self.skipped - self.tc._results['expectedFailures'] = self.expectedFailures - - def logSummary(self, component, context_msg=''): - elapsed_time = self.tc._run_end_time - self.tc._run_start_time - self.tc.logger.info("SUMMARY:") - self.tc.logger.info("%s (%s) - Ran %d test%s in %.3fs" % (component, - context_msg, self.testsRun, self.testsRun != 1 and "s" or "", - elapsed_time)) - - if self.wasSuccessful(): - msg = "%s - OK - All required tests passed" % component - else: - msg = "%s - FAIL - Required tests failed" % component - skipped = len(self.tc._results['skipped']) - if skipped: - msg += " (skipped=%d)" % skipped - self.tc.logger.info(msg) - - def _getDetailsNotPassed(self, case, type, desc): - found = False - - for (scase, msg) in self.tc._results[type]: - # XXX: When XML reporting is enabled scase is - # xmlrunner.result._TestInfo instance instead of - # string. - if xmlEnabled: - if case.id() == scase.test_id: - found = True - break - scase_str = scase.test_id - else: - if case == scase: - found = True - break - scase_str = str(scase) - - # When fails at module or class level the class name is passed as string - # so figure out to see if match - m = re.search("^setUpModule \((?P<module_name>.*)\)$", scase_str) - if m: - if case.__class__.__module__ == m.group('module_name'): - found = True - break - - m = re.search("^setUpClass \((?P<class_name>.*)\)$", scase_str) - if m: - class_name = "%s.%s" % (case.__class__.__module__, - case.__class__.__name__) - - if class_name == m.group('class_name'): - found = True - break - - if found: - return (found, msg) - - return (found, None) - - def logDetails(self): - self.tc.logger.info("RESULTS:") - for case_name in self.tc._registry['cases']: - case = self.tc._registry['cases'][case_name] - - result_types = ['failures', 'errors', 'skipped', 'expectedFailures'] - result_desc = ['FAILED', 'ERROR', 'SKIPPED', 'EXPECTEDFAIL'] - - fail = False - desc = None - for idx, name in enumerate(result_types): - (fail, msg) = self._getDetailsNotPassed(case, result_types[idx], - result_desc[idx]) - if fail: - desc = result_desc[idx] - break - - oeid = -1 - if hasattr(case, 'decorators'): - for d in case.decorators: - if hasattr(d, 'oeid'): - oeid = d.oeid - - if fail: - self.tc.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(), - oeid, desc)) - else: - self.tc.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(), - oeid, 'PASSED')) - -class OEListTestsResult(object): - def wasSuccessful(self): - return True - -class OETestRunner(_TestRunner): - streamLoggerClass = OEStreamLogger - - def __init__(self, tc, *args, **kwargs): - if xmlEnabled: - if not kwargs.get('output'): - kwargs['output'] = os.path.join(os.getcwd(), - 'TestResults_%s_%s' % (time.strftime("%Y%m%d%H%M%S"), os.getpid())) - - kwargs['stream'] = self.streamLoggerClass(tc.logger) - super(OETestRunner, self).__init__(*args, **kwargs) - self.tc = tc - self.resultclass = OETestResult - - # XXX: The unittest-xml-reporting package defines _make_result method instead - # of _makeResult standard on unittest. - if xmlEnabled: - def _make_result(self): - """ - Creates a TestResult object which will be used to store - information about the executed tests. - """ - # override in subclasses if necessary. - return self.resultclass(self.tc, - self.stream, self.descriptions, self.verbosity, self.elapsed_times - ) - else: - def _makeResult(self): - return self.resultclass(self.tc, self.stream, self.descriptions, - self.verbosity) - - - def _walk_suite(self, suite, func): - for obj in suite: - if isinstance(obj, unittest.suite.TestSuite): - if len(obj._tests): - self._walk_suite(obj, func) - elif isinstance(obj, unittest.case.TestCase): - func(self.tc.logger, obj) - self._walked_cases = self._walked_cases + 1 - - def _list_tests_name(self, suite): - from oeqa.core.decorator.oeid import OETestID - from oeqa.core.decorator.oetag import OETestTag - - self._walked_cases = 0 - - def _list_cases_without_id(logger, case): - - found_id = False - if hasattr(case, 'decorators'): - for d in case.decorators: - if isinstance(d, OETestID): - found_id = True - - if not found_id: - logger.info('oeid missing for %s' % case.id()) - - def _list_cases(logger, case): - oeid = None - oetag = None - - if hasattr(case, 'decorators'): - for d in case.decorators: - if isinstance(d, OETestID): - oeid = d.oeid - elif isinstance(d, OETestTag): - oetag = d.oetag - - logger.info("%s\t%s\t\t%s" % (oeid, oetag, case.id())) - - self.tc.logger.info("Listing test cases that don't have oeid ...") - self._walk_suite(suite, _list_cases_without_id) - self.tc.logger.info("-" * 80) - - self.tc.logger.info("Listing all available tests:") - self._walked_cases = 0 - self.tc.logger.info("id\ttag\t\ttest") - self.tc.logger.info("-" * 80) - self._walk_suite(suite, _list_cases) - self.tc.logger.info("-" * 80) - self.tc.logger.info("Total found:\t%s" % self._walked_cases) - - def _list_tests_class(self, suite): - self._walked_cases = 0 - - curr = {} - def _list_classes(logger, case): - if not 'module' in curr or curr['module'] != case.__module__: - curr['module'] = case.__module__ - logger.info(curr['module']) - - if not 'class' in curr or curr['class'] != \ - case.__class__.__name__: - curr['class'] = case.__class__.__name__ - logger.info(" -- %s" % curr['class']) - - logger.info(" -- -- %s" % case._testMethodName) - - self.tc.logger.info("Listing all available test classes:") - self._walk_suite(suite, _list_classes) - - def _list_tests_module(self, suite): - self._walked_cases = 0 - - listed = [] - def _list_modules(logger, case): - if not case.__module__ in listed: - if case.__module__.startswith('_'): - logger.info("%s (hidden)" % case.__module__) - else: - logger.info(case.__module__) - listed.append(case.__module__) - - self.tc.logger.info("Listing all available test modules:") - self._walk_suite(suite, _list_modules) - - def list_tests(self, suite, display_type): - if display_type == 'name': - self._list_tests_name(suite) - elif display_type == 'class': - self._list_tests_class(suite) - elif display_type == 'module': - self._list_tests_module(suite) - - return OEListTestsResult() diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/target/__init__.py b/import-layers/yocto-poky/meta/lib/oeqa/core/target/__init__.py deleted file mode 100644 index d2468bc25..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/target/__init__.py +++ /dev/null @@ -1,33 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -from abc import abstractmethod - -class OETarget(object): - - def __init__(self, logger, *args, **kwargs): - self.logger = logger - - @abstractmethod - def start(self): - pass - - @abstractmethod - def stop(self): - pass - - @abstractmethod - def run(self, cmd, timeout=None): - pass - - @abstractmethod - def copyTo(self, localSrc, remoteDst): - pass - - @abstractmethod - def copyFrom(self, remoteSrc, localDst): - pass - - @abstractmethod - def copyDirTo(self, localSrc, remoteDst): - pass diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/target/qemu.py b/import-layers/yocto-poky/meta/lib/oeqa/core/target/qemu.py deleted file mode 100644 index bf3b633f0..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/target/qemu.py +++ /dev/null @@ -1,45 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -import os -import sys -import signal -import time - -from .ssh import OESSHTarget -from oeqa.utils.qemurunner import QemuRunner - -supported_fstypes = ['ext3', 'ext4', 'cpio.gz', 'wic'] - -class OEQemuTarget(OESSHTarget): - def __init__(self, logger, ip, server_ip, timeout=300, user='root', - port=None, machine='', rootfs='', kernel='', kvm=False, - dump_dir='', dump_host_cmds='', display='', bootlog='', - tmpdir='', dir_image='', boottime=60, **kwargs): - - super(OEQemuTarget, self).__init__(logger, ip, server_ip, timeout, - user, port) - - self.ip = ip - self.server_ip = server_ip - self.machine = machine - self.rootfs = rootfs - self.kernel = kernel - self.kvm = kvm - - self.runner = QemuRunner(machine=machine, rootfs=rootfs, tmpdir=tmpdir, - deploy_dir_image=dir_image, display=display, - logfile=bootlog, boottime=boottime, - use_kvm=kvm, dump_dir=dump_dir, - dump_host_cmds=dump_host_cmds, logger=logger) - - def start(self, params=None, extra_bootparams=None): - if self.runner.start(params, extra_bootparams=extra_bootparams): - self.ip = self.runner.ip - self.server_ip = self.runner.server_ip - else: - self.stop() - raise RuntimeError("FAILED to start qemu - check the task log and the boot log") - - def stop(self): - self.runner.stop() diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/target/ssh.py b/import-layers/yocto-poky/meta/lib/oeqa/core/target/ssh.py deleted file mode 100644 index 151b99a77..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/target/ssh.py +++ /dev/null @@ -1,267 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -import os -import time -import select -import logging -import subprocess -import codecs - -from . import OETarget - -class OESSHTarget(OETarget): - def __init__(self, logger, ip, server_ip, timeout=300, user='root', - port=None, **kwargs): - if not logger: - logger = logging.getLogger('target') - logger.setLevel(logging.INFO) - filePath = os.path.join(os.getcwd(), 'remoteTarget.log') - fileHandler = logging.FileHandler(filePath, 'w', 'utf-8') - formatter = logging.Formatter( - '%(asctime)s.%(msecs)03d %(levelname)s: %(message)s', - '%H:%M:%S') - fileHandler.setFormatter(formatter) - logger.addHandler(fileHandler) - - super(OESSHTarget, self).__init__(logger) - self.ip = ip - self.server_ip = server_ip - self.timeout = timeout - self.user = user - ssh_options = [ - '-o', 'UserKnownHostsFile=/dev/null', - '-o', 'StrictHostKeyChecking=no', - '-o', 'LogLevel=ERROR' - ] - self.ssh = ['ssh', '-l', self.user ] + ssh_options - self.scp = ['scp'] + ssh_options - if port: - self.ssh = self.ssh + [ '-p', port ] - self.scp = self.scp + [ '-P', port ] - - def start(self, **kwargs): - pass - - def stop(self, **kwargs): - pass - - def _run(self, command, timeout=None, ignore_status=True): - """ - Runs command in target using SSHProcess. - """ - self.logger.debug("[Running]$ %s" % " ".join(command)) - - starttime = time.time() - status, output = SSHCall(command, self.logger, timeout) - self.logger.debug("[Command returned '%d' after %.2f seconds]" - "" % (status, time.time() - starttime)) - - if status and not ignore_status: - raise AssertionError("Command '%s' returned non-zero exit " - "status %d:\n%s" % (command, status, output)) - - return (status, output) - - def run(self, command, timeout=None): - """ - Runs command in target. - - command: Command to run on target. - timeout: <value>: Kill command after <val> seconds. - None: Kill command default value seconds. - 0: No timeout, runs until return. - """ - targetCmd = 'export PATH=/usr/sbin:/sbin:/usr/bin:/bin; %s' % command - sshCmd = self.ssh + [self.ip, targetCmd] - - if timeout: - processTimeout = timeout - elif timeout==0: - processTimeout = None - else: - processTimeout = self.timeout - - status, output = self._run(sshCmd, processTimeout, True) - self.logger.debug('Command: %s\nOutput: %s\n' % (command, output)) - return (status, output) - - def copyTo(self, localSrc, remoteDst): - """ - Copy file to target. - - If local file is symlink, recreate symlink in target. - """ - if os.path.islink(localSrc): - link = os.readlink(localSrc) - dstDir, dstBase = os.path.split(remoteDst) - sshCmd = 'cd %s; ln -s %s %s' % (dstDir, link, dstBase) - return self.run(sshCmd) - - else: - remotePath = '%s@%s:%s' % (self.user, self.ip, remoteDst) - scpCmd = self.scp + [localSrc, remotePath] - return self._run(scpCmd, ignore_status=False) - - def copyFrom(self, remoteSrc, localDst): - """ - Copy file from target. - """ - remotePath = '%s@%s:%s' % (self.user, self.ip, remoteSrc) - scpCmd = self.scp + [remotePath, localDst] - return self._run(scpCmd, ignore_status=False) - - def copyDirTo(self, localSrc, remoteDst): - """ - Copy recursively localSrc directory to remoteDst in target. - """ - - for root, dirs, files in os.walk(localSrc): - # Create directories in the target as needed - for d in dirs: - tmpDir = os.path.join(root, d).replace(localSrc, "") - newDir = os.path.join(remoteDst, tmpDir.lstrip("/")) - cmd = "mkdir -p %s" % newDir - self.run(cmd) - - # Copy files into the target - for f in files: - tmpFile = os.path.join(root, f).replace(localSrc, "") - dstFile = os.path.join(remoteDst, tmpFile.lstrip("/")) - srcFile = os.path.join(root, f) - self.copyTo(srcFile, dstFile) - - def deleteFiles(self, remotePath, files): - """ - Deletes files in target's remotePath. - """ - - cmd = "rm" - if not isinstance(files, list): - files = [files] - - for f in files: - cmd = "%s %s" % (cmd, os.path.join(remotePath, f)) - - self.run(cmd) - - - def deleteDir(self, remotePath): - """ - Deletes target's remotePath directory. - """ - - cmd = "rmdir %s" % remotePath - self.run(cmd) - - - def deleteDirStructure(self, localPath, remotePath): - """ - Delete recursively localPath structure directory in target's remotePath. - - This function is very usefult to delete a package that is installed in - the DUT and the host running the test has such package extracted in tmp - directory. - - Example: - pwd: /home/user/tmp - tree: . - └── work - ├── dir1 - │  └── file1 - └── dir2 - - localpath = "/home/user/tmp" and remotepath = "/home/user" - - With the above variables this function will try to delete the - directory in the DUT in this order: - /home/user/work/dir1/file1 - /home/user/work/dir1 (if dir is empty) - /home/user/work/dir2 (if dir is empty) - /home/user/work (if dir is empty) - """ - - for root, dirs, files in os.walk(localPath, topdown=False): - # Delete files first - tmpDir = os.path.join(root).replace(localPath, "") - remoteDir = os.path.join(remotePath, tmpDir.lstrip("/")) - self.deleteFiles(remoteDir, files) - - # Remove dirs if empty - for d in dirs: - tmpDir = os.path.join(root, d).replace(localPath, "") - remoteDir = os.path.join(remotePath, tmpDir.lstrip("/")) - self.deleteDir(remoteDir) - -def SSHCall(command, logger, timeout=None, **opts): - - def run(): - nonlocal output - nonlocal process - starttime = time.time() - process = subprocess.Popen(command, **options) - if timeout: - endtime = starttime + timeout - eof = False - while time.time() < endtime and not eof: - logger.debug('time: %s, endtime: %s' % (time.time(), endtime)) - try: - if select.select([process.stdout], [], [], 5)[0] != []: - reader = codecs.getreader('utf-8')(process.stdout) - data = reader.read(1024, 1024) - if not data: - process.stdout.close() - eof = True - else: - output += data - logger.debug('Partial data from SSH call: %s' % data) - endtime = time.time() + timeout - except InterruptedError: - continue - - # process hasn't returned yet - if not eof: - process.terminate() - time.sleep(5) - try: - process.kill() - except OSError: - pass - endtime = time.time() - starttime - lastline = ("\nProcess killed - no output for %d seconds. Total" - " running time: %d seconds." % (timeout, endtime)) - logger.debug('Received data from SSH call %s ' % lastline) - output += lastline - - else: - output = process.communicate()[0].decode("utf-8", errors='replace') - logger.debug('Data from SSH call: %s' % output.rstrip()) - - options = { - "stdout": subprocess.PIPE, - "stderr": subprocess.STDOUT, - "stdin": None, - "shell": False, - "bufsize": -1, - "preexec_fn": os.setsid, - } - options.update(opts) - output = '' - process = None - - # Unset DISPLAY which means we won't trigger SSH_ASKPASS - env = os.environ.copy() - if "DISPLAY" in env: - del env['DISPLAY'] - options['env'] = env - - try: - run() - except: - # Need to guard against a SystemExit or other exception ocurring - # whilst running and ensure we don't leave a process behind. - if process.poll() is None: - process.kill() - logger.debug('Something went wrong, killing SSH process') - raise - return (process.wait(), output.rstrip()) diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/__init__.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/__init__.py +++ /dev/null diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/data.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/data.py deleted file mode 100644 index 88003a6ad..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/data.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -from oeqa.core.case import OETestCase -from oeqa.core.decorator.oetag import OETestTag -from oeqa.core.decorator.data import OETestDataDepends - -class DataTest(OETestCase): - data_vars = ['IMAGE', 'ARCH'] - - @OETestDataDepends(['MACHINE',]) - @OETestTag('dataTestOk') - def testDataOk(self): - self.assertEqual(self.td.get('IMAGE'), 'core-image-minimal') - self.assertEqual(self.td.get('ARCH'), 'x86') - self.assertEqual(self.td.get('MACHINE'), 'qemuarm') - - @OETestTag('dataTestFail') - def testDataFail(self): - pass diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/depends.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/depends.py deleted file mode 100644 index 17cdd90b1..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/depends.py +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -from oeqa.core.case import OETestCase -from oeqa.core.decorator.depends import OETestDepends - -class DependsTest(OETestCase): - - def testDependsFirst(self): - self.assertTrue(True, msg='How is this possible?') - - @OETestDepends(['testDependsFirst']) - def testDependsSecond(self): - self.assertTrue(True, msg='How is this possible?') - - @OETestDepends(['testDependsSecond']) - def testDependsThird(self): - self.assertTrue(True, msg='How is this possible?') - - @OETestDepends(['testDependsSecond']) - def testDependsFourth(self): - self.assertTrue(True, msg='How is this possible?') - - @OETestDepends(['testDependsThird', 'testDependsFourth']) - def testDependsFifth(self): - self.assertTrue(True, msg='How is this possible?') - - @OETestDepends(['testDependsCircular3']) - def testDependsCircular1(self): - self.assertTrue(True, msg='How is this possible?') - - @OETestDepends(['testDependsCircular1']) - def testDependsCircular2(self): - self.assertTrue(True, msg='How is this possible?') - - @OETestDepends(['testDependsCircular2']) - def testDependsCircular3(self): - self.assertTrue(True, msg='How is this possible?') diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/invalid/oeid.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/invalid/oeid.py deleted file mode 100644 index 038d44593..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/invalid/oeid.py +++ /dev/null @@ -1,15 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -from oeqa.core.case import OETestCase - -class AnotherIDTest(OETestCase): - - def testAnotherIdGood(self): - self.assertTrue(True, msg='How is this possible?') - - def testAnotherIdOther(self): - self.assertTrue(True, msg='How is this possible?') - - def testAnotherIdNone(self): - self.assertTrue(True, msg='How is this possible?') diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded.py deleted file mode 100644 index 0fe4cb3f1..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded.py +++ /dev/null @@ -1,12 +0,0 @@ -# Copyright (C) 2017 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -from oeqa.core.case import OETestCase - -class ThreadedTest(OETestCase): - def test_threaded_no_depends(self): - self.assertTrue(True, msg='How is this possible?') - -class ThreadedTest2(OETestCase): - def test_threaded_same_module(self): - self.assertTrue(True, msg='How is this possible?') diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded_alone.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded_alone.py deleted file mode 100644 index 905f39784..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded_alone.py +++ /dev/null @@ -1,8 +0,0 @@ -# Copyright (C) 2017 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -from oeqa.core.case import OETestCase - -class ThreadedTestAlone(OETestCase): - def test_threaded_alone(self): - self.assertTrue(True, msg='How is this possible?') diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded_depends.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded_depends.py deleted file mode 100644 index 0c158d3ba..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded_depends.py +++ /dev/null @@ -1,10 +0,0 @@ -# Copyright (C) 2017 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -from oeqa.core.case import OETestCase -from oeqa.core.decorator.depends import OETestDepends - -class ThreadedTest3(OETestCase): - @OETestDepends(['threaded.ThreadedTest.test_threaded_no_depends']) - def test_threaded_depends(self): - self.assertTrue(True, msg='How is this possible?') diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded_module.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded_module.py deleted file mode 100644 index 63d17e040..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded_module.py +++ /dev/null @@ -1,12 +0,0 @@ -# Copyright (C) 2017 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -from oeqa.core.case import OETestCase - -class ThreadedTestModule(OETestCase): - def test_threaded_module(self): - self.assertTrue(True, msg='How is this possible?') - -class ThreadedTestModule2(OETestCase): - def test_threaded_module2(self): - self.assertTrue(True, msg='How is this possible?') diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/valid/another.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/valid/another.py deleted file mode 100644 index c9ffd1777..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/valid/another.py +++ /dev/null @@ -1,9 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -from oeqa.core.case import OETestCase - -class AnotherTest(OETestCase): - - def testAnother(self): - self.assertTrue(True, msg='How is this possible?') diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/oeid.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/oeid.py deleted file mode 100644 index c2d3d32f2..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/oeid.py +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -from oeqa.core.case import OETestCase -from oeqa.core.decorator.oeid import OETestID - -class IDTest(OETestCase): - - @OETestID(101) - def testIdGood(self): - self.assertTrue(True, msg='How is this possible?') - - @OETestID(102) - def testIdOther(self): - self.assertTrue(True, msg='How is this possible?') - - def testIdNone(self): - self.assertTrue(True, msg='How is this possible?') diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/oetag.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/oetag.py deleted file mode 100644 index 0cae02e75..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/oetag.py +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -from oeqa.core.case import OETestCase -from oeqa.core.decorator.oetag import OETestTag - -class TagTest(OETestCase): - - @OETestTag('goodTag') - def testTagGood(self): - self.assertTrue(True, msg='How is this possible?') - - @OETestTag('otherTag') - def testTagOther(self): - self.assertTrue(True, msg='How is this possible?') - - def testTagNone(self): - self.assertTrue(True, msg='How is this possible?') diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/timeout.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/timeout.py deleted file mode 100644 index 870c3157f..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/timeout.py +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -from time import sleep - -from oeqa.core.case import OETestCase -from oeqa.core.decorator.oetimeout import OETimeout - -class TimeoutTest(OETestCase): - - @OETimeout(1) - def testTimeoutPass(self): - self.assertTrue(True, msg='How is this possible?') - - @OETimeout(1) - def testTimeoutFail(self): - sleep(2) - self.assertTrue(True, msg='How is this possible?') diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/common.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/common.py deleted file mode 100644 index 193232340..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/common.py +++ /dev/null @@ -1,45 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -import sys -import os - -import unittest -import logging -import os - -logger = logging.getLogger("oeqa") -logger.setLevel(logging.INFO) -consoleHandler = logging.StreamHandler() -formatter = logging.Formatter('OEQATest: %(message)s') -consoleHandler.setFormatter(formatter) -logger.addHandler(consoleHandler) - -def setup_sys_path(): - directory = os.path.dirname(os.path.abspath(__file__)) - oeqa_lib = os.path.realpath(os.path.join(directory, '../../../')) - if not oeqa_lib in sys.path: - sys.path.insert(0, oeqa_lib) - -class TestBase(unittest.TestCase): - def setUp(self): - self.logger = logger - directory = os.path.dirname(os.path.abspath(__file__)) - self.cases_path = os.path.join(directory, 'cases') - - def _testLoader(self, d={}, modules=[], tests=[], filters={}): - from oeqa.core.context import OETestContext - tc = OETestContext(d, self.logger) - tc.loadTests(self.cases_path, modules=modules, tests=tests, - filters=filters) - return tc - - def _testLoaderThreaded(self, d={}, modules=[], - tests=[], filters={}): - from oeqa.core.threaded import OETestContextThreaded - - tc = OETestContextThreaded(d, self.logger) - tc.loadTests(self.cases_path, modules=modules, tests=tests, - filters=filters) - - return tc diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_data.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_data.py deleted file mode 100755 index 320468cbe..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_data.py +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -import unittest -import logging -import os - -from common import setup_sys_path, TestBase -setup_sys_path() - -from oeqa.core.exception import OEQAMissingVariable -from oeqa.core.utils.test import getCaseMethod, getSuiteCasesNames - -class TestData(TestBase): - modules = ['data'] - - def test_data_fail_missing_variable(self): - expectedException = "oeqa.core.exception.OEQAMissingVariable" - - tc = self._testLoader(modules=self.modules) - self.assertEqual(False, tc.runTests().wasSuccessful()) - for test, data in tc._results['errors']: - expect = False - if expectedException in data: - expect = True - - self.assertTrue(expect) - - def test_data_fail_wrong_variable(self): - expectedError = 'AssertionError' - d = {'IMAGE' : 'core-image-sato', 'ARCH' : 'arm'} - - tc = self._testLoader(d=d, modules=self.modules) - self.assertEqual(False, tc.runTests().wasSuccessful()) - for test, data in tc._results['failures']: - expect = False - if expectedError in data: - expect = True - - self.assertTrue(expect) - - def test_data_ok(self): - d = {'IMAGE' : 'core-image-minimal', 'ARCH' : 'x86', 'MACHINE' : 'qemuarm'} - - tc = self._testLoader(d=d, modules=self.modules) - self.assertEqual(True, tc.runTests().wasSuccessful()) - -if __name__ == '__main__': - unittest.main() diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_decorators.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_decorators.py deleted file mode 100755 index cf99e0d72..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_decorators.py +++ /dev/null @@ -1,147 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -import signal -import unittest - -from common import setup_sys_path, TestBase -setup_sys_path() - -from oeqa.core.exception import OEQADependency -from oeqa.core.utils.test import getCaseMethod, getSuiteCasesNames, getSuiteCasesIDs - -class TestFilterDecorator(TestBase): - - def _runFilterTest(self, modules, filters, expect, msg): - tc = self._testLoader(modules=modules, filters=filters) - test_loaded = set(getSuiteCasesNames(tc.suites)) - self.assertEqual(expect, test_loaded, msg=msg) - - def test_oetag(self): - # Get all cases without filtering. - filter_all = {} - test_all = {'testTagGood', 'testTagOther', 'testTagNone'} - msg_all = 'Failed to get all oetag cases without filtering.' - - # Get cases with 'goodTag'. - filter_good = {'oetag':'goodTag'} - test_good = {'testTagGood'} - msg_good = 'Failed to get just one test filtering with "goodTag" oetag.' - - # Get cases with an invalid tag. - filter_invalid = {'oetag':'invalidTag'} - test_invalid = set() - msg_invalid = 'Failed to filter all test using an invalid oetag.' - - tests = ((filter_all, test_all, msg_all), - (filter_good, test_good, msg_good), - (filter_invalid, test_invalid, msg_invalid)) - - for test in tests: - self._runFilterTest(['oetag'], test[0], test[1], test[2]) - - def test_oeid(self): - # Get all cases without filtering. - filter_all = {} - test_all = {'testIdGood', 'testIdOther', 'testIdNone'} - msg_all = 'Failed to get all oeid cases without filtering.' - - # Get cases with '101' oeid. - filter_good = {'oeid': 101} - test_good = {'testIdGood'} - msg_good = 'Failed to get just one tes filtering with "101" oeid.' - - # Get cases with an invalid id. - filter_invalid = {'oeid':999} - test_invalid = set() - msg_invalid = 'Failed to filter all test using an invalid oeid.' - - tests = ((filter_all, test_all, msg_all), - (filter_good, test_good, msg_good), - (filter_invalid, test_invalid, msg_invalid)) - - for test in tests: - self._runFilterTest(['oeid'], test[0], test[1], test[2]) - -class TestDependsDecorator(TestBase): - modules = ['depends'] - - def test_depends_order(self): - tests = ['depends.DependsTest.testDependsFirst', - 'depends.DependsTest.testDependsSecond', - 'depends.DependsTest.testDependsThird', - 'depends.DependsTest.testDependsFourth', - 'depends.DependsTest.testDependsFifth'] - tests2 = list(tests) - tests2[2], tests2[3] = tests[3], tests[2] - tc = self._testLoader(modules=self.modules, tests=tests) - test_loaded = getSuiteCasesIDs(tc.suites) - result = True if test_loaded == tests or test_loaded == tests2 else False - msg = 'Failed to order tests using OETestDepends decorator.\nTest order:'\ - ' %s.\nExpected: %s\nOr: %s' % (test_loaded, tests, tests2) - self.assertTrue(result, msg=msg) - - def test_depends_fail_missing_dependency(self): - expect = "TestCase depends.DependsTest.testDependsSecond depends on "\ - "depends.DependsTest.testDependsFirst and isn't available" - tests = ['depends.DependsTest.testDependsSecond'] - try: - # Must throw OEQADependency because missing 'testDependsFirst' - tc = self._testLoader(modules=self.modules, tests=tests) - self.fail('Expected OEQADependency exception') - except OEQADependency as e: - result = True if expect in str(e) else False - msg = 'Expected OEQADependency exception missing testDependsFirst test' - self.assertTrue(result, msg=msg) - - def test_depends_fail_circular_dependency(self): - expect = 'have a circular dependency' - tests = ['depends.DependsTest.testDependsCircular1', - 'depends.DependsTest.testDependsCircular2', - 'depends.DependsTest.testDependsCircular3'] - try: - # Must throw OEQADependency because circular dependency - tc = self._testLoader(modules=self.modules, tests=tests) - self.fail('Expected OEQADependency exception') - except OEQADependency as e: - result = True if expect in str(e) else False - msg = 'Expected OEQADependency exception having a circular dependency' - self.assertTrue(result, msg=msg) - -class TestTimeoutDecorator(TestBase): - modules = ['timeout'] - - def test_timeout(self): - tests = ['timeout.TimeoutTest.testTimeoutPass'] - msg = 'Failed to run test using OETestTimeout' - alarm_signal = signal.getsignal(signal.SIGALRM) - tc = self._testLoader(modules=self.modules, tests=tests) - self.assertTrue(tc.runTests().wasSuccessful(), msg=msg) - msg = "OETestTimeout didn't restore SIGALRM" - self.assertIs(alarm_signal, signal.getsignal(signal.SIGALRM), msg=msg) - - def test_timeout_fail(self): - tests = ['timeout.TimeoutTest.testTimeoutFail'] - msg = "OETestTimeout test didn't timeout as expected" - alarm_signal = signal.getsignal(signal.SIGALRM) - tc = self._testLoader(modules=self.modules, tests=tests) - self.assertFalse(tc.runTests().wasSuccessful(), msg=msg) - msg = "OETestTimeout didn't restore SIGALRM" - self.assertIs(alarm_signal, signal.getsignal(signal.SIGALRM), msg=msg) - - def test_timeout_thread(self): - tests = ['timeout.TimeoutTest.testTimeoutPass'] - msg = 'Failed to run test using OETestTimeout' - tc = self._testLoaderThreaded(modules=self.modules, tests=tests) - self.assertTrue(tc.runTests().wasSuccessful(), msg=msg) - - def test_timeout_threaded_fail(self): - tests = ['timeout.TimeoutTest.testTimeoutFail'] - msg = "OETestTimeout test didn't timeout as expected" - tc = self._testLoaderThreaded(modules=self.modules, tests=tests) - self.assertFalse(tc.runTests().wasSuccessful(), msg=msg) - -if __name__ == '__main__': - unittest.main() diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_loader.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_loader.py deleted file mode 100755 index e0d917d31..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_loader.py +++ /dev/null @@ -1,114 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (C) 2016-2017 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -import os -import unittest - -from common import setup_sys_path, TestBase -setup_sys_path() - -from oeqa.core.exception import OEQADependency -from oeqa.core.utils.test import getSuiteModules, getSuiteCasesIDs - -class TestLoader(TestBase): - - def test_fail_empty_filter(self): - filters = {'oetag' : ''} - expect = 'Filter oetag specified is empty' - msg = 'Expected TypeError exception for having invalid filter' - try: - # Must throw TypeError because empty filter - tc = self._testLoader(filters=filters) - self.fail(msg) - except TypeError as e: - result = True if expect in str(e) else False - self.assertTrue(result, msg=msg) - - def test_fail_invalid_filter(self): - filters = {'invalid' : 'good'} - expect = 'filter but not declared in any of' - msg = 'Expected TypeError exception for having invalid filter' - try: - # Must throw TypeError because invalid filter - tc = self._testLoader(filters=filters) - self.fail(msg) - except TypeError as e: - result = True if expect in str(e) else False - self.assertTrue(result, msg=msg) - - def test_fail_duplicated_module(self): - cases_path = self.cases_path - invalid_path = os.path.join(cases_path, 'loader', 'invalid') - self.cases_path = [self.cases_path, invalid_path] - expect = 'Duplicated oeid module found in' - msg = 'Expected ImportError exception for having duplicated module' - try: - # Must throw ImportEror because duplicated module - tc = self._testLoader() - self.fail(msg) - except ImportError as e: - result = True if expect in str(e) else False - self.assertTrue(result, msg=msg) - finally: - self.cases_path = cases_path - - def test_filter_modules(self): - expected_modules = {'oeid', 'oetag'} - tc = self._testLoader(modules=expected_modules) - modules = getSuiteModules(tc.suites) - msg = 'Expected just %s modules' % ', '.join(expected_modules) - self.assertEqual(modules, expected_modules, msg=msg) - - def test_filter_cases(self): - modules = ['oeid', 'oetag', 'data'] - expected_cases = {'data.DataTest.testDataOk', - 'oetag.TagTest.testTagGood', - 'oeid.IDTest.testIdGood'} - tc = self._testLoader(modules=modules, tests=expected_cases) - cases = set(getSuiteCasesIDs(tc.suites)) - msg = 'Expected just %s cases' % ', '.join(expected_cases) - self.assertEqual(cases, expected_cases, msg=msg) - - def test_import_from_paths(self): - cases_path = self.cases_path - cases2_path = os.path.join(cases_path, 'loader', 'valid') - expected_modules = {'oeid', 'another'} - self.cases_path = [self.cases_path, cases2_path] - tc = self._testLoader(modules=expected_modules) - modules = getSuiteModules(tc.suites) - self.cases_path = cases_path - msg = 'Expected modules from two different paths' - self.assertEqual(modules, expected_modules, msg=msg) - - def test_loader_threaded(self): - cases_path = self.cases_path - - self.cases_path = [os.path.join(self.cases_path, 'loader', 'threaded')] - - tc = self._testLoaderThreaded() - self.assertEqual(len(tc.suites), 3, "Expected to be 3 suites") - - case_ids = ['threaded.ThreadedTest.test_threaded_no_depends', - 'threaded.ThreadedTest2.test_threaded_same_module', - 'threaded_depends.ThreadedTest3.test_threaded_depends'] - for case in tc.suites[0]._tests: - self.assertEqual(case.id(), - case_ids[tc.suites[0]._tests.index(case)]) - - case_ids = ['threaded_alone.ThreadedTestAlone.test_threaded_alone'] - for case in tc.suites[1]._tests: - self.assertEqual(case.id(), - case_ids[tc.suites[1]._tests.index(case)]) - - case_ids = ['threaded_module.ThreadedTestModule.test_threaded_module', - 'threaded_module.ThreadedTestModule2.test_threaded_module2'] - for case in tc.suites[2]._tests: - self.assertEqual(case.id(), - case_ids[tc.suites[2]._tests.index(case)]) - - self.cases_path = cases_path - -if __name__ == '__main__': - unittest.main() diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_runner.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_runner.py deleted file mode 100755 index a3f3861fe..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_runner.py +++ /dev/null @@ -1,38 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -import unittest -import logging -import tempfile - -from common import setup_sys_path, TestBase -setup_sys_path() - -from oeqa.core.runner import OEStreamLogger - -class TestRunner(TestBase): - def test_stream_logger(self): - fp = tempfile.TemporaryFile(mode='w+') - - logging.basicConfig(format='%(message)s', stream=fp) - logger = logging.getLogger() - logger.setLevel(logging.INFO) - - oeSL = OEStreamLogger(logger) - - lines = ['init', 'bigline_' * 65535, 'morebigline_' * 65535 * 4, 'end'] - for line in lines: - oeSL.write(line) - - fp.seek(0) - fp_lines = fp.readlines() - for i, fp_line in enumerate(fp_lines): - fp_line = fp_line.strip() - self.assertEqual(lines[i], fp_line) - - fp.close() - -if __name__ == '__main__': - unittest.main() diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/threaded.py b/import-layers/yocto-poky/meta/lib/oeqa/core/threaded.py deleted file mode 100644 index 2cafe03a2..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/threaded.py +++ /dev/null @@ -1,275 +0,0 @@ -# Copyright (C) 2017 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -import threading -import multiprocessing -import queue -import time - -from unittest.suite import TestSuite - -from oeqa.core.loader import OETestLoader -from oeqa.core.runner import OEStreamLogger, OETestResult, OETestRunner -from oeqa.core.context import OETestContext - -class OETestLoaderThreaded(OETestLoader): - def __init__(self, tc, module_paths, modules, tests, modules_required, - filters, process_num=0, *args, **kwargs): - super(OETestLoaderThreaded, self).__init__(tc, module_paths, modules, - tests, modules_required, filters, *args, **kwargs) - - self.process_num = process_num - - def discover(self): - suite = super(OETestLoaderThreaded, self).discover() - - if self.process_num <= 0: - self.process_num = min(multiprocessing.cpu_count(), - len(suite._tests)) - - suites = [] - for _ in range(self.process_num): - suites.append(self.suiteClass()) - - def _search_for_module_idx(suites, case): - """ - Cases in the same module needs to be run - in the same thread because PyUnit keeps track - of setUp{Module, Class,} and tearDown{Module, Class,}. - """ - - for idx in range(self.process_num): - suite = suites[idx] - for c in suite._tests: - if case.__module__ == c.__module__: - return idx - - return -1 - - def _search_for_depend_idx(suites, depends): - """ - Dependency cases needs to be run in the same - thread, because OEQA framework look at the state - of dependant test to figure out if skip or not. - """ - - for idx in range(self.process_num): - suite = suites[idx] - - for case in suite._tests: - if case.id() in depends: - return idx - return -1 - - def _get_best_idx(suites): - sizes = [len(suite._tests) for suite in suites] - return sizes.index(min(sizes)) - - def _fill_suites(suite): - idx = -1 - for case in suite: - if isinstance(case, TestSuite): - _fill_suites(case) - else: - idx = _search_for_module_idx(suites, case) - - depends = {} - if 'depends' in self.tc._registry: - depends = self.tc._registry['depends'] - - if idx == -1 and case.id() in depends: - case_depends = depends[case.id()] - idx = _search_for_depend_idx(suites, case_depends) - - if idx == -1: - idx = _get_best_idx(suites) - - suites[idx].addTest(case) - _fill_suites(suite) - - suites_tmp = suites - suites = [] - for suite in suites_tmp: - if len(suite._tests) > 0: - suites.append(suite) - - return suites - -class OEStreamLoggerThreaded(OEStreamLogger): - _lock = threading.Lock() - buffers = {} - - def write(self, msg): - tid = threading.get_ident() - - if not tid in self.buffers: - self.buffers[tid] = "" - - if msg: - self.buffers[tid] += msg - - def finish(self): - tid = threading.get_ident() - - self._lock.acquire() - self.logger.info('THREAD: %d' % tid) - self.logger.info('-' * 70) - for line in self.buffers[tid].split('\n'): - self.logger.info(line) - self._lock.release() - -class OETestResultThreadedInternal(OETestResult): - def _tc_map_results(self): - tid = threading.get_ident() - - # PyUnit generates a result for every test module run, test - # if the thread already has an entry to avoid lose the previous - # test module results. - if not tid in self.tc._results: - self.tc._results[tid] = {} - self.tc._results[tid]['failures'] = self.failures - self.tc._results[tid]['errors'] = self.errors - self.tc._results[tid]['skipped'] = self.skipped - self.tc._results[tid]['expectedFailures'] = self.expectedFailures - -class OETestResultThreaded(object): - _results = {} - _lock = threading.Lock() - - def __init__(self, tc): - self.tc = tc - - def _fill_tc_results(self): - tids = list(self.tc._results.keys()) - fields = ['failures', 'errors', 'skipped', 'expectedFailures'] - - for tid in tids: - result = self.tc._results[tid] - for field in fields: - if not field in self.tc._results: - self.tc._results[field] = [] - self.tc._results[field].extend(result[field]) - - def addResult(self, result, run_start_time, run_end_time): - tid = threading.get_ident() - - self._lock.acquire() - self._results[tid] = {} - self._results[tid]['result'] = result - self._results[tid]['run_start_time'] = run_start_time - self._results[tid]['run_end_time'] = run_end_time - self._results[tid]['result'] = result - self._lock.release() - - def wasSuccessful(self): - wasSuccessful = True - for tid in self._results.keys(): - wasSuccessful = wasSuccessful and \ - self._results[tid]['result'].wasSuccessful() - return wasSuccessful - - def stop(self): - for tid in self._results.keys(): - self._results[tid]['result'].stop() - - def logSummary(self, component, context_msg=''): - elapsed_time = (self.tc._run_end_time - self.tc._run_start_time) - - self.tc.logger.info("SUMMARY:") - self.tc.logger.info("%s (%s) - Ran %d tests in %.3fs" % (component, - context_msg, len(self.tc._registry['cases']), elapsed_time)) - if self.wasSuccessful(): - msg = "%s - OK - All required tests passed" % component - else: - msg = "%s - FAIL - Required tests failed" % component - self.tc.logger.info(msg) - - def logDetails(self): - if list(self._results): - tid = list(self._results)[0] - result = self._results[tid]['result'] - result.logDetails() - -class _Worker(threading.Thread): - """Thread executing tasks from a given tasks queue""" - def __init__(self, tasks, result, stream): - threading.Thread.__init__(self) - self.tasks = tasks - - self.result = result - self.stream = stream - - def run(self): - while True: - try: - func, args, kargs = self.tasks.get(block=False) - except queue.Empty: - break - - try: - run_start_time = time.time() - rc = func(*args, **kargs) - run_end_time = time.time() - self.result.addResult(rc, run_start_time, run_end_time) - self.stream.finish() - except Exception as e: - print(e) - finally: - self.tasks.task_done() - -class _ThreadedPool: - """Pool of threads consuming tasks from a queue""" - def __init__(self, num_workers, num_tasks, stream=None, result=None): - self.tasks = queue.Queue(num_tasks) - self.workers = [] - - for _ in range(num_workers): - worker = _Worker(self.tasks, result, stream) - self.workers.append(worker) - - def start(self): - for worker in self.workers: - worker.start() - - def add_task(self, func, *args, **kargs): - """Add a task to the queue""" - self.tasks.put((func, args, kargs)) - - def wait_completion(self): - """Wait for completion of all the tasks in the queue""" - self.tasks.join() - for worker in self.workers: - worker.join() - -class OETestRunnerThreaded(OETestRunner): - streamLoggerClass = OEStreamLoggerThreaded - - def __init__(self, tc, *args, **kwargs): - super(OETestRunnerThreaded, self).__init__(tc, *args, **kwargs) - self.resultclass = OETestResultThreadedInternal # XXX: XML reporting overrides at __init__ - - def run(self, suites): - result = OETestResultThreaded(self.tc) - - pool = _ThreadedPool(len(suites), len(suites), stream=self.stream, - result=result) - for s in suites: - pool.add_task(super(OETestRunnerThreaded, self).run, s) - pool.start() - pool.wait_completion() - result._fill_tc_results() - - return result - -class OETestContextThreaded(OETestContext): - loaderClass = OETestLoaderThreaded - runnerClass = OETestRunnerThreaded - - def loadTests(self, module_paths, modules=[], tests=[], - modules_manifest="", modules_required=[], filters={}, process_num=0): - if modules_manifest: - modules = self._read_modules_from_manifest(modules_manifest) - - self.loader = self.loaderClass(self, module_paths, modules, tests, - modules_required, filters, process_num) - self.suites = self.loader.discover() diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/utils/__init__.py b/import-layers/yocto-poky/meta/lib/oeqa/core/utils/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/utils/__init__.py +++ /dev/null diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/utils/misc.py b/import-layers/yocto-poky/meta/lib/oeqa/core/utils/misc.py deleted file mode 100644 index 0b223b5d0..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/utils/misc.py +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -def toList(obj, obj_type, obj_name="Object"): - if isinstance(obj, obj_type): - return [obj] - elif isinstance(obj, list): - return obj - else: - raise TypeError("%s must be %s or list" % (obj_name, obj_type)) - -def toSet(obj, obj_type, obj_name="Object"): - if isinstance(obj, obj_type): - return {obj} - elif isinstance(obj, list): - return set(obj) - elif isinstance(obj, set): - return obj - else: - raise TypeError("%s must be %s or set" % (obj_name, obj_type)) - -def strToList(obj, obj_name="Object"): - return toList(obj, str, obj_name) - -def strToSet(obj, obj_name="Object"): - return toSet(obj, str, obj_name) - -def intToList(obj, obj_name="Object"): - return toList(obj, int, obj_name) - -def dataStoteToDict(d, variables): - data = {} - - for v in variables: - data[v] = d.getVar(v) - - return data - -def updateTestData(d, td, variables): - """ - Updates variables with values of data store to test data. - """ - for var in variables: - td[var] = d.getVar(var) diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/utils/path.py b/import-layers/yocto-poky/meta/lib/oeqa/core/utils/path.py deleted file mode 100644 index a21caad5c..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/utils/path.py +++ /dev/null @@ -1,19 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -import os -import sys - -def findFile(file_name, directory): - """ - Search for a file in directory and returns its complete path. - """ - for r, d, f in os.walk(directory): - if file_name in f: - return os.path.join(r, file_name) - return None - -def remove_safe(path): - if os.path.exists(path): - os.remove(path) - diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/utils/test.py b/import-layers/yocto-poky/meta/lib/oeqa/core/utils/test.py deleted file mode 100644 index 88d5d1398..000000000 --- a/import-layers/yocto-poky/meta/lib/oeqa/core/utils/test.py +++ /dev/null @@ -1,86 +0,0 @@ -# Copyright (C) 2016 Intel Corporation -# Released under the MIT license (see COPYING.MIT) - -import os -import inspect -import unittest - -def getSuiteCases(suite): - """ - Returns individual test from a test suite. - """ - tests = [] - - if isinstance(suite, unittest.TestCase): - tests.append(suite) - elif isinstance(suite, unittest.suite.TestSuite): - for item in suite: - tests.extend(getSuiteCases(item)) - - return tests - -def getSuiteModules(suite): - """ - Returns modules in a test suite. - """ - modules = set() - for test in getSuiteCases(suite): - modules.add(getCaseModule(test)) - return modules - -def getSuiteCasesInfo(suite, func): - """ - Returns test case info from suite. Info is fetched from func. - """ - tests = [] - for test in getSuiteCases(suite): - tests.append(func(test)) - return tests - -def getSuiteCasesNames(suite): - """ - Returns test case names from suite. - """ - return getSuiteCasesInfo(suite, getCaseMethod) - -def getSuiteCasesIDs(suite): - """ - Returns test case ids from suite. - """ - return getSuiteCasesInfo(suite, getCaseID) - -def getSuiteCasesFiles(suite): - """ - Returns test case files paths from suite. - """ - return getSuiteCasesInfo(suite, getCaseFile) - -def getCaseModule(test_case): - """ - Returns test case module name. - """ - return test_case.__module__ - -def getCaseClass(test_case): - """ - Returns test case class name. - """ - return test_case.__class__.__name__ - -def getCaseID(test_case): - """ - Returns test case complete id. - """ - return test_case.id() - -def getCaseFile(test_case): - """ - Returns test case file path. - """ - return inspect.getsourcefile(test_case.__class__) - -def getCaseMethod(test_case): - """ - Returns test case method name. - """ - return getCaseID(test_case).split('.')[-1] |