summaryrefslogtreecommitdiffstats
path: root/import-layers/yocto-poky/meta/lib/oeqa/utils/decorators.py
diff options
context:
space:
mode:
Diffstat (limited to 'import-layers/yocto-poky/meta/lib/oeqa/utils/decorators.py')
-rw-r--r--import-layers/yocto-poky/meta/lib/oeqa/utils/decorators.py262
1 files changed, 262 insertions, 0 deletions
diff --git a/import-layers/yocto-poky/meta/lib/oeqa/utils/decorators.py b/import-layers/yocto-poky/meta/lib/oeqa/utils/decorators.py
new file mode 100644
index 000000000..0d79223a2
--- /dev/null
+++ b/import-layers/yocto-poky/meta/lib/oeqa/utils/decorators.py
@@ -0,0 +1,262 @@
+# Copyright (C) 2013 Intel Corporation
+#
+# Released under the MIT license (see COPYING.MIT)
+
+# Some custom decorators that can be used by unittests
+# Most useful is skipUnlessPassed which can be used for
+# creating dependecies between two test methods.
+
+import os
+import logging
+import sys
+import unittest
+import threading
+import signal
+from functools import wraps
+
+#get the "result" object from one of the upper frames provided that one of these upper frames is a unittest.case frame
+class getResults(object):
+ def __init__(self):
+ #dynamically determine the unittest.case frame and use it to get the name of the test method
+ ident = threading.current_thread().ident
+ upperf = sys._current_frames()[ident]
+ while (upperf.f_globals['__name__'] != 'unittest.case'):
+ upperf = upperf.f_back
+
+ def handleList(items):
+ ret = []
+ # items is a list of tuples, (test, failure) or (_ErrorHandler(), Exception())
+ for i in items:
+ s = i[0].id()
+ #Handle the _ErrorHolder objects from skipModule failures
+ if "setUpModule (" in s:
+ ret.append(s.replace("setUpModule (", "").replace(")",""))
+ else:
+ ret.append(s)
+ # Append also the test without the full path
+ testname = s.split('.')[-1]
+ if testname:
+ ret.append(testname)
+ return ret
+ self.faillist = handleList(upperf.f_locals['result'].failures)
+ self.errorlist = handleList(upperf.f_locals['result'].errors)
+ self.skiplist = handleList(upperf.f_locals['result'].skipped)
+
+ def getFailList(self):
+ return self.faillist
+
+ def getErrorList(self):
+ return self.errorlist
+
+ def getSkipList(self):
+ return self.skiplist
+
+class skipIfFailure(object):
+
+ def __init__(self,testcase):
+ self.testcase = testcase
+
+ def __call__(self,f):
+ def wrapped_f(*args, **kwargs):
+ res = getResults()
+ if self.testcase in (res.getFailList() or res.getErrorList()):
+ raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase)
+ return f(*args, **kwargs)
+ wrapped_f.__name__ = f.__name__
+ return wrapped_f
+
+class skipIfSkipped(object):
+
+ def __init__(self,testcase):
+ self.testcase = testcase
+
+ def __call__(self,f):
+ def wrapped_f(*args, **kwargs):
+ res = getResults()
+ if self.testcase in res.getSkipList():
+ raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase)
+ return f(*args, **kwargs)
+ wrapped_f.__name__ = f.__name__
+ return wrapped_f
+
+class skipUnlessPassed(object):
+
+ def __init__(self,testcase):
+ self.testcase = testcase
+
+ def __call__(self,f):
+ def wrapped_f(*args, **kwargs):
+ res = getResults()
+ if self.testcase in res.getSkipList() or \
+ self.testcase in res.getFailList() or \
+ self.testcase in res.getErrorList():
+ raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase)
+ return f(*args, **kwargs)
+ wrapped_f.__name__ = f.__name__
+ wrapped_f._depends_on = self.testcase
+ return wrapped_f
+
+class testcase(object):
+
+ def __init__(self, test_case):
+ self.test_case = test_case
+
+ def __call__(self, func):
+ def wrapped_f(*args, **kwargs):
+ return func(*args, **kwargs)
+ wrapped_f.test_case = self.test_case
+ wrapped_f.__name__ = func.__name__
+ return wrapped_f
+
+class NoParsingFilter(logging.Filter):
+ def filter(self, record):
+ return record.levelno == 100
+
+def LogResults(original_class):
+ orig_method = original_class.run
+
+ from time import strftime, gmtime
+ caller = os.path.basename(sys.argv[0])
+ timestamp = strftime('%Y%m%d%H%M%S',gmtime())
+ logfile = os.path.join(os.getcwd(),'results-'+caller+'.'+timestamp+'.log')
+ linkfile = os.path.join(os.getcwd(),'results-'+caller+'.log')
+
+ #rewrite the run method of unittest.TestCase to add testcase logging
+ def run(self, result, *args, **kws):
+ orig_method(self, result, *args, **kws)
+ passed = True
+ testMethod = getattr(self, self._testMethodName)
+ #if test case is decorated then use it's number, else use it's name
+ try:
+ test_case = testMethod.test_case
+ except AttributeError:
+ test_case = self._testMethodName
+
+ class_name = str(testMethod.im_class).split("'")[1]
+
+ #create custom logging level for filtering.
+ custom_log_level = 100
+ logging.addLevelName(custom_log_level, 'RESULTS')
+
+ def results(self, message, *args, **kws):
+ if self.isEnabledFor(custom_log_level):
+ self.log(custom_log_level, message, *args, **kws)
+ logging.Logger.results = results
+
+ logging.basicConfig(filename=logfile,
+ filemode='w',
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+ datefmt='%H:%M:%S',
+ level=custom_log_level)
+ for handler in logging.root.handlers:
+ handler.addFilter(NoParsingFilter())
+ local_log = logging.getLogger(caller)
+
+ #check status of tests and record it
+
+ for (name, msg) in result.errors:
+ if (self._testMethodName == str(name).split(' ')[0]) and (class_name in str(name).split(' ')[1]):
+ local_log.results("Testcase "+str(test_case)+": ERROR")
+ local_log.results("Testcase "+str(test_case)+":\n"+msg)
+ passed = False
+ for (name, msg) in result.failures:
+ if (self._testMethodName == str(name).split(' ')[0]) and (class_name in str(name).split(' ')[1]):
+ local_log.results("Testcase "+str(test_case)+": FAILED")
+ local_log.results("Testcase "+str(test_case)+":\n"+msg)
+ passed = False
+ for (name, msg) in result.skipped:
+ if (self._testMethodName == str(name).split(' ')[0]) and (class_name in str(name).split(' ')[1]):
+ local_log.results("Testcase "+str(test_case)+": SKIPPED")
+ passed = False
+ if passed:
+ local_log.results("Testcase "+str(test_case)+": PASSED")
+
+ # Create symlink to the current log
+ if os.path.exists(linkfile):
+ os.remove(linkfile)
+ os.symlink(logfile, linkfile)
+
+ original_class.run = run
+
+ return original_class
+
+class TimeOut(BaseException):
+ pass
+
+def timeout(seconds):
+ def decorator(fn):
+ if hasattr(signal, 'alarm'):
+ @wraps(fn)
+ def wrapped_f(*args, **kw):
+ current_frame = sys._getframe()
+ def raiseTimeOut(signal, frame):
+ if frame is not current_frame:
+ raise TimeOut('%s seconds' % seconds)
+ prev_handler = signal.signal(signal.SIGALRM, raiseTimeOut)
+ try:
+ signal.alarm(seconds)
+ return fn(*args, **kw)
+ finally:
+ signal.alarm(0)
+ signal.signal(signal.SIGALRM, prev_handler)
+ return wrapped_f
+ else:
+ return fn
+ return decorator
+
+__tag_prefix = "tag__"
+def tag(*args, **kwargs):
+ """Decorator that adds attributes to classes or functions
+ for use with the Attribute (-a) plugin.
+ """
+ def wrap_ob(ob):
+ for name in args:
+ setattr(ob, __tag_prefix + name, True)
+ for name, value in kwargs.iteritems():
+ setattr(ob, __tag_prefix + name, value)
+ return ob
+ return wrap_ob
+
+def gettag(obj, key, default=None):
+ key = __tag_prefix + key
+ if not isinstance(obj, unittest.TestCase):
+ return getattr(obj, key, default)
+ tc_method = getattr(obj, obj._testMethodName)
+ ret = getattr(tc_method, key, getattr(obj, key, default))
+ return ret
+
+def getAllTags(obj):
+ def __gettags(o):
+ r = {k[len(__tag_prefix):]:getattr(o,k) for k in dir(o) if k.startswith(__tag_prefix)}
+ return r
+ if not isinstance(obj, unittest.TestCase):
+ return __gettags(obj)
+ tc_method = getattr(obj, obj._testMethodName)
+ ret = __gettags(obj)
+ ret.update(__gettags(tc_method))
+ return ret
+
+def timeout_handler(seconds):
+ def decorator(fn):
+ if hasattr(signal, 'alarm'):
+ @wraps(fn)
+ def wrapped_f(self, *args, **kw):
+ current_frame = sys._getframe()
+ def raiseTimeOut(signal, frame):
+ if frame is not current_frame:
+ try:
+ self.target.restart()
+ raise TimeOut('%s seconds' % seconds)
+ except:
+ raise TimeOut('%s seconds' % seconds)
+ prev_handler = signal.signal(signal.SIGALRM, raiseTimeOut)
+ try:
+ signal.alarm(seconds)
+ return fn(self, *args, **kw)
+ finally:
+ signal.alarm(0)
+ signal.signal(signal.SIGALRM, prev_handler)
+ return wrapped_f
+ else:
+ return fn
+ return decorator
OpenPOWER on IntegriCloud