summaryrefslogtreecommitdiffstats
path: root/poky/meta/lib/oeqa/utils
diff options
context:
space:
mode:
Diffstat (limited to 'poky/meta/lib/oeqa/utils')
-rw-r--r--poky/meta/lib/oeqa/utils/__init__.py103
-rw-r--r--poky/meta/lib/oeqa/utils/buildproject.py55
-rw-r--r--poky/meta/lib/oeqa/utils/commands.py357
-rw-r--r--poky/meta/lib/oeqa/utils/decorators.py295
-rw-r--r--poky/meta/lib/oeqa/utils/dump.py91
-rw-r--r--poky/meta/lib/oeqa/utils/ftools.py46
-rw-r--r--poky/meta/lib/oeqa/utils/git.py80
-rw-r--r--poky/meta/lib/oeqa/utils/httpserver.py36
-rw-r--r--poky/meta/lib/oeqa/utils/logparser.py126
-rw-r--r--poky/meta/lib/oeqa/utils/metadata.py108
-rw-r--r--poky/meta/lib/oeqa/utils/network.py8
-rw-r--r--poky/meta/lib/oeqa/utils/package_manager.py211
-rw-r--r--poky/meta/lib/oeqa/utils/qemurunner.py591
-rw-r--r--poky/meta/lib/oeqa/utils/qemutinyrunner.py176
-rw-r--r--poky/meta/lib/oeqa/utils/sshcontrol.py242
-rw-r--r--poky/meta/lib/oeqa/utils/subprocesstweak.py19
-rw-r--r--poky/meta/lib/oeqa/utils/targetbuild.py139
-rw-r--r--poky/meta/lib/oeqa/utils/testexport.py263
18 files changed, 2946 insertions, 0 deletions
diff --git a/poky/meta/lib/oeqa/utils/__init__.py b/poky/meta/lib/oeqa/utils/__init__.py
new file mode 100644
index 000000000..d38a32301
--- /dev/null
+++ b/poky/meta/lib/oeqa/utils/__init__.py
@@ -0,0 +1,103 @@
+# Enable other layers to have modules in the same named directory
+from pkgutil import extend_path
+__path__ = extend_path(__path__, __name__)
+
+# Borrowed from CalledProcessError
+
+class CommandError(Exception):
+ def __init__(self, retcode, cmd, output = None):
+ self.retcode = retcode
+ self.cmd = cmd
+ self.output = output
+ def __str__(self):
+ return "Command '%s' returned non-zero exit status %d with output: %s" % (self.cmd, self.retcode, self.output)
+
+def avoid_paths_in_environ(paths):
+ """
+ Searches for every path in os.environ['PATH']
+ if found remove it.
+
+ Returns new PATH without avoided PATHs.
+ """
+ import os
+
+ new_path = ''
+ for p in os.environ['PATH'].split(':'):
+ avoid = False
+ for pa in paths:
+ if pa in p:
+ avoid = True
+ break
+ if avoid:
+ continue
+
+ new_path = new_path + p + ':'
+
+ new_path = new_path[:-1]
+ return new_path
+
+def make_logger_bitbake_compatible(logger):
+ import logging
+
+ """
+ Bitbake logger redifines debug() in order to
+ set a level within debug, this breaks compatibility
+ with vainilla logging, so we neeed to redifine debug()
+ method again also add info() method with INFO + 1 level.
+ """
+ def _bitbake_log_debug(*args, **kwargs):
+ lvl = logging.DEBUG
+
+ if isinstance(args[0], int):
+ lvl = args[0]
+ msg = args[1]
+ args = args[2:]
+ else:
+ msg = args[0]
+ args = args[1:]
+
+ logger.log(lvl, msg, *args, **kwargs)
+
+ def _bitbake_log_info(msg, *args, **kwargs):
+ logger.log(logging.INFO + 1, msg, *args, **kwargs)
+
+ logger.debug = _bitbake_log_debug
+ logger.info = _bitbake_log_info
+
+ return logger
+
+def load_test_components(logger, executor):
+ import sys
+ import os
+ import importlib
+
+ from oeqa.core.context import OETestContextExecutor
+
+ components = {}
+
+ for path in sys.path:
+ base_dir = os.path.join(path, 'oeqa')
+ if os.path.exists(base_dir) and os.path.isdir(base_dir):
+ for file in os.listdir(base_dir):
+ comp_name = file
+ comp_context = os.path.join(base_dir, file, 'context.py')
+ if os.path.exists(comp_context):
+ comp_plugin = importlib.import_module('oeqa.%s.%s' % \
+ (comp_name, 'context'))
+ try:
+ if not issubclass(comp_plugin._executor_class,
+ OETestContextExecutor):
+ raise TypeError("Component %s in %s, _executor_class "\
+ "isn't derived from OETestContextExecutor."\
+ % (comp_name, comp_context))
+
+ if comp_plugin._executor_class._script_executor \
+ != executor:
+ continue
+
+ components[comp_name] = comp_plugin._executor_class()
+ except AttributeError:
+ raise AttributeError("Component %s in %s don't have "\
+ "_executor_class defined." % (comp_name, comp_context))
+
+ return components
diff --git a/poky/meta/lib/oeqa/utils/buildproject.py b/poky/meta/lib/oeqa/utils/buildproject.py
new file mode 100644
index 000000000..721f35d99
--- /dev/null
+++ b/poky/meta/lib/oeqa/utils/buildproject.py
@@ -0,0 +1,55 @@
+# Copyright (C) 2013-2016 Intel Corporation
+#
+# Released under the MIT license (see COPYING.MIT)
+
+# Provides a class for automating build tests for projects
+
+import os
+import re
+import subprocess
+import shutil
+import tempfile
+
+from abc import ABCMeta, abstractmethod
+
+class BuildProject(metaclass=ABCMeta):
+ def __init__(self, uri, foldername=None, tmpdir=None, dl_dir=None):
+ self.uri = uri
+ self.archive = os.path.basename(uri)
+ if not tmpdir:
+ tmpdir = tempfile.mkdtemp(prefix='buildproject')
+ self.localarchive = os.path.join(tmpdir, self.archive)
+ self.dl_dir = dl_dir
+ if foldername:
+ self.fname = foldername
+ else:
+ self.fname = re.sub(r'\.tar\.bz2$|\.tar\.gz$|\.tar\.xz$', '', self.archive)
+
+ # Download self.archive to self.localarchive
+ def _download_archive(self):
+ if self.dl_dir and os.path.exists(os.path.join(self.dl_dir, self.archive)):
+ shutil.copyfile(os.path.join(self.dl_dir, self.archive), self.localarchive)
+ return
+
+ cmd = "wget -O %s %s" % (self.localarchive, self.uri)
+ subprocess.check_output(cmd, shell=True)
+
+ # This method should provide a way to run a command in the desired environment.
+ @abstractmethod
+ def _run(self, cmd):
+ pass
+
+ # The timeout parameter of target.run is set to 0 to make the ssh command
+ # run with no timeout.
+ def run_configure(self, configure_args='', extra_cmds=''):
+ return self._run('cd %s; gnu-configize; %s ./configure %s' % (self.targetdir, extra_cmds, configure_args))
+
+ def run_make(self, make_args=''):
+ return self._run('cd %s; make %s' % (self.targetdir, make_args))
+
+ def run_install(self, install_args=''):
+ return self._run('cd %s; make install %s' % (self.targetdir, install_args))
+
+ def clean(self):
+ self._run('rm -rf %s' % self.targetdir)
+ subprocess.check_call('rm -f %s' % self.localarchive, shell=True)
diff --git a/poky/meta/lib/oeqa/utils/commands.py b/poky/meta/lib/oeqa/utils/commands.py
new file mode 100644
index 000000000..0d9cf23fe
--- /dev/null
+++ b/poky/meta/lib/oeqa/utils/commands.py
@@ -0,0 +1,357 @@
+# Copyright (c) 2013-2014 Intel Corporation
+#
+# Released under the MIT license (see COPYING.MIT)
+
+# DESCRIPTION
+# This module is mainly used by scripts/oe-selftest and modules under meta/oeqa/selftest
+# It provides a class and methods for running commands on the host in a convienent way for tests.
+
+
+
+import os
+import sys
+import signal
+import subprocess
+import threading
+import time
+import logging
+from oeqa.utils import CommandError
+from oeqa.utils import ftools
+import re
+import contextlib
+# Export test doesn't require bb
+try:
+ import bb
+except ImportError:
+ pass
+
+class Command(object):
+ def __init__(self, command, bg=False, timeout=None, data=None, output_log=None, **options):
+
+ self.defaultopts = {
+ "stdout": subprocess.PIPE,
+ "stderr": subprocess.STDOUT,
+ "stdin": None,
+ "shell": False,
+ "bufsize": -1,
+ }
+
+ self.cmd = command
+ self.bg = bg
+ self.timeout = timeout
+ self.data = data
+
+ self.options = dict(self.defaultopts)
+ if isinstance(self.cmd, str):
+ self.options["shell"] = True
+ if self.data:
+ self.options['stdin'] = subprocess.PIPE
+ self.options.update(options)
+
+ self.status = None
+ # We collect chunks of output before joining them at the end.
+ self._output_chunks = []
+ self._error_chunks = []
+ self.output = None
+ self.error = None
+ self.threads = []
+
+ self.output_log = output_log
+ self.log = logging.getLogger("utils.commands")
+
+ def run(self):
+ self.process = subprocess.Popen(self.cmd, **self.options)
+
+ def readThread(output, stream, logfunc):
+ if logfunc:
+ for line in stream:
+ output.append(line)
+ logfunc(line.decode("utf-8", errors='replace').rstrip())
+ else:
+ output.append(stream.read())
+
+ def readStderrThread():
+ readThread(self._error_chunks, self.process.stderr, self.output_log.error if self.output_log else None)
+
+ def readStdoutThread():
+ readThread(self._output_chunks, self.process.stdout, self.output_log.info if self.output_log else None)
+
+ def writeThread():
+ try:
+ self.process.stdin.write(self.data)
+ self.process.stdin.close()
+ except OSError as ex:
+ # It's not an error when the command does not consume all
+ # of our data. subprocess.communicate() also ignores that.
+ if ex.errno != EPIPE:
+ raise
+
+ # We write in a separate thread because then we can read
+ # without worrying about deadlocks. The additional thread is
+ # expected to terminate by itself and we mark it as a daemon,
+ # so even it should happen to not terminate for whatever
+ # reason, the main process will still exit, which will then
+ # kill the write thread.
+ if self.data:
+ threading.Thread(target=writeThread, daemon=True).start()
+ if self.process.stderr:
+ thread = threading.Thread(target=readStderrThread)
+ thread.start()
+ self.threads.append(thread)
+ if self.output_log:
+ self.output_log.info('Running: %s' % self.cmd)
+ thread = threading.Thread(target=readStdoutThread)
+ thread.start()
+ self.threads.append(thread)
+
+ self.log.debug("Running command '%s'" % self.cmd)
+
+ if not self.bg:
+ if self.timeout is None:
+ for thread in self.threads:
+ thread.join()
+ else:
+ deadline = time.time() + self.timeout
+ for thread in self.threads:
+ timeout = deadline - time.time()
+ if timeout < 0:
+ timeout = 0
+ thread.join(timeout)
+ self.stop()
+
+ def stop(self):
+ for thread in self.threads:
+ if thread.isAlive():
+ self.process.terminate()
+ # let's give it more time to terminate gracefully before killing it
+ thread.join(5)
+ if thread.isAlive():
+ self.process.kill()
+ thread.join()
+
+ def finalize_output(data):
+ if not data:
+ data = ""
+ else:
+ data = b"".join(data)
+ data = data.decode("utf-8", errors='replace').rstrip()
+ return data
+
+ self.output = finalize_output(self._output_chunks)
+ self._output_chunks = None
+ # self.error used to be a byte string earlier, probably unintentionally.
+ # Now it is a normal string, just like self.output.
+ self.error = finalize_output(self._error_chunks)
+ self._error_chunks = None
+ # At this point we know that the process has closed stdout/stderr, so
+ # it is safe and necessary to wait for the actual process completion.
+ self.status = self.process.wait()
+
+ self.log.debug("Command '%s' returned %d as exit code." % (self.cmd, self.status))
+ # logging the complete output is insane
+ # bitbake -e output is really big
+ # and makes the log file useless
+ if self.status:
+ lout = "\n".join(self.output.splitlines()[-20:])
+ self.log.debug("Last 20 lines:\n%s" % lout)
+
+
+class Result(object):
+ pass
+
+
+def runCmd(command, ignore_status=False, timeout=None, assert_error=True,
+ native_sysroot=None, limit_exc_output=0, output_log=None, **options):
+ result = Result()
+
+ if native_sysroot:
+ extra_paths = "%s/sbin:%s/usr/sbin:%s/usr/bin" % \
+ (native_sysroot, native_sysroot, native_sysroot)
+ nenv = dict(options.get('env', os.environ))
+ nenv['PATH'] = extra_paths + ':' + nenv.get('PATH', '')
+ options['env'] = nenv
+
+ cmd = Command(command, timeout=timeout, output_log=output_log, **options)
+ cmd.run()
+
+ result.command = command
+ result.status = cmd.status
+ result.output = cmd.output
+ result.error = cmd.error
+ result.pid = cmd.process.pid
+
+ if result.status and not ignore_status:
+ exc_output = result.output
+ if limit_exc_output > 0:
+ split = result.output.splitlines()
+ if len(split) > limit_exc_output:
+ exc_output = "\n... (last %d lines of output)\n" % limit_exc_output + \
+ '\n'.join(split[-limit_exc_output:])
+ if assert_error:
+ raise AssertionError("Command '%s' returned non-zero exit status %d:\n%s" % (command, result.status, exc_output))
+ else:
+ raise CommandError(result.status, command, exc_output)
+
+ return result
+
+
+def bitbake(command, ignore_status=False, timeout=None, postconfig=None, output_log=None, **options):
+
+ if postconfig:
+ postconfig_file = os.path.join(os.environ.get('BUILDDIR'), 'oeqa-post.conf')
+ ftools.write_file(postconfig_file, postconfig)
+ extra_args = "-R %s" % postconfig_file
+ else:
+ extra_args = ""
+
+ if isinstance(command, str):
+ cmd = "bitbake " + extra_args + " " + command
+ else:
+ cmd = [ "bitbake" ] + [a for a in (command + extra_args.split(" ")) if a not in [""]]
+
+ try:
+ return runCmd(cmd, ignore_status, timeout, output_log=output_log, **options)
+ finally:
+ if postconfig:
+ os.remove(postconfig_file)
+
+
+def get_bb_env(target=None, postconfig=None):
+ if target:
+ return bitbake("-e %s" % target, postconfig=postconfig).output
+ else:
+ return bitbake("-e", postconfig=postconfig).output
+
+def get_bb_vars(variables=None, target=None, postconfig=None):
+ """Get values of multiple bitbake variables"""
+ bbenv = get_bb_env(target, postconfig=postconfig)
+
+ if variables is not None:
+ variables = list(variables)
+ var_re = re.compile(r'^(export )?(?P<var>\w+(_.*)?)="(?P<value>.*)"$')
+ unset_re = re.compile(r'^unset (?P<var>\w+)$')
+ lastline = None
+ values = {}
+ for line in bbenv.splitlines():
+ match = var_re.match(line)
+ val = None
+ if match:
+ val = match.group('value')
+ else:
+ match = unset_re.match(line)
+ if match:
+ # Handle [unexport] variables
+ if lastline.startswith('# "'):
+ val = lastline.split('"')[1]
+ if val:
+ var = match.group('var')
+ if variables is None:
+ values[var] = val
+ else:
+ if var in variables:
+ values[var] = val
+ variables.remove(var)
+ # Stop after all required variables have been found
+ if not variables:
+ break
+ lastline = line
+ if variables:
+ # Fill in missing values
+ for var in variables:
+ values[var] = None
+ return values
+
+def get_bb_var(var, target=None, postconfig=None):
+ return get_bb_vars([var], target, postconfig)[var]
+
+def get_test_layer():
+ layers = get_bb_var("BBLAYERS").split()
+ testlayer = None
+ for l in layers:
+ if '~' in l:
+ l = os.path.expanduser(l)
+ if "/meta-selftest" in l and os.path.isdir(l):
+ testlayer = l
+ break
+ return testlayer
+
+def create_temp_layer(templayerdir, templayername, priority=999, recipepathspec='recipes-*/*'):
+ os.makedirs(os.path.join(templayerdir, 'conf'))
+ with open(os.path.join(templayerdir, 'conf', 'layer.conf'), 'w') as f:
+ f.write('BBPATH .= ":${LAYERDIR}"\n')
+ f.write('BBFILES += "${LAYERDIR}/%s/*.bb \\' % recipepathspec)
+ f.write(' ${LAYERDIR}/%s/*.bbappend"\n' % recipepathspec)
+ f.write('BBFILE_COLLECTIONS += "%s"\n' % templayername)
+ f.write('BBFILE_PATTERN_%s = "^${LAYERDIR}/"\n' % templayername)
+ f.write('BBFILE_PRIORITY_%s = "%d"\n' % (templayername, priority))
+ f.write('BBFILE_PATTERN_IGNORE_EMPTY_%s = "1"\n' % templayername)
+ f.write('LAYERSERIES_COMPAT_%s = "${LAYERSERIES_COMPAT_core}"\n' % templayername)
+
+@contextlib.contextmanager
+def runqemu(pn, ssh=True, runqemuparams='', image_fstype=None, launch_cmd=None, qemuparams=None, overrides={}, discard_writes=True):
+ """
+ launch_cmd means directly run the command, don't need set rootfs or env vars.
+ """
+
+ import bb.tinfoil
+ import bb.build
+
+ # Need a non-'BitBake' logger to capture the runner output
+ targetlogger = logging.getLogger('TargetRunner')
+ targetlogger.setLevel(logging.DEBUG)
+ handler = logging.StreamHandler(sys.stdout)
+ targetlogger.addHandler(handler)
+
+ tinfoil = bb.tinfoil.Tinfoil()
+ tinfoil.prepare(config_only=False, quiet=True)
+ try:
+ tinfoil.logger.setLevel(logging.WARNING)
+ import oeqa.targetcontrol
+ tinfoil.config_data.setVar("TEST_LOG_DIR", "${WORKDIR}/testimage")
+ tinfoil.config_data.setVar("TEST_QEMUBOOT_TIMEOUT", "1000")
+ # Tell QemuTarget() whether need find rootfs/kernel or not
+ if launch_cmd:
+ tinfoil.config_data.setVar("FIND_ROOTFS", '0')
+ else:
+ tinfoil.config_data.setVar("FIND_ROOTFS", '1')
+
+ recipedata = tinfoil.parse_recipe(pn)
+ for key, value in overrides.items():
+ recipedata.setVar(key, value)
+
+ logdir = recipedata.getVar("TEST_LOG_DIR")
+
+ qemu = oeqa.targetcontrol.QemuTarget(recipedata, targetlogger, image_fstype)
+ finally:
+ # We need to shut down tinfoil early here in case we actually want
+ # to run tinfoil-using utilities with the running QEMU instance.
+ # Luckily QemuTarget doesn't need it after the constructor.
+ tinfoil.shutdown()
+
+ try:
+ qemu.deploy()
+ try:
+ qemu.start(params=qemuparams, ssh=ssh, runqemuparams=runqemuparams, launch_cmd=launch_cmd, discard_writes=discard_writes)
+ except bb.build.FuncFailed:
+ raise Exception('Failed to start QEMU - see the logs in %s' % logdir)
+
+ yield qemu
+
+ finally:
+ try:
+ qemu.stop()
+ except:
+ pass
+ targetlogger.removeHandler(handler)
+
+def updateEnv(env_file):
+ """
+ Source a file and update environment.
+ """
+
+ cmd = ". %s; env -0" % env_file
+ result = runCmd(cmd)
+
+ for line in result.output.split("\0"):
+ (key, _, value) = line.partition("=")
+ os.environ[key] = value
diff --git a/poky/meta/lib/oeqa/utils/decorators.py b/poky/meta/lib/oeqa/utils/decorators.py
new file mode 100644
index 000000000..d87689692
--- /dev/null
+++ b/poky/meta/lib/oeqa/utils/decorators.py
@@ -0,0 +1,295 @@
+# Copyright (C) 2013 Intel Corporation
+#
+# Released under the MIT license (see COPYING.MIT)
+
+# Some custom decorators that can be used by unittests
+# Most useful is skipUnlessPassed which can be used for
+# creating dependecies between two test methods.
+
+import os
+import logging
+import sys
+import unittest
+import threading
+import signal
+from functools import wraps
+
+#get the "result" object from one of the upper frames provided that one of these upper frames is a unittest.case frame
+class getResults(object):
+ def __init__(self):
+ #dynamically determine the unittest.case frame and use it to get the name of the test method
+ ident = threading.current_thread().ident
+ upperf = sys._current_frames()[ident]
+ while (upperf.f_globals['__name__'] != 'unittest.case'):
+ upperf = upperf.f_back
+
+ def handleList(items):
+ ret = []
+ # items is a list of tuples, (test, failure) or (_ErrorHandler(), Exception())
+ for i in items:
+ s = i[0].id()
+ #Handle the _ErrorHolder objects from skipModule failures
+ if "setUpModule (" in s:
+ ret.append(s.replace("setUpModule (", "").replace(")",""))
+ else:
+ ret.append(s)
+ # Append also the test without the full path
+ testname = s.split('.')[-1]
+ if testname:
+ ret.append(testname)
+ return ret
+ self.faillist = handleList(upperf.f_locals['result'].failures)
+ self.errorlist = handleList(upperf.f_locals['result'].errors)
+ self.skiplist = handleList(upperf.f_locals['result'].skipped)
+
+ def getFailList(self):
+ return self.faillist
+
+ def getErrorList(self):
+ return self.errorlist
+
+ def getSkipList(self):
+ return self.skiplist
+
+class skipIfFailure(object):
+
+ def __init__(self,testcase):
+ self.testcase = testcase
+
+ def __call__(self,f):
+ @wraps(f)
+ def wrapped_f(*args, **kwargs):
+ res = getResults()
+ if self.testcase in (res.getFailList() or res.getErrorList()):
+ raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase)
+ return f(*args, **kwargs)
+ wrapped_f.__name__ = f.__name__
+ return wrapped_f
+
+class skipIfSkipped(object):
+
+ def __init__(self,testcase):
+ self.testcase = testcase
+
+ def __call__(self,f):
+ @wraps(f)
+ def wrapped_f(*args, **kwargs):
+ res = getResults()
+ if self.testcase in res.getSkipList():
+ raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase)
+ return f(*args, **kwargs)
+ wrapped_f.__name__ = f.__name__
+ return wrapped_f
+
+class skipUnlessPassed(object):
+
+ def __init__(self,testcase):
+ self.testcase = testcase
+
+ def __call__(self,f):
+ @wraps(f)
+ def wrapped_f(*args, **kwargs):
+ res = getResults()
+ if self.testcase in res.getSkipList() or \
+ self.testcase in res.getFailList() or \
+ self.testcase in res.getErrorList():
+ raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase)
+ return f(*args, **kwargs)
+ wrapped_f.__name__ = f.__name__
+ wrapped_f._depends_on = self.testcase
+ return wrapped_f
+
+class testcase(object):
+ def __init__(self, test_case):
+ self.test_case = test_case
+
+ def __call__(self, func):
+ @wraps(func)
+ def wrapped_f(*args, **kwargs):
+ return func(*args, **kwargs)
+ wrapped_f.test_case = self.test_case
+ wrapped_f.__name__ = func.__name__
+ return wrapped_f
+
+class NoParsingFilter(logging.Filter):
+ def filter(self, record):
+ return record.levelno == 100
+
+import inspect
+
+def LogResults(original_class):
+ orig_method = original_class.run
+
+ from time import strftime, gmtime
+ caller = os.path.basename(sys.argv[0])
+ timestamp = strftime('%Y%m%d%H%M%S',gmtime())
+ logfile = os.path.join(os.getcwd(),'results-'+caller+'.'+timestamp+'.log')
+ linkfile = os.path.join(os.getcwd(),'results-'+caller+'.log')
+
+ def get_class_that_defined_method(meth):
+ if inspect.ismethod(meth):
+ for cls in inspect.getmro(meth.__self__.__class__):
+ if cls.__dict__.get(meth.__name__) is meth:
+ return cls
+ meth = meth.__func__ # fallback to __qualname__ parsing
+ if inspect.isfunction(meth):
+ cls = getattr(inspect.getmodule(meth),
+ meth.__qualname__.split('.<locals>', 1)[0].rsplit('.', 1)[0])
+ if isinstance(cls, type):
+ return cls
+ return None
+
+ #rewrite the run method of unittest.TestCase to add testcase logging
+ def run(self, result, *args, **kws):
+ orig_method(self, result, *args, **kws)
+ passed = True
+ testMethod = getattr(self, self._testMethodName)
+ #if test case is decorated then use it's number, else use it's name
+ try:
+ test_case = testMethod.test_case
+ except AttributeError:
+ test_case = self._testMethodName
+
+ class_name = str(get_class_that_defined_method(testMethod)).split("'")[1]
+
+ #create custom logging level for filtering.
+ custom_log_level = 100
+ logging.addLevelName(custom_log_level, 'RESULTS')
+
+ def results(self, message, *args, **kws):
+ if self.isEnabledFor(custom_log_level):
+ self.log(custom_log_level, message, *args, **kws)
+ logging.Logger.results = results
+
+ logging.basicConfig(filename=logfile,
+ filemode='w',
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+ datefmt='%H:%M:%S',
+ level=custom_log_level)
+ for handler in logging.root.handlers:
+ handler.addFilter(NoParsingFilter())
+ local_log = logging.getLogger(caller)
+
+ #check status of tests and record it
+
+ tcid = self.id()
+ for (name, msg) in result.errors:
+ if tcid == name.id():
+ local_log.results("Testcase "+str(test_case)+": ERROR")
+ local_log.results("Testcase "+str(test_case)+":\n"+msg)
+ passed = False
+ for (name, msg) in result.failures:
+ if tcid == name.id():
+ local_log.results("Testcase "+str(test_case)+": FAILED")
+ local_log.results("Testcase "+str(test_case)+":\n"+msg)
+ passed = False
+ for (name, msg) in result.skipped:
+ if tcid == name.id():
+ local_log.results("Testcase "+str(test_case)+": SKIPPED")
+ passed = False
+ if passed:
+ local_log.results("Testcase "+str(test_case)+": PASSED")
+
+ # XXX: In order to avoid race condition when test if exists the linkfile
+ # use bb.utils.lock, the best solution is to create a unique name for the
+ # link file.
+ try:
+ import bb
+ has_bb = True
+ lockfilename = linkfile + '.lock'
+ except ImportError:
+ has_bb = False
+
+ if has_bb:
+ lf = bb.utils.lockfile(lockfilename, block=True)
+ # Create symlink to the current log
+ if os.path.lexists(linkfile):
+ os.remove(linkfile)
+ os.symlink(logfile, linkfile)
+ if has_bb:
+ bb.utils.unlockfile(lf)
+
+ original_class.run = run
+
+ return original_class
+
+class TimeOut(BaseException):
+ pass
+
+def timeout(seconds):
+ def decorator(fn):
+ if hasattr(signal, 'alarm'):
+ @wraps(fn)
+ def wrapped_f(*args, **kw):
+ current_frame = sys._getframe()
+ def raiseTimeOut(signal, frame):
+ if frame is not current_frame:
+ raise TimeOut('%s seconds' % seconds)
+ prev_handler = signal.signal(signal.SIGALRM, raiseTimeOut)
+ try:
+ signal.alarm(seconds)
+ return fn(*args, **kw)
+ finally:
+ signal.alarm(0)
+ signal.signal(signal.SIGALRM, prev_handler)
+ return wrapped_f
+ else:
+ return fn
+ return decorator
+
+__tag_prefix = "tag__"
+def tag(*args, **kwargs):
+ """Decorator that adds attributes to classes or functions
+ for use with the Attribute (-a) plugin.
+ """
+ def wrap_ob(ob):
+ for name in args:
+ setattr(ob, __tag_prefix + name, True)
+ for name, value in kwargs.items():
+ setattr(ob, __tag_prefix + name, value)
+ return ob
+ return wrap_ob
+
+def gettag(obj, key, default=None):
+ key = __tag_prefix + key
+ if not isinstance(obj, unittest.TestCase):
+ return getattr(obj, key, default)
+ tc_method = getattr(obj, obj._testMethodName)
+ ret = getattr(tc_method, key, getattr(obj, key, default))
+ return ret
+
+def getAllTags(obj):
+ def __gettags(o):
+ r = {k[len(__tag_prefix):]:getattr(o,k) for k in dir(o) if k.startswith(__tag_prefix)}
+ return r
+ if not isinstance(obj, unittest.TestCase):
+ return __gettags(obj)
+ tc_method = getattr(obj, obj._testMethodName)
+ ret = __gettags(obj)
+ ret.update(__gettags(tc_method))
+ return ret
+
+def timeout_handler(seconds):
+ def decorator(fn):
+ if hasattr(signal, 'alarm'):
+ @wraps(fn)
+ def wrapped_f(self, *args, **kw):
+ current_frame = sys._getframe()
+ def raiseTimeOut(signal, frame):
+ if frame is not current_frame:
+ try:
+ self.target.restart()
+ raise TimeOut('%s seconds' % seconds)
+ except:
+ raise TimeOut('%s seconds' % seconds)
+ prev_handler = signal.signal(signal.SIGALRM, raiseTimeOut)
+ try:
+ signal.alarm(seconds)
+ return fn(self, *args, **kw)
+ finally:
+ signal.alarm(0)
+ signal.signal(signal.SIGALRM, prev_handler)
+ return wrapped_f
+ else:
+ return fn
+ return decorator
diff --git a/poky/meta/lib/oeqa/utils/dump.py b/poky/meta/lib/oeqa/utils/dump.py
new file mode 100644
index 000000000..5a7edc1a8
--- /dev/null
+++ b/poky/meta/lib/oeqa/utils/dump.py
@@ -0,0 +1,91 @@
+import os
+import sys
+import errno
+import datetime
+import itertools
+from .commands import runCmd
+
+class BaseDumper(object):
+ """ Base class to dump commands from host/target """
+
+ def __init__(self, cmds, parent_dir):
+ self.cmds = []
+ # Some testing doesn't inherit testimage, so it is needed
+ # to set some defaults.
+ self.parent_dir = parent_dir or "/tmp/oe-saved-tests"
+ dft_cmds = """ top -bn1
+ iostat -x -z -N -d -p ALL 20 2
+ ps -ef
+ free
+ df
+ memstat
+ dmesg
+ ip -s link
+ netstat -an"""
+ if not cmds:
+ cmds = dft_cmds
+ for cmd in cmds.split('\n'):
+ cmd = cmd.lstrip()
+ if not cmd or cmd[0] == '#':
+ continue
+ self.cmds.append(cmd)
+
+ def create_dir(self, dir_suffix):
+ dump_subdir = ("%s_%s" % (
+ datetime.datetime.now().strftime('%Y%m%d%H%M'),
+ dir_suffix))
+ dump_dir = os.path.join(self.parent_dir, dump_subdir)
+ try:
+ os.makedirs(dump_dir)
+ except OSError as err:
+ if err.errno != errno.EEXIST:
+ raise err
+ self.dump_dir = dump_dir
+
+ def _write_dump(self, command, output):
+ if isinstance(self, HostDumper):
+ prefix = "host"
+ elif isinstance(self, TargetDumper):
+ prefix = "target"
+ else:
+ prefix = "unknown"
+ for i in itertools.count():
+ filename = "%s_%02d_%s" % (prefix, i, command)
+ fullname = os.path.join(self.dump_dir, filename)
+ if not os.path.exists(fullname):
+ break
+ with open(fullname, 'w') as dump_file:
+ dump_file.write(output)
+
+
+class HostDumper(BaseDumper):
+ """ Class to get dumps from the host running the tests """
+
+ def __init__(self, cmds, parent_dir):
+ super(HostDumper, self).__init__(cmds, parent_dir)
+
+ def dump_host(self, dump_dir=""):
+ if dump_dir:
+ self.dump_dir = dump_dir
+ for cmd in self.cmds:
+ result = runCmd(cmd, ignore_status=True)
+ self._write_dump(cmd.split()[0], result.output)
+
+class TargetDumper(BaseDumper):
+ """ Class to get dumps from target, it only works with QemuRunner """
+
+ def __init__(self, cmds, parent_dir, runner):
+ super(TargetDumper, self).__init__(cmds, parent_dir)
+ self.runner = runner
+
+ def dump_target(self, dump_dir=""):
+ if dump_dir:
+ self.dump_dir = dump_dir
+ for cmd in self.cmds:
+ # We can continue with the testing if serial commands fail
+ try:
+ (status, output) = self.runner.run_serial(cmd)
+ self._write_dump(cmd.split()[0], output)
+ except:
+ print("Tried to dump info from target but "
+ "serial console failed")
diff --git a/poky/meta/lib/oeqa/utils/ftools.py b/poky/meta/lib/oeqa/utils/ftools.py
new file mode 100644
index 000000000..a7233d4ca
--- /dev/null
+++ b/poky/meta/lib/oeqa/utils/ftools.py
@@ -0,0 +1,46 @@
+import os
+import re
+import errno
+
+def write_file(path, data):
+ # In case data is None, return immediately
+ if data is None:
+ return
+ wdata = data.rstrip() + "\n"
+ with open(path, "w") as f:
+ f.write(wdata)
+
+def append_file(path, data):
+ # In case data is None, return immediately
+ if data is None:
+ return
+ wdata = data.rstrip() + "\n"
+ with open(path, "a") as f:
+ f.write(wdata)
+
+def read_file(path):
+ data = None
+ with open(path) as f:
+ data = f.read()
+ return data
+
+def remove_from_file(path, data):
+ # In case data is None, return immediately
+ if data is None:
+ return
+ try:
+ rdata = read_file(path)
+ except IOError as e:
+ # if file does not exit, just quit, otherwise raise an exception
+ if e.errno == errno.ENOENT:
+ return
+ else:
+ raise
+
+ contents = rdata.strip().splitlines()
+ for r in data.strip().splitlines():
+ try:
+ contents.remove(r)
+ except ValueError:
+ pass
+ write_file(path, "\n".join(contents))
diff --git a/poky/meta/lib/oeqa/utils/git.py b/poky/meta/lib/oeqa/utils/git.py
new file mode 100644
index 000000000..757e3f0cb
--- /dev/null
+++ b/poky/meta/lib/oeqa/utils/git.py
@@ -0,0 +1,80 @@
+#
+# Copyright (C) 2016 Intel Corporation
+#
+# Released under the MIT license (see COPYING.MIT)
+#
+"""Git repository interactions"""
+import os
+
+from oeqa.utils.commands import runCmd
+
+
+class GitError(Exception):
+ """Git error handling"""
+ pass
+
+class GitRepo(object):
+ """Class representing a Git repository clone"""
+ def __init__(self, path, is_topdir=False):
+ git_dir = self._run_git_cmd_at(['rev-parse', '--git-dir'], path)
+ git_dir = git_dir if os.path.isabs(git_dir) else os.path.join(path, git_dir)
+ self.git_dir = os.path.realpath(git_dir)
+
+ if self._run_git_cmd_at(['rev-parse', '--is-bare-repository'], path) == 'true':
+ self.bare = True
+ self.top_dir = self.git_dir
+ else:
+ self.bare = False
+ self.top_dir = self._run_git_cmd_at(['rev-parse', '--show-toplevel'],
+ path)
+ realpath = os.path.realpath(path)
+ if is_topdir and realpath != self.top_dir:
+ raise GitError("{} is not a Git top directory".format(realpath))
+
+ @staticmethod
+ def _run_git_cmd_at(git_args, cwd, **kwargs):
+ """Run git command at a specified directory"""
+ git_cmd = 'git ' if isinstance(git_args, str) else ['git']
+ git_cmd += git_args
+ ret = runCmd(git_cmd, ignore_status=True, cwd=cwd, **kwargs)
+ if ret.status:
+ cmd_str = git_cmd if isinstance(git_cmd, str) \
+ else ' '.join(git_cmd)
+ raise GitError("'{}' failed with exit code {}: {}".format(
+ cmd_str, ret.status, ret.output))
+ return ret.output.strip()
+
+ @staticmethod
+ def init(path, bare=False):
+ """Initialize a new Git repository"""
+ cmd = ['init']
+ if bare:
+ cmd.append('--bare')
+ GitRepo._run_git_cmd_at(cmd, cwd=path)
+ return GitRepo(path, is_topdir=True)
+
+ def run_cmd(self, git_args, env_update=None):
+ """Run Git command"""
+ env = None
+ if env_update:
+ env = os.environ.copy()
+ env.update(env_update)
+ return self._run_git_cmd_at(git_args, self.top_dir, env=env)
+
+ def rev_parse(self, revision):
+ """Do git rev-parse"""
+ try:
+ return self.run_cmd(['rev-parse', '--verify', revision])
+ except GitError:
+ # Revision does not exist
+ return None
+
+ def get_current_branch(self):
+ """Get current branch"""
+ try:
+ # Strip 11 chars, i.e. 'refs/heads' from the beginning
+ return self.run_cmd(['symbolic-ref', 'HEAD'])[11:]
+ except GitError:
+ return None
+
+
diff --git a/poky/meta/lib/oeqa/utils/httpserver.py b/poky/meta/lib/oeqa/utils/httpserver.py
new file mode 100644
index 000000000..7d1233145
--- /dev/null
+++ b/poky/meta/lib/oeqa/utils/httpserver.py
@@ -0,0 +1,36 @@
+import http.server
+import multiprocessing
+import os
+from socketserver import ThreadingMixIn
+
+class HTTPServer(ThreadingMixIn, http.server.HTTPServer):
+
+ def server_start(self, root_dir):
+ import signal
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ os.chdir(root_dir)
+ self.serve_forever()
+
+class HTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
+
+ def log_message(self, format_str, *args):
+ pass
+
+class HTTPService(object):
+
+ def __init__(self, root_dir, host=''):
+ self.root_dir = root_dir
+ self.host = host
+ self.port = 0
+
+ def start(self):
+ self.server = HTTPServer((self.host, self.port), HTTPRequestHandler)
+ if self.port == 0:
+ self.port = self.server.server_port
+ self.process = multiprocessing.Process(target=self.server.server_start, args=[self.root_dir])
+ self.process.start()
+
+ def stop(self):
+ self.server.server_close()
+ self.process.terminate()
+ self.process.join()
diff --git a/poky/meta/lib/oeqa/utils/logparser.py b/poky/meta/lib/oeqa/utils/logparser.py
new file mode 100644
index 000000000..0670627c3
--- /dev/null
+++ b/poky/meta/lib/oeqa/utils/logparser.py
@@ -0,0 +1,126 @@
+#!/usr/bin/env python
+
+import sys
+import os
+import re
+from . import ftools
+
+
+# A parser that can be used to identify weather a line is a test result or a section statement.
+class Lparser(object):
+
+ def __init__(self, test_0_pass_regex, test_0_fail_regex, test_0_skip_regex, section_0_begin_regex=None, section_0_end_regex=None, **kwargs):
+ # Initialize the arguments dictionary
+ if kwargs:
+ self.args = kwargs
+ else:
+ self.args = {}
+
+ # Add the default args to the dictionary
+ self.args['test_0_pass_regex'] = test_0_pass_regex
+ self.args['test_0_fail_regex'] = test_0_fail_regex
+ self.args['test_0_skip_regex'] = test_0_skip_regex
+ if section_0_begin_regex:
+ self.args['section_0_begin_regex'] = section_0_begin_regex
+ if section_0_end_regex:
+ self.args['section_0_end_regex'] = section_0_end_regex
+
+ self.test_possible_status = ['pass', 'fail', 'error', 'skip']
+ self.section_possible_status = ['begin', 'end']
+
+ self.initialized = False
+
+
+ # Initialize the parser with the current configuration
+ def init(self):
+
+ # extra arguments can be added by the user to define new test and section categories. They must follow a pre-defined pattern: <type>_<category_name>_<status>_regex
+ self.test_argument_pattern = "^test_(.+?)_(%s)_regex" % '|'.join(map(str, self.test_possible_status))
+ self.section_argument_pattern = "^section_(.+?)_(%s)_regex" % '|'.join(map(str, self.section_possible_status))
+
+ # Initialize the test and section regex dictionaries
+ self.test_regex = {}
+ self.section_regex ={}
+
+ for arg, value in self.args.items():
+ if not value:
+ raise Exception('The value of provided argument %s is %s. Should have a valid value.' % (key, value))
+ is_test = re.search(self.test_argument_pattern, arg)
+ is_section = re.search(self.section_argument_pattern, arg)
+ if is_test:
+ if not is_test.group(1) in self.test_regex:
+ self.test_regex[is_test.group(1)] = {}
+ self.test_regex[is_test.group(1)][is_test.group(2)] = re.compile(value)
+ elif is_section:
+ if not is_section.group(1) in self.section_regex:
+ self.section_regex[is_section.group(1)] = {}
+ self.section_regex[is_section.group(1)][is_section.group(2)] = re.compile(value)
+ else:
+ # TODO: Make these call a traceback instead of a simple exception..
+ raise Exception("The provided argument name does not correspond to any valid type. Please give one of the following types:\nfor tests: %s\nfor sections: %s" % (self.test_argument_pattern, self.section_argument_pattern))
+
+ self.initialized = True
+
+ # Parse a line and return a tuple containing the type of result (test/section) and its category, status and name
+ def parse_line(self, line):
+ if not self.initialized:
+ raise Exception("The parser is not initialized..")
+
+ for test_category, test_status_list in self.test_regex.items():
+ for test_status, status_regex in test_status_list.items():
+ test_name = status_regex.search(line)
+ if test_name:
+ return ['test', test_category, test_status, test_name.group(1)]
+
+ for section_category, section_status_list in self.section_regex.items():
+ for section_status, status_regex in section_status_list.items():
+ section_name = status_regex.search(line)
+ if section_name:
+ return ['section', section_category, section_status, section_name.group(1)]
+ return None
+
+
+class Result(object):
+
+ def __init__(self):
+ self.result_dict = {}
+
+ def store(self, section, test, status):
+ if not section in self.result_dict:
+ self.result_dict[section] = []
+
+ self.result_dict[section].append((test, status))
+
+ # sort tests by the test name(the first element of the tuple), for each section. This can be helpful when using git to diff for changes by making sure they are always in the same order.
+ def sort_tests(self):
+ for package in self.result_dict:
+ sorted_results = sorted(self.result_dict[package], key=lambda tup: tup[0])
+ self.result_dict[package] = sorted_results
+
+ # Log the results as files. The file name is the section name and the contents are the tests in that section.
+ def log_as_files(self, target_dir, test_status):
+ status_regex = re.compile('|'.join(map(str, test_status)))
+ if not type(test_status) == type([]):
+ raise Exception("test_status should be a list. Got " + str(test_status) + " instead.")
+ if not os.path.exists(target_dir):
+ raise Exception("Target directory does not exist: %s" % target_dir)
+
+ for section, test_results in self.result_dict.items():
+ prefix = ''
+ for x in test_status:
+ prefix +=x+'.'
+ if section:
+ prefix += section
+ section_file = os.path.join(target_dir, prefix)
+ # purge the file contents if it exists
+ open(section_file, 'w').close()
+ for test_result in test_results:
+ (test_name, status) = test_result
+ # we log only the tests with status in the test_status list
+ match_status = status_regex.search(status)
+ if match_status:
+ ftools.append_file(section_file, status + ": " + test_name)
+
+ # Not yet implemented!
+ def log_to_lava(self):
+ pass
diff --git a/poky/meta/lib/oeqa/utils/metadata.py b/poky/meta/lib/oeqa/utils/metadata.py
new file mode 100644
index 000000000..65bbdc61f
--- /dev/null
+++ b/poky/meta/lib/oeqa/utils/metadata.py
@@ -0,0 +1,108 @@
+# Copyright (C) 2016 Intel Corporation
+#
+# Released under the MIT license (see COPYING.MIT)
+#
+# Functions to get metadata from the testing host used
+# for analytics of test results.
+
+from collections import OrderedDict
+from collections.abc import MutableMapping
+from xml.dom.minidom import parseString
+from xml.etree.ElementTree import Element, tostring
+
+from oe.lsb import get_os_release
+from oeqa.utils.commands import runCmd, get_bb_vars
+
+
+def metadata_from_bb():
+ """ Returns test's metadata as OrderedDict.
+
+ Data will be gathered using bitbake -e thanks to get_bb_vars.
+ """
+ metadata_config_vars = ('MACHINE', 'BB_NUMBER_THREADS', 'PARALLEL_MAKE')
+
+ info_dict = OrderedDict()
+ hostname = runCmd('hostname')
+ info_dict['hostname'] = hostname.output
+ data_dict = get_bb_vars()
+
+ # Distro information
+ info_dict['distro'] = {'id': data_dict['DISTRO'],
+ 'version_id': data_dict['DISTRO_VERSION'],
+ 'pretty_name': '%s %s' % (data_dict['DISTRO'], data_dict['DISTRO_VERSION'])}
+
+ # Host distro information
+ os_release = get_os_release()
+ if os_release:
+ info_dict['host_distro'] = OrderedDict()
+ for key in ('ID', 'VERSION_ID', 'PRETTY_NAME'):
+ if key in os_release:
+ info_dict['host_distro'][key.lower()] = os_release[key]
+
+ info_dict['layers'] = get_layers(data_dict['BBLAYERS'])
+ info_dict['bitbake'] = git_rev_info(os.path.dirname(bb.__file__))
+
+ info_dict['config'] = OrderedDict()
+ for var in sorted(metadata_config_vars):
+ info_dict['config'][var] = data_dict[var]
+ return info_dict
+
+def metadata_from_data_store(d):
+ """ Returns test's metadata as OrderedDict.
+
+ Data will be collected from the provided data store.
+ """
+ # TODO: Getting metadata from the data store would
+ # be useful when running within bitbake.
+ pass
+
+def git_rev_info(path):
+ """Get git revision information as a dict"""
+ from git import Repo, InvalidGitRepositoryError, NoSuchPathError
+
+ info = OrderedDict()
+ try:
+ repo = Repo(path, search_parent_directories=True)
+ except (InvalidGitRepositoryError, NoSuchPathError):
+ return info
+ info['commit'] = repo.head.commit.hexsha
+ info['commit_count'] = repo.head.commit.count()
+ try:
+ info['branch'] = repo.active_branch.name
+ except TypeError:
+ info['branch'] = '(nobranch)'
+ return info
+
+def get_layers(layers):
+ """Returns layer information in dict format"""
+ layer_dict = OrderedDict()
+ for layer in layers.split():
+ layer_name = os.path.basename(layer)
+ layer_dict[layer_name] = git_rev_info(layer)
+ return layer_dict
+
+def write_metadata_file(file_path, metadata):
+ """ Writes metadata to a XML file in directory. """
+
+ xml = dict_to_XML('metadata', metadata)
+ xml_doc = parseString(tostring(xml).decode('UTF-8'))
+ with open(file_path, 'w') as f:
+ f.write(xml_doc.toprettyxml())
+
+def dict_to_XML(tag, dictionary, **kwargs):
+ """ Return XML element converting dicts recursively. """
+
+ elem = Element(tag, **kwargs)
+ for key, val in dictionary.items():
+ if tag == 'layers':
+ child = (dict_to_XML('layer', val, name=key))
+ elif isinstance(val, MutableMapping):
+ child = (dict_to_XML(key, val))
+ else:
+ if tag == 'config':
+ child = Element('variable', name=key)
+ else:
+ child = Element(key)
+ child.text = str(val)
+ elem.append(child)
+ return elem
diff --git a/poky/meta/lib/oeqa/utils/network.py b/poky/meta/lib/oeqa/utils/network.py
new file mode 100644
index 000000000..2768f6c5d
--- /dev/null
+++ b/poky/meta/lib/oeqa/utils/network.py
@@ -0,0 +1,8 @@
+import socket
+
+def get_free_port():
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.bind(('', 0))
+ addr = s.getsockname()
+ s.close()
+ return addr[1]
diff --git a/poky/meta/lib/oeqa/utils/package_manager.py b/poky/meta/lib/oeqa/utils/package_manager.py
new file mode 100644
index 000000000..afd5b8e75
--- /dev/null
+++ b/poky/meta/lib/oeqa/utils/package_manager.py
@@ -0,0 +1,211 @@
+import os
+import json
+import shutil
+
+from oeqa.core.utils.test import getCaseFile, getCaseMethod
+
+def get_package_manager(d, root_path):
+ """
+ Returns an OE package manager that can install packages in root_path.
+ """
+ from oe.package_manager import RpmPM, OpkgPM, DpkgPM
+
+ pkg_class = d.getVar("IMAGE_PKGTYPE")
+ if pkg_class == "rpm":
+ pm = RpmPM(d,
+ root_path,
+ d.getVar('TARGET_VENDOR'),
+ filterbydependencies=False)
+ pm.create_configs()
+
+ elif pkg_class == "ipk":
+ pm = OpkgPM(d,
+ root_path,
+ d.getVar("IPKGCONF_TARGET"),
+ d.getVar("ALL_MULTILIB_PACKAGE_ARCHS"))
+
+ elif pkg_class == "deb":
+ pm = DpkgPM(d,
+ root_path,
+ d.getVar('PACKAGE_ARCHS'),
+ d.getVar('DPKG_ARCH'))
+
+ pm.write_index()
+ pm.update()
+
+ return pm
+
+def find_packages_to_extract(test_suite):
+ """
+ Returns packages to extract required by runtime tests.
+ """
+ from oeqa.core.utils.test import getSuiteCasesFiles
+
+ needed_packages = {}
+ files = getSuiteCasesFiles(test_suite)
+
+ for f in set(files):
+ json_file = _get_json_file(f)
+ if json_file:
+ needed_packages.update(_get_needed_packages(json_file))
+
+ return needed_packages
+
+def _get_json_file(module_path):
+ """
+ Returns the path of the JSON file for a module, empty if doesn't exitst.
+ """
+
+ json_file = '%s.json' % module_path.rsplit('.', 1)[0]
+ if os.path.isfile(module_path) and os.path.isfile(json_file):
+ return json_file
+ else:
+ return ''
+
+def _get_needed_packages(json_file, test=None):
+ """
+ Returns a dict with needed packages based on a JSON file.
+
+ If a test is specified it will return the dict just for that test.
+ """
+ needed_packages = {}
+
+ with open(json_file) as f:
+ test_packages = json.load(f)
+ for key,value in test_packages.items():
+ needed_packages[key] = value
+
+ if test:
+ if test in needed_packages:
+ needed_packages = needed_packages[test]
+ else:
+ needed_packages = {}
+
+ return needed_packages
+
+def extract_packages(d, needed_packages):
+ """
+ Extract packages that will be needed during runtime.
+ """
+
+ import bb
+ import oe.path
+
+ extracted_path = d.getVar('TEST_EXTRACTED_DIR')
+
+ for key,value in needed_packages.items():
+ packages = ()
+ if isinstance(value, dict):
+ packages = (value, )
+ elif isinstance(value, list):
+ packages = value
+ else:
+ bb.fatal('Failed to process needed packages for %s; '
+ 'Value must be a dict or list' % key)
+
+ for package in packages:
+ pkg = package['pkg']
+ rm = package.get('rm', False)
+ extract = package.get('extract', True)
+
+ if extract:
+ #logger.debug(1, 'Extracting %s' % pkg)
+ dst_dir = os.path.join(extracted_path, pkg)
+ # Same package used for more than one test,
+ # don't need to extract again.
+ if os.path.exists(dst_dir):
+ continue
+
+ # Extract package and copy it to TEST_EXTRACTED_DIR
+ pkg_dir = _extract_in_tmpdir(d, pkg)
+ oe.path.copytree(pkg_dir, dst_dir)
+ shutil.rmtree(pkg_dir)
+
+ else:
+ #logger.debug(1, 'Copying %s' % pkg)
+ _copy_package(d, pkg)
+
+def _extract_in_tmpdir(d, pkg):
+ """"
+ Returns path to a temp directory where the package was
+ extracted without dependencies.
+ """
+
+ from oeqa.utils.package_manager import get_package_manager
+
+ pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
+ pm = get_package_manager(d, pkg_path)
+ extract_dir = pm.extract(pkg)
+ shutil.rmtree(pkg_path)
+
+ return extract_dir
+
+def _copy_package(d, pkg):
+ """
+ Copy the RPM, DEB or IPK package to dst_dir
+ """
+
+ from oeqa.utils.package_manager import get_package_manager
+
+ pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
+ dst_dir = d.getVar('TEST_PACKAGED_DIR')
+ pm = get_package_manager(d, pkg_path)
+ pkg_info = pm.package_info(pkg)
+ file_path = pkg_info[pkg]['filepath']
+ shutil.copy2(file_path, dst_dir)
+ shutil.rmtree(pkg_path)
+
+def install_package(test_case):
+ """
+ Installs package in DUT if required.
+ """
+ needed_packages = test_needs_package(test_case)
+ if needed_packages:
+ _install_uninstall_packages(needed_packages, test_case, True)
+
+def uninstall_package(test_case):
+ """
+ Uninstalls package in DUT if required.
+ """
+ needed_packages = test_needs_package(test_case)
+ if needed_packages:
+ _install_uninstall_packages(needed_packages, test_case, False)
+
+def test_needs_package(test_case):
+ """
+ Checks if a test case requires to install/uninstall packages.
+ """
+ test_file = getCaseFile(test_case)
+ json_file = _get_json_file(test_file)
+
+ if json_file:
+ test_method = getCaseMethod(test_case)
+ needed_packages = _get_needed_packages(json_file, test_method)
+ if needed_packages:
+ return needed_packages
+
+ return None
+
+def _install_uninstall_packages(needed_packages, test_case, install=True):
+ """
+ Install/Uninstall packages in the DUT without using a package manager
+ """
+
+ if isinstance(needed_packages, dict):
+ packages = [needed_packages]
+ elif isinstance(needed_packages, list):
+ packages = needed_packages
+
+ for package in packages:
+ pkg = package['pkg']
+ rm = package.get('rm', False)
+ extract = package.get('extract', True)
+ src_dir = os.path.join(test_case.tc.extract_dir, pkg)
+
+ # Install package
+ if install and extract:
+ test_case.tc.target.copyDirTo(src_dir, '/')
+
+ # Uninstall package
+ elif not install and rm:
+ test_case.tc.target.deleteDirStructure(src_dir, '/')
diff --git a/poky/meta/lib/oeqa/utils/qemurunner.py b/poky/meta/lib/oeqa/utils/qemurunner.py
new file mode 100644
index 000000000..c962602a6
--- /dev/null
+++ b/poky/meta/lib/oeqa/utils/qemurunner.py
@@ -0,0 +1,591 @@
+# Copyright (C) 2013 Intel Corporation
+#
+# Released under the MIT license (see COPYING.MIT)
+
+# This module provides a class for starting qemu images using runqemu.
+# It's used by testimage.bbclass.
+
+import subprocess
+import os
+import sys
+import time
+import signal
+import re
+import socket
+import select
+import errno
+import string
+import threading
+import codecs
+import logging
+from oeqa.utils.dump import HostDumper
+
+# Get Unicode non printable control chars
+control_range = list(range(0,32))+list(range(127,160))
+control_chars = [chr(x) for x in control_range
+ if chr(x) not in string.printable]
+re_control_char = re.compile('[%s]' % re.escape("".join(control_chars)))
+
+class QemuRunner:
+
+ def __init__(self, machine, rootfs, display, tmpdir, deploy_dir_image, logfile, boottime, dump_dir, dump_host_cmds, use_kvm, logger):
+
+ # Popen object for runqemu
+ self.runqemu = None
+ # pid of the qemu process that runqemu will start
+ self.qemupid = None
+ # target ip - from the command line or runqemu output
+ self.ip = None
+ # host ip - where qemu is running
+ self.server_ip = None
+ # target ip netmask
+ self.netmask = None
+
+ self.machine = machine
+ self.rootfs = rootfs
+ self.display = display
+ self.tmpdir = tmpdir
+ self.deploy_dir_image = deploy_dir_image
+ self.logfile = logfile
+ self.boottime = boottime
+ self.logged = False
+ self.thread = None
+ self.use_kvm = use_kvm
+ self.msg = ''
+
+ self.runqemutime = 120
+ self.qemu_pidfile = 'pidfile_'+str(os.getpid())
+ self.host_dumper = HostDumper(dump_host_cmds, dump_dir)
+
+ self.logger = logger
+
+ def create_socket(self):
+ try:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setblocking(0)
+ sock.bind(("127.0.0.1",0))
+ sock.listen(2)
+ port = sock.getsockname()[1]
+ self.logger.debug("Created listening socket for qemu serial console on: 127.0.0.1:%s" % port)
+ return (sock, port)
+
+ except socket.error:
+ sock.close()
+ raise
+
+ def log(self, msg):
+ if self.logfile:
+ # It is needed to sanitize the data received from qemu
+ # because is possible to have control characters
+ msg = msg.decode("utf-8", errors='ignore')
+ msg = re_control_char.sub('', msg)
+ self.msg += msg
+ with codecs.open(self.logfile, "a", encoding="utf-8") as f:
+ f.write("%s" % msg)
+
+ def getOutput(self, o):
+ import fcntl
+ fl = fcntl.fcntl(o, fcntl.F_GETFL)
+ fcntl.fcntl(o, fcntl.F_SETFL, fl | os.O_NONBLOCK)
+ return os.read(o.fileno(), 1000000).decode("utf-8")
+
+
+ def handleSIGCHLD(self, signum, frame):
+ if self.runqemu and self.runqemu.poll():
+ if self.runqemu.returncode:
+ self.logger.debug('runqemu exited with code %d' % self.runqemu.returncode)
+ self.logger.debug("Output from runqemu:\n%s" % self.getOutput(self.runqemu.stdout))
+ self.stop()
+ self._dump_host()
+ raise SystemExit
+
+ def start(self, qemuparams = None, get_ip = True, extra_bootparams = None, runqemuparams='', launch_cmd=None, discard_writes=True):
+ env = os.environ.copy()
+ if self.display:
+ env["DISPLAY"] = self.display
+ # Set this flag so that Qemu doesn't do any grabs as SDL grabs
+ # interact badly with screensavers.
+ env["QEMU_DONT_GRAB"] = "1"
+ if not os.path.exists(self.rootfs):
+ self.logger.error("Invalid rootfs %s" % self.rootfs)
+ return False
+ if not os.path.exists(self.tmpdir):
+ self.logger.error("Invalid TMPDIR path %s" % self.tmpdir)
+ return False
+ else:
+ env["OE_TMPDIR"] = self.tmpdir
+ if not os.path.exists(self.deploy_dir_image):
+ self.logger.error("Invalid DEPLOY_DIR_IMAGE path %s" % self.deploy_dir_image)
+ return False
+ else:
+ env["DEPLOY_DIR_IMAGE"] = self.deploy_dir_image
+
+ if not launch_cmd:
+ launch_cmd = 'runqemu %s %s ' % ('snapshot' if discard_writes else '', runqemuparams)
+ if self.use_kvm:
+ self.logger.debug('Using kvm for runqemu')
+ launch_cmd += ' kvm'
+ else:
+ self.logger.debug('Not using kvm for runqemu')
+ if not self.display:
+ launch_cmd += ' nographic'
+ launch_cmd += ' %s %s' % (self.machine, self.rootfs)
+
+ return self.launch(launch_cmd, qemuparams=qemuparams, get_ip=get_ip, extra_bootparams=extra_bootparams, env=env)
+
+ def launch(self, launch_cmd, get_ip = True, qemuparams = None, extra_bootparams = None, env = None):
+ try:
+ threadsock, threadport = self.create_socket()
+ self.server_socket, self.serverport = self.create_socket()
+ except socket.error as msg:
+ self.logger.error("Failed to create listening socket: %s" % msg[1])
+ return False
+
+ bootparams = 'console=tty1 console=ttyS0,115200n8 printk.time=1'
+ if extra_bootparams:
+ bootparams = bootparams + ' ' + extra_bootparams
+
+ # Ask QEMU to store the QEMU process PID in file, this way we don't have to parse running processes
+ # and analyze descendents in order to determine it.
+ if os.path.exists(self.qemu_pidfile):
+ os.remove(self.qemu_pidfile)
+ self.qemuparams = 'bootparams="{0}" qemuparams="-serial tcp:127.0.0.1:{1} -pidfile {2}"'.format(bootparams, threadport, self.qemu_pidfile)
+ if qemuparams:
+ self.qemuparams = self.qemuparams[:-1] + " " + qemuparams + " " + '\"'
+
+ launch_cmd += ' tcpserial=%s %s' % (self.serverport, self.qemuparams)
+
+ self.origchldhandler = signal.getsignal(signal.SIGCHLD)
+ signal.signal(signal.SIGCHLD, self.handleSIGCHLD)
+
+ self.logger.debug('launchcmd=%s'%(launch_cmd))
+
+ # FIXME: We pass in stdin=subprocess.PIPE here to work around stty
+ # blocking at the end of the runqemu script when using this within
+ # oe-selftest (this makes stty error out immediately). There ought
+ # to be a proper fix but this will suffice for now.
+ self.runqemu = subprocess.Popen(launch_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, preexec_fn=os.setpgrp, env=env)
+ output = self.runqemu.stdout
+
+ #
+ # We need the preexec_fn above so that all runqemu processes can easily be killed
+ # (by killing their process group). This presents a problem if this controlling
+ # process itself is killed however since those processes don't notice the death
+ # of the parent and merrily continue on.
+ #
+ # Rather than hack runqemu to deal with this, we add something here instead.
+ # Basically we fork off another process which holds an open pipe to the parent
+ # and also is setpgrp. If/when the pipe sees EOF from the parent dieing, it kills
+ # the process group. This is like pctrl's PDEATHSIG but for a process group
+ # rather than a single process.
+ #
+ r, w = os.pipe()
+ self.monitorpid = os.fork()
+ if self.monitorpid:
+ os.close(r)
+ self.monitorpipe = os.fdopen(w, "w")
+ else:
+ # child process
+ os.setpgrp()
+ os.close(w)
+ r = os.fdopen(r)
+ x = r.read()
+ os.killpg(os.getpgid(self.runqemu.pid), signal.SIGTERM)
+ sys.exit(0)
+
+ self.logger.debug("runqemu started, pid is %s" % self.runqemu.pid)
+ self.logger.debug("waiting at most %s seconds for qemu pid (%s)" %
+ (self.runqemutime, time.strftime("%D %H:%M:%S")))
+ endtime = time.time() + self.runqemutime
+ while not self.is_alive() and time.time() < endtime:
+ if self.runqemu.poll():
+ if self.runqemu.returncode:
+ # No point waiting any longer
+ self.logger.debug('runqemu exited with code %d' % self.runqemu.returncode)
+ self._dump_host()
+ self.stop()
+ self.logger.debug("Output from runqemu:\n%s" % self.getOutput(output))
+ return False
+ time.sleep(0.5)
+
+ if not self.is_alive():
+ self.logger.error("Qemu pid didn't appear in %s seconds (%s)" %
+ (self.runqemutime, time.strftime("%D %H:%M:%S")))
+ # Dump all processes to help us to figure out what is going on...
+ ps = subprocess.Popen(['ps', 'axww', '-o', 'pid,ppid,command '], stdout=subprocess.PIPE).communicate()[0]
+ processes = ps.decode("utf-8")
+ self.logger.debug("Running processes:\n%s" % processes)
+ self._dump_host()
+ self.stop()
+ op = self.getOutput(output)
+ if op:
+ self.logger.error("Output from runqemu:\n%s" % op)
+ else:
+ self.logger.error("No output from runqemu.\n")
+ return False
+
+ # We are alive: qemu is running
+ out = self.getOutput(output)
+ netconf = False # network configuration is not required by default
+ self.logger.debug("qemu started in %s seconds - qemu procces pid is %s (%s)" %
+ (time.time() - (endtime - self.runqemutime),
+ self.qemupid, time.strftime("%D %H:%M:%S")))
+ if get_ip:
+ cmdline = ''
+ with open('/proc/%s/cmdline' % self.qemupid) as p:
+ cmdline = p.read()
+ # It is needed to sanitize the data received
+ # because is possible to have control characters
+ cmdline = re_control_char.sub(' ', cmdline)
+ try:
+ ips = re.findall("((?:[0-9]{1,3}\.){3}[0-9]{1,3})", cmdline.split("ip=")[1])
+ self.ip = ips[0]
+ self.server_ip = ips[1]
+ self.logger.debug("qemu cmdline used:\n{}".format(cmdline))
+ except (IndexError, ValueError):
+ # Try to get network configuration from runqemu output
+ match = re.match('.*Network configuration: ([0-9.]+)::([0-9.]+):([0-9.]+)$.*',
+ out, re.MULTILINE|re.DOTALL)
+ if match:
+ self.ip, self.server_ip, self.netmask = match.groups()
+ # network configuration is required as we couldn't get it
+ # from the runqemu command line, so qemu doesn't run kernel
+ # and guest networking is not configured
+ netconf = True
+ else:
+ self.logger.error("Couldn't get ip from qemu command line and runqemu output! "
+ "Here is the qemu command line used:\n%s\n"
+ "and output from runqemu:\n%s" % (cmdline, out))
+ self._dump_host()
+ self.stop()
+ return False
+
+ self.logger.debug("Target IP: %s" % self.ip)
+ self.logger.debug("Server IP: %s" % self.server_ip)
+
+ self.thread = LoggingThread(self.log, threadsock, self.logger)
+ self.thread.start()
+ if not self.thread.connection_established.wait(self.boottime):
+ self.logger.error("Didn't receive a console connection from qemu. "
+ "Here is the qemu command line used:\n%s\nand "
+ "output from runqemu:\n%s" % (cmdline, out))
+ self.stop_thread()
+ return False
+
+ self.logger.debug("Output from runqemu:\n%s", out)
+ self.logger.debug("Waiting at most %d seconds for login banner (%s)" %
+ (self.boottime, time.strftime("%D %H:%M:%S")))
+ endtime = time.time() + self.boottime
+ socklist = [self.server_socket]
+ reachedlogin = False
+ stopread = False
+ qemusock = None
+ bootlog = b''
+ data = b''
+ while time.time() < endtime and not stopread:
+ try:
+ sread, swrite, serror = select.select(socklist, [], [], 5)
+ except InterruptedError:
+ continue
+ for sock in sread:
+ if sock is self.server_socket:
+ qemusock, addr = self.server_socket.accept()
+ qemusock.setblocking(0)
+ socklist.append(qemusock)
+ socklist.remove(self.server_socket)
+ self.logger.debug("Connection from %s:%s" % addr)
+ else:
+ data = data + sock.recv(1024)
+ if data:
+ bootlog += data
+ data = b''
+ if b' login:' in bootlog:
+ self.server_socket = qemusock
+ stopread = True
+ reachedlogin = True
+ self.logger.debug("Reached login banner in %s seconds (%s)" %
+ (time.time() - (endtime - self.boottime),
+ time.strftime("%D %H:%M:%S")))
+ else:
+ # no need to check if reachedlogin unless we support multiple connections
+ self.logger.debug("QEMU socket disconnected before login banner reached. (%s)" %
+ time.strftime("%D %H:%M:%S"))
+ socklist.remove(sock)
+ sock.close()
+ stopread = True
+
+
+ if not reachedlogin:
+ if time.time() >= endtime:
+ self.logger.debug("Target didn't reach login banner in %d seconds (%s)" %
+ (self.boottime, time.strftime("%D %H:%M:%S")))
+ tail = lambda l: "\n".join(l.splitlines()[-25:])
+ # in case bootlog is empty, use tail qemu log store at self.msg
+ lines = tail(bootlog if bootlog else self.msg)
+ self.logger.debug("Last 25 lines of text:\n%s" % lines)
+ self.logger.debug("Check full boot log: %s" % self.logfile)
+ self._dump_host()
+ self.stop()
+ return False
+
+ # If we are not able to login the tests can continue
+ try:
+ (status, output) = self.run_serial("root\n", raw=True)
+ if re.search("root@[a-zA-Z0-9\-]+:~#", output):
+ self.logged = True
+ self.logger.debug("Logged as root in serial console")
+ if netconf:
+ # configure guest networking
+ cmd = "ifconfig eth0 %s netmask %s up\n" % (self.ip, self.netmask)
+ output = self.run_serial(cmd, raw=True)[1]
+ if re.search("root@[a-zA-Z0-9\-]+:~#", output):
+ self.logger.debug("configured ip address %s", self.ip)
+ else:
+ self.logger.debug("Couldn't configure guest networking")
+ else:
+ self.logger.debug("Couldn't login into serial console"
+ " as root using blank password")
+ except:
+ self.logger.debug("Serial console failed while trying to login")
+ return True
+
+ def stop(self):
+ self.stop_thread()
+ self.stop_qemu_system()
+ if hasattr(self, "origchldhandler"):
+ signal.signal(signal.SIGCHLD, self.origchldhandler)
+ if self.runqemu:
+ if hasattr(self, "monitorpid"):
+ os.kill(self.monitorpid, signal.SIGKILL)
+ self.logger.debug("Sending SIGTERM to runqemu")
+ try:
+ os.killpg(os.getpgid(self.runqemu.pid), signal.SIGTERM)
+ except OSError as e:
+ if e.errno != errno.ESRCH:
+ raise
+ endtime = time.time() + self.runqemutime
+ while self.runqemu.poll() is None and time.time() < endtime:
+ time.sleep(1)
+ if self.runqemu.poll() is None:
+ self.logger.debug("Sending SIGKILL to runqemu")
+ os.killpg(os.getpgid(self.runqemu.pid), signal.SIGKILL)
+ self.runqemu = None
+ if hasattr(self, 'server_socket') and self.server_socket:
+ self.server_socket.close()
+ self.server_socket = None
+ self.qemupid = None
+ self.ip = None
+ if os.path.exists(self.qemu_pidfile):
+ os.remove(self.qemu_pidfile)
+
+ def stop_qemu_system(self):
+ if self.qemupid:
+ try:
+ # qemu-system behaves well and a SIGTERM is enough
+ os.kill(self.qemupid, signal.SIGTERM)
+ except ProcessLookupError as e:
+ self.logger.warn('qemu-system ended unexpectedly')
+
+ def stop_thread(self):
+ if self.thread and self.thread.is_alive():
+ self.thread.stop()
+ self.thread.join()
+
+ def restart(self, qemuparams = None):
+ self.logger.debug("Restarting qemu process")
+ if self.runqemu.poll() is None:
+ self.stop()
+ if self.start(qemuparams):
+ return True
+ return False
+
+ def is_alive(self):
+ if not self.runqemu:
+ return False
+ if os.path.isfile(self.qemu_pidfile):
+ f = open(self.qemu_pidfile, 'r')
+ qemu_pid = f.read()
+ f.close()
+ qemupid = int(qemu_pid)
+ if os.path.exists("/proc/" + str(qemupid)):
+ self.qemupid = qemupid
+ return True
+ return False
+
+ def run_serial(self, command, raw=False, timeout=5):
+ # We assume target system have echo to get command status
+ if not raw:
+ command = "%s; echo $?\n" % command
+
+ data = ''
+ status = 0
+ self.server_socket.sendall(command.encode('utf-8'))
+ start = time.time()
+ end = start + timeout
+ while True:
+ now = time.time()
+ if now >= end:
+ data += "<<< run_serial(): command timed out after %d seconds without output >>>\r\n\r\n" % timeout
+ break
+ try:
+ sread, _, _ = select.select([self.server_socket],[],[], end - now)
+ except InterruptedError:
+ continue
+ if sread:
+ answer = self.server_socket.recv(1024)
+ if answer:
+ data += answer.decode('utf-8')
+ # Search the prompt to stop
+ if re.search("[a-zA-Z0-9]+@[a-zA-Z0-9\-]+:~#", data):
+ break
+ else:
+ raise Exception("No data on serial console socket")
+
+ if data:
+ if raw:
+ status = 1
+ else:
+ # Remove first line (command line) and last line (prompt)
+ data = data[data.find('$?\r\n')+4:data.rfind('\r\n')]
+ index = data.rfind('\r\n')
+ if index == -1:
+ status_cmd = data
+ data = ""
+ else:
+ status_cmd = data[index+2:]
+ data = data[:index]
+ if (status_cmd == "0"):
+ status = 1
+ return (status, str(data))
+
+
+ def _dump_host(self):
+ self.host_dumper.create_dir("qemu")
+ self.logger.warn("Qemu ended unexpectedly, dump data from host"
+ " is in %s" % self.host_dumper.dump_dir)
+ self.host_dumper.dump_host()
+
+# This class is for reading data from a socket and passing it to logfunc
+# to be processed. It's completely event driven and has a straightforward
+# event loop. The mechanism for stopping the thread is a simple pipe which
+# will wake up the poll and allow for tearing everything down.
+class LoggingThread(threading.Thread):
+ def __init__(self, logfunc, sock, logger):
+ self.connection_established = threading.Event()
+ self.serversock = sock
+ self.logfunc = logfunc
+ self.logger = logger
+ self.readsock = None
+ self.running = False
+
+ self.errorevents = select.POLLERR | select.POLLHUP | select.POLLNVAL
+ self.readevents = select.POLLIN | select.POLLPRI
+
+ threading.Thread.__init__(self, target=self.threadtarget)
+
+ def threadtarget(self):
+ try:
+ self.eventloop()
+ finally:
+ self.teardown()
+
+ def run(self):
+ self.logger.debug("Starting logging thread")
+ self.readpipe, self.writepipe = os.pipe()
+ threading.Thread.run(self)
+
+ def stop(self):
+ self.logger.debug("Stopping logging thread")
+ if self.running:
+ os.write(self.writepipe, bytes("stop", "utf-8"))
+
+ def teardown(self):
+ self.logger.debug("Tearing down logging thread")
+ self.close_socket(self.serversock)
+
+ if self.readsock is not None:
+ self.close_socket(self.readsock)
+
+ self.close_ignore_error(self.readpipe)
+ self.close_ignore_error(self.writepipe)
+ self.running = False
+
+ def eventloop(self):
+ poll = select.poll()
+ event_read_mask = self.errorevents | self.readevents
+ poll.register(self.serversock.fileno())
+ poll.register(self.readpipe, event_read_mask)
+
+ breakout = False
+ self.running = True
+ self.logger.debug("Starting thread event loop")
+ while not breakout:
+ events = poll.poll()
+ for event in events:
+ # An error occurred, bail out
+ if event[1] & self.errorevents:
+ raise Exception(self.stringify_event(event[1]))
+
+ # Event to stop the thread
+ if self.readpipe == event[0]:
+ self.logger.debug("Stop event received")
+ breakout = True
+ break
+
+ # A connection request was received
+ elif self.serversock.fileno() == event[0]:
+ self.logger.debug("Connection request received")
+ self.readsock, _ = self.serversock.accept()
+ self.readsock.setblocking(0)
+ poll.unregister(self.serversock.fileno())
+ poll.register(self.readsock.fileno(), event_read_mask)
+
+ self.logger.debug("Setting connection established event")
+ self.connection_established.set()
+
+ # Actual data to be logged
+ elif self.readsock.fileno() == event[0]:
+ data = self.recv(1024)
+ self.logfunc(data)
+
+ # Since the socket is non-blocking make sure to honor EAGAIN
+ # and EWOULDBLOCK.
+ def recv(self, count):
+ try:
+ data = self.readsock.recv(count)
+ except socket.error as e:
+ if e.errno == errno.EAGAIN or e.errno == errno.EWOULDBLOCK:
+ return ''
+ else:
+ raise
+
+ if data is None:
+ raise Exception("No data on read ready socket")
+ elif not data:
+ # This actually means an orderly shutdown
+ # happened. But for this code it counts as an
+ # error since the connection shouldn't go away
+ # until qemu exits.
+ raise Exception("Console connection closed unexpectedly")
+
+ return data
+
+ def stringify_event(self, event):
+ val = ''
+ if select.POLLERR == event:
+ val = 'POLLER'
+ elif select.POLLHUP == event:
+ val = 'POLLHUP'
+ elif select.POLLNVAL == event:
+ val = 'POLLNVAL'
+ return val
+
+ def close_socket(self, sock):
+ sock.shutdown(socket.SHUT_RDWR)
+ sock.close()
+
+ def close_ignore_error(self, fd):
+ try:
+ os.close(fd)
+ except OSError:
+ pass
diff --git a/poky/meta/lib/oeqa/utils/qemutinyrunner.py b/poky/meta/lib/oeqa/utils/qemutinyrunner.py
new file mode 100644
index 000000000..63b5d1648
--- /dev/null
+++ b/poky/meta/lib/oeqa/utils/qemutinyrunner.py
@@ -0,0 +1,176 @@
+# Copyright (C) 2015 Intel Corporation
+#
+# Released under the MIT license (see COPYING.MIT)
+
+# This module provides a class for starting qemu images of poky tiny.
+# It's used by testimage.bbclass.
+
+import subprocess
+import os
+import time
+import signal
+import re
+import socket
+import select
+import bb
+from .qemurunner import QemuRunner
+
+class QemuTinyRunner(QemuRunner):
+
+ def __init__(self, machine, rootfs, display, tmpdir, deploy_dir_image, logfile, kernel, boottime, logger):
+
+ # Popen object for runqemu
+ self.runqemu = None
+ # pid of the qemu process that runqemu will start
+ self.qemupid = None
+ # target ip - from the command line
+ self.ip = None
+ # host ip - where qemu is running
+ self.server_ip = None
+
+ self.machine = machine
+ self.rootfs = rootfs
+ self.display = display
+ self.tmpdir = tmpdir
+ self.deploy_dir_image = deploy_dir_image
+ self.logfile = logfile
+ self.boottime = boottime
+
+ self.runqemutime = 60
+ self.socketfile = "console.sock"
+ self.server_socket = None
+ self.kernel = kernel
+ self.logger = logger
+
+
+ def create_socket(self):
+ tries = 3
+ while tries > 0:
+ try:
+ self.server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self.server_socket.connect(self.socketfile)
+ bb.note("Created listening socket for qemu serial console.")
+ tries = 0
+ except socket.error as msg:
+ self.server_socket.close()
+ bb.fatal("Failed to create listening socket.")
+ tries -= 1
+
+ def log(self, msg):
+ if self.logfile:
+ with open(self.logfile, "a") as f:
+ f.write("%s" % msg)
+
+ def start(self, qemuparams = None, ssh=True, extra_bootparams=None, runqemuparams='', discard_writes=True):
+
+ if self.display:
+ os.environ["DISPLAY"] = self.display
+ else:
+ bb.error("To start qemu I need a X desktop, please set DISPLAY correctly (e.g. DISPLAY=:1)")
+ return False
+ if not os.path.exists(self.rootfs):
+ bb.error("Invalid rootfs %s" % self.rootfs)
+ return False
+ if not os.path.exists(self.tmpdir):
+ bb.error("Invalid TMPDIR path %s" % self.tmpdir)
+ return False
+ else:
+ os.environ["OE_TMPDIR"] = self.tmpdir
+ if not os.path.exists(self.deploy_dir_image):
+ bb.error("Invalid DEPLOY_DIR_IMAGE path %s" % self.deploy_dir_image)
+ return False
+ else:
+ os.environ["DEPLOY_DIR_IMAGE"] = self.deploy_dir_image
+
+ # Set this flag so that Qemu doesn't do any grabs as SDL grabs interact
+ # badly with screensavers.
+ os.environ["QEMU_DONT_GRAB"] = "1"
+ self.qemuparams = '--append "root=/dev/ram0 console=ttyS0" -nographic -serial unix:%s,server,nowait' % self.socketfile
+
+ launch_cmd = 'qemu-system-i386 -kernel %s -initrd %s %s' % (self.kernel, self.rootfs, self.qemuparams)
+ self.runqemu = subprocess.Popen(launch_cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,preexec_fn=os.setpgrp)
+
+ bb.note("runqemu started, pid is %s" % self.runqemu.pid)
+ bb.note("waiting at most %s seconds for qemu pid" % self.runqemutime)
+ endtime = time.time() + self.runqemutime
+ while not self.is_alive() and time.time() < endtime:
+ time.sleep(1)
+
+ if self.is_alive():
+ bb.note("qemu started - qemu procces pid is %s" % self.qemupid)
+ self.create_socket()
+ else:
+ bb.note("Qemu pid didn't appeared in %s seconds" % self.runqemutime)
+ output = self.runqemu.stdout
+ self.stop()
+ bb.note("Output from runqemu:\n%s" % output.read().decode("utf-8"))
+ return False
+
+ return self.is_alive()
+
+ def run_serial(self, command, timeout=5):
+ self.server_socket.sendall(command+'\n')
+ data = ''
+ status = 0
+ stopread = False
+ endtime = time.time()+timeout
+ while time.time()<endtime and not stopread:
+ try:
+ sread, _, _ = select.select([self.server_socket],[],[],1)
+ except InterruptedError:
+ continue
+ for sock in sread:
+ answer = sock.recv(1024)
+ if answer:
+ data += answer
+ else:
+ sock.close()
+ stopread = True
+ if not data:
+ status = 1
+ if not stopread:
+ data += "<<< run_serial(): command timed out after %d seconds without output >>>\r\n\r\n" % timeout
+ return (status, str(data))
+
+ def find_child(self,parent_pid):
+ #
+ # Walk the process tree from the process specified looking for a qemu-system. Return its [pid'cmd]
+ #
+ ps = subprocess.Popen(['ps', 'axww', '-o', 'pid,ppid,command'], stdout=subprocess.PIPE).communicate()[0]
+ processes = ps.decode("utf-8").split('\n')
+ nfields = len(processes[0].split()) - 1
+ pids = {}
+ commands = {}
+ for row in processes[1:]:
+ data = row.split(None, nfields)
+ if len(data) != 3:
+ continue
+ if data[1] not in pids:
+ pids[data[1]] = []
+
+ pids[data[1]].append(data[0])
+ commands[data[0]] = data[2]
+
+ if parent_pid not in pids:
+ return []
+
+ parents = []
+ newparents = pids[parent_pid]
+ while newparents:
+ next = []
+ for p in newparents:
+ if p in pids:
+ for n in pids[p]:
+ if n not in parents and n not in next:
+ next.append(n)
+ if p not in parents:
+ parents.append(p)
+ newparents = next
+ #print("Children matching %s:" % str(parents))
+ for p in parents:
+ # Need to be careful here since runqemu runs "ldd qemu-system-xxxx"
+ # Also, old versions of ldd (2.11) run "LD_XXXX qemu-system-xxxx"
+ basecmd = commands[p].split()[0]
+ basecmd = os.path.basename(basecmd)
+ if "qemu-system" in basecmd and "-serial unix" in commands[p]:
+ return [int(p),commands[p]]
diff --git a/poky/meta/lib/oeqa/utils/sshcontrol.py b/poky/meta/lib/oeqa/utils/sshcontrol.py
new file mode 100644
index 000000000..d292893c0
--- /dev/null
+++ b/poky/meta/lib/oeqa/utils/sshcontrol.py
@@ -0,0 +1,242 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2013 Intel Corporation
+#
+# Released under the MIT license (see COPYING.MIT)
+
+# Provides a class for setting up ssh connections,
+# running commands and copying files to/from a target.
+# It's used by testimage.bbclass and tests in lib/oeqa/runtime.
+
+import subprocess
+import time
+import os
+import select
+
+
+class SSHProcess(object):
+ def __init__(self, **options):
+
+ self.defaultopts = {
+ "stdout": subprocess.PIPE,
+ "stderr": subprocess.STDOUT,
+ "stdin": None,
+ "shell": False,
+ "bufsize": -1,
+ "preexec_fn": os.setsid,
+ }
+ self.options = dict(self.defaultopts)
+ self.options.update(options)
+ self.status = None
+ self.output = None
+ self.process = None
+ self.starttime = None
+ self.logfile = None
+
+ # Unset DISPLAY which means we won't trigger SSH_ASKPASS
+ env = os.environ.copy()
+ if "DISPLAY" in env:
+ del env['DISPLAY']
+ self.options['env'] = env
+
+ def log(self, msg):
+ if self.logfile:
+ with open(self.logfile, "a") as f:
+ f.write("%s" % msg)
+
+ def _run(self, command, timeout=None, logfile=None):
+ self.logfile = logfile
+ self.starttime = time.time()
+ output = ''
+ self.process = subprocess.Popen(command, **self.options)
+ if timeout:
+ endtime = self.starttime + timeout
+ eof = False
+ while time.time() < endtime and not eof:
+ try:
+ if select.select([self.process.stdout], [], [], 5)[0] != []:
+ data = os.read(self.process.stdout.fileno(), 1024)
+ if not data:
+ self.process.stdout.close()
+ eof = True
+ else:
+ data = data.decode("utf-8")
+ output += data
+ self.log(data)
+ endtime = time.time() + timeout
+ except InterruptedError:
+ continue
+
+ # process hasn't returned yet
+ if not eof:
+ self.process.terminate()
+ time.sleep(5)
+ try:
+ self.process.kill()
+ except OSError:
+ pass
+ lastline = "\nProcess killed - no output for %d seconds. Total running time: %d seconds." % (timeout, time.time() - self.starttime)
+ self.log(lastline)
+ output += lastline
+ else:
+ output = self.process.communicate()[0]
+ self.log(output.rstrip())
+
+ self.status = self.process.wait()
+ self.output = output.rstrip()
+
+ def run(self, command, timeout=None, logfile=None):
+ try:
+ self._run(command, timeout, logfile)
+ except:
+ # Need to guard against a SystemExit or other exception occuring whilst running
+ # and ensure we don't leave a process behind.
+ if self.process.poll() is None:
+ self.process.kill()
+ self.status = self.process.wait()
+ raise
+ return (self.status, self.output)
+
+class SSHControl(object):
+ def __init__(self, ip, logfile=None, timeout=300, user='root', port=None):
+ self.ip = ip
+ self.defaulttimeout = timeout
+ self.ignore_status = True
+ self.logfile = logfile
+ self.user = user
+ self.ssh_options = [
+ '-o', 'UserKnownHostsFile=/dev/null',
+ '-o', 'StrictHostKeyChecking=no',
+ '-o', 'LogLevel=ERROR'
+ ]
+ self.ssh = ['ssh', '-l', self.user ] + self.ssh_options
+ self.scp = ['scp'] + self.ssh_options
+ if port:
+ self.ssh = self.ssh + [ '-p', port ]
+ self.scp = self.scp + [ '-P', port ]
+
+ def log(self, msg):
+ if self.logfile:
+ with open(self.logfile, "a") as f:
+ f.write("%s\n" % msg)
+
+ def _internal_run(self, command, timeout=None, ignore_status = True):
+ self.log("[Running]$ %s" % " ".join(command))
+
+ proc = SSHProcess()
+ status, output = proc.run(command, timeout, logfile=self.logfile)
+
+ self.log("[Command returned '%d' after %.2f seconds]" % (status, time.time() - proc.starttime))
+
+ if status and not ignore_status:
+ raise AssertionError("Command '%s' returned non-zero exit status %d:\n%s" % (command, status, output))
+
+ return (status, output)
+
+ def run(self, command, timeout=None):
+ """
+ command - ssh command to run
+ timeout=<val> - kill command if there is no output after <val> seconds
+ timeout=None - kill command if there is no output after a default value seconds
+ timeout=0 - no timeout, let command run until it returns
+ """
+
+ command = self.ssh + [self.ip, 'export PATH=/usr/sbin:/sbin:/usr/bin:/bin; ' + command]
+
+ if timeout is None:
+ return self._internal_run(command, self.defaulttimeout, self.ignore_status)
+ if timeout == 0:
+ return self._internal_run(command, None, self.ignore_status)
+ return self._internal_run(command, timeout, self.ignore_status)
+
+ def copy_to(self, localpath, remotepath):
+ if os.path.islink(localpath):
+ localpath = os.path.dirname(localpath) + "/" + os.readlink(localpath)
+ command = self.scp + [localpath, '%s@%s:%s' % (self.user, self.ip, remotepath)]
+ return self._internal_run(command, ignore_status=False)
+
+ def copy_from(self, remotepath, localpath):
+ command = self.scp + ['%s@%s:%s' % (self.user, self.ip, remotepath), localpath]
+ return self._internal_run(command, ignore_status=False)
+
+ def copy_dir_to(self, localpath, remotepath):
+ """
+ Copy recursively localpath directory to remotepath in target.
+ """
+
+ for root, dirs, files in os.walk(localpath):
+ # Create directories in the target as needed
+ for d in dirs:
+ tmp_dir = os.path.join(root, d).replace(localpath, "")
+ new_dir = os.path.join(remotepath, tmp_dir.lstrip("/"))
+ cmd = "mkdir -p %s" % new_dir
+ self.run(cmd)
+
+ # Copy files into the target
+ for f in files:
+ tmp_file = os.path.join(root, f).replace(localpath, "")
+ dst_file = os.path.join(remotepath, tmp_file.lstrip("/"))
+ src_file = os.path.join(root, f)
+ self.copy_to(src_file, dst_file)
+
+
+ def delete_files(self, remotepath, files):
+ """
+ Delete files in target's remote path.
+ """
+
+ cmd = "rm"
+ if not isinstance(files, list):
+ files = [files]
+
+ for f in files:
+ cmd = "%s %s" % (cmd, os.path.join(remotepath, f))
+
+ self.run(cmd)
+
+
+ def delete_dir(self, remotepath):
+ """
+ Delete remotepath directory in target.
+ """
+
+ cmd = "rmdir %s" % remotepath
+ self.run(cmd)
+
+
+ def delete_dir_structure(self, localpath, remotepath):
+ """
+ Delete recursively localpath structure directory in target's remotepath.
+
+ This function is very usefult to delete a package that is installed in
+ the DUT and the host running the test has such package extracted in tmp
+ directory.
+
+ Example:
+ pwd: /home/user/tmp
+ tree: .
+ └── work
+ ├── dir1
+ │   └── file1
+ └── dir2
+
+ localpath = "/home/user/tmp" and remotepath = "/home/user"
+
+ With the above variables this function will try to delete the
+ directory in the DUT in this order:
+ /home/user/work/dir1/file1
+ /home/user/work/dir1 (if dir is empty)
+ /home/user/work/dir2 (if dir is empty)
+ /home/user/work (if dir is empty)
+ """
+
+ for root, dirs, files in os.walk(localpath, topdown=False):
+ # Delete files first
+ tmpdir = os.path.join(root).replace(localpath, "")
+ remotedir = os.path.join(remotepath, tmpdir.lstrip("/"))
+ self.delete_files(remotedir, files)
+
+ # Remove dirs if empty
+ for d in dirs:
+ tmpdir = os.path.join(root, d).replace(localpath, "")
+ remotedir = os.path.join(remotepath, tmpdir.lstrip("/"))
+ self.delete_dir(remotepath)
diff --git a/poky/meta/lib/oeqa/utils/subprocesstweak.py b/poky/meta/lib/oeqa/utils/subprocesstweak.py
new file mode 100644
index 000000000..1f7d11b55
--- /dev/null
+++ b/poky/meta/lib/oeqa/utils/subprocesstweak.py
@@ -0,0 +1,19 @@
+import subprocess
+
+class OETestCalledProcessError(subprocess.CalledProcessError):
+ def __str__(self):
+ def strify(o):
+ if isinstance(o, bytes):
+ return o.decode("utf-8", errors="replace")
+ else:
+ return o
+
+ s = "Command '%s' returned non-zero exit status %d" % (self.cmd, self.returncode)
+ if hasattr(self, "output") and self.output:
+ s = s + "\nStandard Output: " + strify(self.output)
+ if hasattr(self, "stderr") and self.stderr:
+ s = s + "\nStandard Error: " + strify(self.stderr)
+ return s
+
+def errors_have_output():
+ subprocess.CalledProcessError = OETestCalledProcessError
diff --git a/poky/meta/lib/oeqa/utils/targetbuild.py b/poky/meta/lib/oeqa/utils/targetbuild.py
new file mode 100644
index 000000000..1202d579f
--- /dev/null
+++ b/poky/meta/lib/oeqa/utils/targetbuild.py
@@ -0,0 +1,139 @@
+# Copyright (C) 2013 Intel Corporation
+#
+# Released under the MIT license (see COPYING.MIT)
+
+# Provides a class for automating build tests for projects
+
+import os
+import re
+import bb.utils
+import subprocess
+import tempfile
+from abc import ABCMeta, abstractmethod
+
+class BuildProject(metaclass=ABCMeta):
+
+ def __init__(self, d, uri, foldername=None, tmpdir=None):
+ self.d = d
+ self.uri = uri
+ self.archive = os.path.basename(uri)
+ if not tmpdir:
+ tmpdir = self.d.getVar('WORKDIR')
+ if not tmpdir:
+ tmpdir = tempfile.mkdtemp(prefix='buildproject')
+ self.localarchive = os.path.join(tmpdir,self.archive)
+ if foldername:
+ self.fname = foldername
+ else:
+ self.fname = re.sub(r'\.tar\.bz2$|\.tar\.gz$|\.tar\.xz$', '', self.archive)
+
+ # Download self.archive to self.localarchive
+ def _download_archive(self):
+ dl_dir = self.d.getVar("DL_DIR")
+ if dl_dir and os.path.exists(os.path.join(dl_dir, self.archive)):
+ bb.utils.copyfile(os.path.join(dl_dir, self.archive), self.localarchive)
+ return
+
+ exportvars = ['HTTP_PROXY', 'http_proxy',
+ 'HTTPS_PROXY', 'https_proxy',
+ 'FTP_PROXY', 'ftp_proxy',
+ 'FTPS_PROXY', 'ftps_proxy',
+ 'NO_PROXY', 'no_proxy',
+ 'ALL_PROXY', 'all_proxy',
+ 'SOCKS5_USER', 'SOCKS5_PASSWD']
+
+ cmd = ''
+ for var in exportvars:
+ val = self.d.getVar(var)
+ if val:
+ cmd = 'export ' + var + '=\"%s\"; %s' % (val, cmd)
+
+ cmd = cmd + "wget -O %s %s" % (self.localarchive, self.uri)
+ subprocess.check_output(cmd, shell=True)
+
+ # This method should provide a way to run a command in the desired environment.
+ @abstractmethod
+ def _run(self, cmd):
+ pass
+
+ # The timeout parameter of target.run is set to 0 to make the ssh command
+ # run with no timeout.
+ def run_configure(self, configure_args='', extra_cmds=''):
+ return self._run('cd %s; %s ./configure %s' % (self.targetdir, extra_cmds, configure_args))
+
+ def run_make(self, make_args=''):
+ return self._run('cd %s; make %s' % (self.targetdir, make_args))
+
+ def run_install(self, install_args=''):
+ return self._run('cd %s; make install %s' % (self.targetdir, install_args))
+
+ def clean(self):
+ self._run('rm -rf %s' % self.targetdir)
+ subprocess.check_call('rm -f %s' % self.localarchive, shell=True)
+ pass
+
+class TargetBuildProject(BuildProject):
+
+ def __init__(self, target, d, uri, foldername=None):
+ self.target = target
+ self.targetdir = "~/"
+ BuildProject.__init__(self, d, uri, foldername)
+
+ def download_archive(self):
+
+ self._download_archive()
+
+ (status, output) = self.target.copy_to(self.localarchive, self.targetdir)
+ if status != 0:
+ raise Exception("Failed to copy archive to target, output: %s" % output)
+
+ (status, output) = self.target.run('tar xf %s%s -C %s' % (self.targetdir, self.archive, self.targetdir))
+ if status != 0:
+ raise Exception("Failed to extract archive, output: %s" % output)
+
+ #Change targetdir to project folder
+ self.targetdir = self.targetdir + self.fname
+
+ # The timeout parameter of target.run is set to 0 to make the ssh command
+ # run with no timeout.
+ def _run(self, cmd):
+ return self.target.run(cmd, 0)[0]
+
+
+class SDKBuildProject(BuildProject):
+
+ def __init__(self, testpath, sdkenv, d, uri, foldername=None):
+ self.sdkenv = sdkenv
+ self.testdir = testpath
+ self.targetdir = testpath
+ bb.utils.mkdirhier(testpath)
+ self.datetime = d.getVar('DATETIME')
+ self.testlogdir = d.getVar("TEST_LOG_DIR")
+ bb.utils.mkdirhier(self.testlogdir)
+ self.logfile = os.path.join(self.testlogdir, "sdk_target_log.%s" % self.datetime)
+ BuildProject.__init__(self, d, uri, foldername, tmpdir=testpath)
+
+ def download_archive(self):
+
+ self._download_archive()
+
+ cmd = 'tar xf %s%s -C %s' % (self.targetdir, self.archive, self.targetdir)
+ subprocess.check_output(cmd, shell=True)
+
+ #Change targetdir to project folder
+ self.targetdir = os.path.join(self.targetdir, self.fname)
+
+ def run_configure(self, configure_args='', extra_cmds=' gnu-configize; '):
+ return super(SDKBuildProject, self).run_configure(configure_args=(configure_args or '$CONFIGURE_FLAGS'), extra_cmds=extra_cmds)
+
+ def run_install(self, install_args=''):
+ return super(SDKBuildProject, self).run_install(install_args=(install_args or "DESTDIR=%s/../install" % self.targetdir))
+
+ def log(self, msg):
+ if self.logfile:
+ with open(self.logfile, "a") as f:
+ f.write("%s\n" % msg)
+
+ def _run(self, cmd):
+ self.log("Running . %s; " % self.sdkenv + cmd)
+ return subprocess.check_call(". %s; " % self.sdkenv + cmd, shell=True)
diff --git a/poky/meta/lib/oeqa/utils/testexport.py b/poky/meta/lib/oeqa/utils/testexport.py
new file mode 100644
index 000000000..be2a2110f
--- /dev/null
+++ b/poky/meta/lib/oeqa/utils/testexport.py
@@ -0,0 +1,263 @@
+# Copyright (C) 2015 Intel Corporation
+#
+# Released under the MIT license (see COPYING.MIT)
+
+# Provides functions to help with exporting binaries obtained from built targets
+
+import os, re, glob as g, shutil as sh,sys
+from time import sleep
+from .commands import runCmd
+from difflib import SequenceMatcher as SM
+
+try:
+ import bb
+except ImportError:
+ class my_log():
+ def __init__(self):
+ pass
+ def plain(self, msg):
+ if msg:
+ print(msg)
+ def warn(self, msg):
+ if msg:
+ print("WARNING: " + msg)
+ def fatal(self, msg):
+ if msg:
+ print("FATAL:" + msg)
+ sys.exit(1)
+ bb = my_log()
+
+
+def determine_if_poky_env():
+ """
+ used to determine if we are inside the poky env or not. Usefull for remote machine where poky is not present
+ """
+ check_env = True if ("/scripts" and "/bitbake/bin") in os.getenv("PATH") else False
+ return check_env
+
+
+def get_dest_folder(tune_features, folder_list):
+ """
+ Function to determine what rpm deploy dir to choose for a given architecture based on TUNE_FEATURES
+ """
+ features_list = tune_features.split(" ")
+ features_list.reverse()
+ features_list = "_".join(features_list)
+ match_rate = 0
+ best_match = None
+ for folder in folder_list:
+ curr_match_rate = SM(None, folder, features_list).ratio()
+ if curr_match_rate > match_rate:
+ match_rate = curr_match_rate
+ best_match = folder
+ return best_match
+
+
+def process_binaries(d, params):
+ param_list = params
+ export_env = d.getVar("TEST_EXPORT_ONLY")
+
+ def extract_binary(pth_to_pkg, dest_pth=None):
+ cpio_command = runCmd("which cpio")
+ rpm2cpio_command = runCmd("ls /usr/bin/rpm2cpio")
+ if (cpio_command.status != 0) and (rpm2cpio_command.status != 0):
+ bb.fatal("Either \"rpm2cpio\" or \"cpio\" tools are not available on your system."
+ "All binaries extraction processes will not be available, crashing all related tests."
+ "Please install them according to your OS recommendations") # will exit here
+ if dest_pth:
+ os.chdir(dest_pth)
+ else:
+ os.chdir("%s" % os.sep)# this is for native package
+ extract_bin_command = runCmd("%s %s | %s -idm" % (rpm2cpio_command.output, pth_to_pkg, cpio_command.output)) # semi-hardcoded because of a bug on poky's rpm2cpio
+ return extract_bin_command
+
+ if determine_if_poky_env(): # machine with poky environment
+ exportpath = d.getVar("TEST_EXPORT_DIR") if export_env else d.getVar("DEPLOY_DIR")
+ rpm_deploy_dir = d.getVar("DEPLOY_DIR_RPM")
+ arch = get_dest_folder(d.getVar("TUNE_FEATURES"), os.listdir(rpm_deploy_dir))
+ arch_rpm_dir = os.path.join(rpm_deploy_dir, arch)
+ extracted_bin_dir = os.path.join(exportpath,"binaries", arch, "extracted_binaries")
+ packaged_bin_dir = os.path.join(exportpath,"binaries", arch, "packaged_binaries")
+ # creating necessary directory structure in case testing is done in poky env.
+ if export_env == "0":
+ if not os.path.exists(extracted_bin_dir): bb.utils.mkdirhier(extracted_bin_dir)
+ if not os.path.exists(packaged_bin_dir): bb.utils.mkdirhier(packaged_bin_dir)
+
+ if param_list[3] == "native":
+ if export_env == "1": #this is a native package and we only need to copy it. no need for extraction
+ native_rpm_dir = os.path.join(rpm_deploy_dir, get_dest_folder("{} nativesdk".format(d.getVar("BUILD_SYS")), os.listdir(rpm_deploy_dir)))
+ native_rpm_file_list = [item for item in os.listdir(native_rpm_dir) if re.search("nativesdk-" + param_list[0] + "-([0-9]+\.*)", item)]
+ if not native_rpm_file_list:
+ bb.warn("Couldn't find any version of {} native package. Related tests will most probably fail.".format(param_list[0]))
+ return ""
+ for item in native_rpm_file_list:# will copy all versions of package. Used version will be selected on remote machine
+ bb.plain("Copying native package file: %s" % item)
+ sh.copy(os.path.join(rpm_deploy_dir, native_rpm_dir, item), os.path.join(d.getVar("TEST_EXPORT_DIR"), "binaries", "native"))
+ else: # nothing to do here; running tests under bitbake, so we asume native binaries are in sysroots dir.
+ if param_list[1] or param_list[4]:
+ bb.warn("Native binary %s %s%s. Running tests under bitbake environment. Version can't be checked except when the test itself does it"
+ " and binary can't be removed."%(param_list[0],"has assigned ver. " + param_list[1] if param_list[1] else "",
+ ", is marked for removal" if param_list[4] else ""))
+ else:# the package is target aka DUT intended and it is either required to be delivered in an extracted form or in a packaged version
+ target_rpm_file_list = [item for item in os.listdir(arch_rpm_dir) if re.search(param_list[0] + "-([0-9]+\.*)", item)]
+ if not target_rpm_file_list:
+ bb.warn("Couldn't find any version of target package %s. Please ensure it was built. "
+ "Related tests will probably fail." % param_list[0])
+ return ""
+ if param_list[2] == "rpm": # binary should be deployed as rpm; (other, .deb, .ipk? ; in the near future)
+ for item in target_rpm_file_list: # copying all related rpm packages. "Intuition" reasons, someone may need other versions too. Deciding later on version
+ bb.plain("Copying target specific packaged file: %s" % item)
+ sh.copy(os.path.join(arch_rpm_dir, item), packaged_bin_dir)
+ return "copied"
+ else: # it is required to extract the binary
+ if param_list[1]: # the package is versioned
+ for item in target_rpm_file_list:
+ if re.match(".*-{}-.*\.rpm".format(param_list[1]), item):
+ destination = os.path.join(extracted_bin_dir,param_list[0], param_list[1])
+ bb.utils.mkdirhier(destination)
+ extract_binary(os.path.join(arch_rpm_dir, item), destination)
+ break
+ else:
+ bb.warn("Couldn't find the desired version %s for target binary %s. Related test cases will probably fail." % (param_list[1], param_list[0]))
+ return ""
+ return "extracted"
+ else: # no version provided, just extract one binary
+ destination = os.path.join(extracted_bin_dir,param_list[0],
+ re.search(".*-([0-9]+\.[0-9]+)-.*rpm", target_rpm_file_list[0]).group(1))
+ bb.utils.mkdirhier(destination)
+ extract_binary(os.path.join(arch_rpm_dir, target_rpm_file_list[0]), destination)
+ return "extracted"
+ else: # remote machine
+ binaries_path = os.getenv("bin_dir")# in order to know where the binaries are, bin_dir is set as env. variable
+ if param_list[3] == "native": #need to extract the native pkg here
+ native_rpm_dir = os.path.join(binaries_path, "native")
+ native_rpm_file_list = os.listdir(native_rpm_dir)
+ for item in native_rpm_file_list:
+ if param_list[1] and re.match("nativesdk-{}-{}-.*\.rpm".format(param_list[0], param_list[1]), item): # native package has version
+ extract_binary(os.path.join(native_rpm_dir, item))
+ break
+ else:# just copy any related native binary
+ found_version = re.match("nativesdk-{}-([0-9]+\.[0-9]+)-".format(param_list[0]), item).group(1)
+ if found_version:
+ extract_binary(os.path.join(native_rpm_dir, item))
+ else:
+ bb.warn("Couldn't find native package %s%s. Related test cases will be influenced." %
+ (param_list[0], " with version " + param_list[1] if param_list[1] else ""))
+ return
+
+ else: # this is for target device
+ if param_list[2] == "rpm":
+ return "No need to extract, this is an .rpm file"
+ arch = get_dest_folder(d.getVar("TUNE_FEATURES"), os.listdir(binaries_path))
+ extracted_bin_path = os.path.join(binaries_path, arch, "extracted_binaries")
+ extracted_bin_list = [item for item in os.listdir(extracted_bin_path)]
+ packaged_bin_path = os.path.join(binaries_path, arch, "packaged_binaries")
+ packaged_bin_file_list = os.listdir(packaged_bin_path)
+ # see if the package is already in the extracted ones; maybe it was deployed when exported the env.
+ if os.path.exists(os.path.join(extracted_bin_path, param_list[0], param_list[1] if param_list[1] else "")):
+ return "binary %s is already extracted" % param_list[0]
+ else: # we need to search for it in the packaged binaries directory. It may have been shipped after export
+ for item in packaged_bin_file_list:
+ if param_list[1]:
+ if re.match("%s-%s.*rpm" % (param_list[0], param_list[1]), item): # package with version
+ if not os.path.exists(os.path.join(extracted_bin_path, param_list[0],param_list[1])):
+ os.makedirs(os.path.join(extracted_bin_path, param_list[0], param_list[1]))
+ extract_binary(os.path.join(packaged_bin_path, item), os.path.join(extracted_bin_path, param_list[0],param_list[1]))
+ bb.plain("Using {} for {}".format(os.path.join(packaged_bin_path, item), param_list[0]))
+ break
+ else:
+ if re.match("%s-.*rpm" % param_list[0], item):
+ found_version = re.match(".*-([0-9]+\.[0-9]+)-", item).group(1)
+ if not os.path.exists(os.path.join(extracted_bin_path, param_list[0], found_version)):
+ os.makedirs(os.path.join(extracted_bin_path, param_list[0], found_version))
+ bb.plain("Used ver. %s for %s" % (found_version, param_list[0]))
+ extract_binary(os.path.join(packaged_bin_path, item), os.path.join(extracted_bin_path, param_list[0], found_version))
+ break
+ else:
+ bb.warn("Couldn't find target package %s%s. Please ensure it is available "
+ "in either of these directories: extracted_binaries or packaged_binaries. "
+ "Related tests will probably fail." % (param_list[0], " with version " + param_list[1] if param_list[1] else ""))
+ return
+ return "Binary %s extracted successfully." % param_list[0]
+
+
+def files_to_copy(base_dir):
+ """
+ Produces a list of files relative to the base dir path sent as param
+ :return: the list of relative path files
+ """
+ files_list = []
+ dir_list = [base_dir]
+ count = 1
+ dir_count = 1
+ while (dir_count == 1 or dir_count != count):
+ count = dir_count
+ for dir in dir_list:
+ for item in os.listdir(dir):
+ if os.path.isdir(os.path.join(dir, item)) and os.path.join(dir, item) not in dir_list:
+ dir_list.append(os.path.join(dir, item))
+ dir_count = len(dir_list)
+ elif os.path.join(dir, item) not in files_list and os.path.isfile(os.path.join(dir, item)):
+ files_list.append(os.path.join(dir, item))
+ return files_list
+
+
+def send_bin_to_DUT(d,params):
+ from oeqa.oetest import oeRuntimeTest
+ param_list = params
+ cleanup_list = list()
+ bins_dir = os.path.join(d.getVar("TEST_EXPORT_DIR"), "binaries") if determine_if_poky_env() \
+ else os.getenv("bin_dir")
+ arch = get_dest_folder(d.getVar("TUNE_FEATURES"), os.listdir(bins_dir))
+ arch_rpms_dir = os.path.join(bins_dir, arch, "packaged_binaries")
+ extracted_bin_dir = os.path.join(bins_dir, arch, "extracted_binaries", param_list[0])
+
+ def send_extracted_binary():
+ bin_local_dir = os.path.join(extracted_bin_dir, param_list[1] if param_list[1] else os.listdir(extracted_bin_dir)[0])
+ for item in files_to_copy(bin_local_dir):
+ split_path = item.split(bin_local_dir)[1]
+ path_on_DUT = split_path if split_path[0] is "/" else "/" + split_path # create the path as on DUT; eg. /usr/bin/bin_file
+ (status, output) = oeRuntimeTest.tc.target.copy_to(item, path_on_DUT)
+ if status != 0:
+ bb.warn("Failed to copy %s binary file %s on the remote target: %s" %
+ (param_list[0], "ver. " + param_list[1] if param_list[1] else "", d.getVar("MACHINE")))
+ return
+ if param_list[4] == "rm":
+ cleanup_list.append(path_on_DUT)
+ return cleanup_list
+
+ def send_rpm(remote_path): # if it is not required to have an extracted binary, but to send an .rpm file
+ rpm_to_send = ""
+ for item in os.listdir(arch_rpms_dir):
+ if param_list[1] and re.match("%s-%s-.*rpm"%(param_list[0], param_list[1]), item):
+ rpm_to_send = item
+ break
+ elif re.match("%s-[0-9]+\.[0-9]+-.*rpm" % param_list[0], item):
+ rpm_to_send = item
+ break
+ else:
+ bb.warn("No rpm package found for %s %s in .rpm files dir %s. Skipping deployment." %
+ (param_list[0], "ver. " + param_list[1] if param_list[1] else "", rpms_file_dir) )
+ return
+ (status, output) = oeRuntimeTest.tc.target.copy_to(os.path.join(arch_rpms_dir, rpm_to_send), remote_path)
+ if status != 0:
+ bb.warn("Failed to copy %s on the remote target: %s" %(param_list[0], d.getVar("MACHINE")))
+ return
+ if param_list[4] == "rm":
+ cleanup_list.append(os.path.join(remote_path, rpm_to_send))
+ return cleanup_list
+
+ if param_list[2] == "rpm": # send an .rpm file
+ return send_rpm("/home/root") # rpms will be sent on home dir of remote machine
+ else:
+ return send_extracted_binary()
+
+
+def rm_bin(removal_list): # need to know both if the binary is sent archived and the path where it is sent if archived
+ from oeqa.oetest import oeRuntimeTest
+ for item in removal_list:
+ (status,output) = oeRuntimeTest.tc.target.run("rm " + item)
+ if status != 0:
+ bb.warn("Failed to remove: %s. Please ensure connection with the target device is up and running and "
+ "you have the needed rights." % item)
+
OpenPOWER on IntegriCloud