summaryrefslogtreecommitdiffstats
path: root/import-layers/yocto-poky/meta/lib/oeqa/utils/commands.py
diff options
context:
space:
mode:
Diffstat (limited to 'import-layers/yocto-poky/meta/lib/oeqa/utils/commands.py')
-rw-r--r--import-layers/yocto-poky/meta/lib/oeqa/utils/commands.py357
1 files changed, 0 insertions, 357 deletions
diff --git a/import-layers/yocto-poky/meta/lib/oeqa/utils/commands.py b/import-layers/yocto-poky/meta/lib/oeqa/utils/commands.py
deleted file mode 100644
index 0d9cf23fe..000000000
--- a/import-layers/yocto-poky/meta/lib/oeqa/utils/commands.py
+++ /dev/null
@@ -1,357 +0,0 @@
-# Copyright (c) 2013-2014 Intel Corporation
-#
-# Released under the MIT license (see COPYING.MIT)
-
-# DESCRIPTION
-# This module is mainly used by scripts/oe-selftest and modules under meta/oeqa/selftest
-# It provides a class and methods for running commands on the host in a convienent way for tests.
-
-
-
-import os
-import sys
-import signal
-import subprocess
-import threading
-import time
-import logging
-from oeqa.utils import CommandError
-from oeqa.utils import ftools
-import re
-import contextlib
-# Export test doesn't require bb
-try:
- import bb
-except ImportError:
- pass
-
-class Command(object):
- def __init__(self, command, bg=False, timeout=None, data=None, output_log=None, **options):
-
- self.defaultopts = {
- "stdout": subprocess.PIPE,
- "stderr": subprocess.STDOUT,
- "stdin": None,
- "shell": False,
- "bufsize": -1,
- }
-
- self.cmd = command
- self.bg = bg
- self.timeout = timeout
- self.data = data
-
- self.options = dict(self.defaultopts)
- if isinstance(self.cmd, str):
- self.options["shell"] = True
- if self.data:
- self.options['stdin'] = subprocess.PIPE
- self.options.update(options)
-
- self.status = None
- # We collect chunks of output before joining them at the end.
- self._output_chunks = []
- self._error_chunks = []
- self.output = None
- self.error = None
- self.threads = []
-
- self.output_log = output_log
- self.log = logging.getLogger("utils.commands")
-
- def run(self):
- self.process = subprocess.Popen(self.cmd, **self.options)
-
- def readThread(output, stream, logfunc):
- if logfunc:
- for line in stream:
- output.append(line)
- logfunc(line.decode("utf-8", errors='replace').rstrip())
- else:
- output.append(stream.read())
-
- def readStderrThread():
- readThread(self._error_chunks, self.process.stderr, self.output_log.error if self.output_log else None)
-
- def readStdoutThread():
- readThread(self._output_chunks, self.process.stdout, self.output_log.info if self.output_log else None)
-
- def writeThread():
- try:
- self.process.stdin.write(self.data)
- self.process.stdin.close()
- except OSError as ex:
- # It's not an error when the command does not consume all
- # of our data. subprocess.communicate() also ignores that.
- if ex.errno != EPIPE:
- raise
-
- # We write in a separate thread because then we can read
- # without worrying about deadlocks. The additional thread is
- # expected to terminate by itself and we mark it as a daemon,
- # so even it should happen to not terminate for whatever
- # reason, the main process will still exit, which will then
- # kill the write thread.
- if self.data:
- threading.Thread(target=writeThread, daemon=True).start()
- if self.process.stderr:
- thread = threading.Thread(target=readStderrThread)
- thread.start()
- self.threads.append(thread)
- if self.output_log:
- self.output_log.info('Running: %s' % self.cmd)
- thread = threading.Thread(target=readStdoutThread)
- thread.start()
- self.threads.append(thread)
-
- self.log.debug("Running command '%s'" % self.cmd)
-
- if not self.bg:
- if self.timeout is None:
- for thread in self.threads:
- thread.join()
- else:
- deadline = time.time() + self.timeout
- for thread in self.threads:
- timeout = deadline - time.time()
- if timeout < 0:
- timeout = 0
- thread.join(timeout)
- self.stop()
-
- def stop(self):
- for thread in self.threads:
- if thread.isAlive():
- self.process.terminate()
- # let's give it more time to terminate gracefully before killing it
- thread.join(5)
- if thread.isAlive():
- self.process.kill()
- thread.join()
-
- def finalize_output(data):
- if not data:
- data = ""
- else:
- data = b"".join(data)
- data = data.decode("utf-8", errors='replace').rstrip()
- return data
-
- self.output = finalize_output(self._output_chunks)
- self._output_chunks = None
- # self.error used to be a byte string earlier, probably unintentionally.
- # Now it is a normal string, just like self.output.
- self.error = finalize_output(self._error_chunks)
- self._error_chunks = None
- # At this point we know that the process has closed stdout/stderr, so
- # it is safe and necessary to wait for the actual process completion.
- self.status = self.process.wait()
-
- self.log.debug("Command '%s' returned %d as exit code." % (self.cmd, self.status))
- # logging the complete output is insane
- # bitbake -e output is really big
- # and makes the log file useless
- if self.status:
- lout = "\n".join(self.output.splitlines()[-20:])
- self.log.debug("Last 20 lines:\n%s" % lout)
-
-
-class Result(object):
- pass
-
-
-def runCmd(command, ignore_status=False, timeout=None, assert_error=True,
- native_sysroot=None, limit_exc_output=0, output_log=None, **options):
- result = Result()
-
- if native_sysroot:
- extra_paths = "%s/sbin:%s/usr/sbin:%s/usr/bin" % \
- (native_sysroot, native_sysroot, native_sysroot)
- nenv = dict(options.get('env', os.environ))
- nenv['PATH'] = extra_paths + ':' + nenv.get('PATH', '')
- options['env'] = nenv
-
- cmd = Command(command, timeout=timeout, output_log=output_log, **options)
- cmd.run()
-
- result.command = command
- result.status = cmd.status
- result.output = cmd.output
- result.error = cmd.error
- result.pid = cmd.process.pid
-
- if result.status and not ignore_status:
- exc_output = result.output
- if limit_exc_output > 0:
- split = result.output.splitlines()
- if len(split) > limit_exc_output:
- exc_output = "\n... (last %d lines of output)\n" % limit_exc_output + \
- '\n'.join(split[-limit_exc_output:])
- if assert_error:
- raise AssertionError("Command '%s' returned non-zero exit status %d:\n%s" % (command, result.status, exc_output))
- else:
- raise CommandError(result.status, command, exc_output)
-
- return result
-
-
-def bitbake(command, ignore_status=False, timeout=None, postconfig=None, output_log=None, **options):
-
- if postconfig:
- postconfig_file = os.path.join(os.environ.get('BUILDDIR'), 'oeqa-post.conf')
- ftools.write_file(postconfig_file, postconfig)
- extra_args = "-R %s" % postconfig_file
- else:
- extra_args = ""
-
- if isinstance(command, str):
- cmd = "bitbake " + extra_args + " " + command
- else:
- cmd = [ "bitbake" ] + [a for a in (command + extra_args.split(" ")) if a not in [""]]
-
- try:
- return runCmd(cmd, ignore_status, timeout, output_log=output_log, **options)
- finally:
- if postconfig:
- os.remove(postconfig_file)
-
-
-def get_bb_env(target=None, postconfig=None):
- if target:
- return bitbake("-e %s" % target, postconfig=postconfig).output
- else:
- return bitbake("-e", postconfig=postconfig).output
-
-def get_bb_vars(variables=None, target=None, postconfig=None):
- """Get values of multiple bitbake variables"""
- bbenv = get_bb_env(target, postconfig=postconfig)
-
- if variables is not None:
- variables = list(variables)
- var_re = re.compile(r'^(export )?(?P<var>\w+(_.*)?)="(?P<value>.*)"$')
- unset_re = re.compile(r'^unset (?P<var>\w+)$')
- lastline = None
- values = {}
- for line in bbenv.splitlines():
- match = var_re.match(line)
- val = None
- if match:
- val = match.group('value')
- else:
- match = unset_re.match(line)
- if match:
- # Handle [unexport] variables
- if lastline.startswith('# "'):
- val = lastline.split('"')[1]
- if val:
- var = match.group('var')
- if variables is None:
- values[var] = val
- else:
- if var in variables:
- values[var] = val
- variables.remove(var)
- # Stop after all required variables have been found
- if not variables:
- break
- lastline = line
- if variables:
- # Fill in missing values
- for var in variables:
- values[var] = None
- return values
-
-def get_bb_var(var, target=None, postconfig=None):
- return get_bb_vars([var], target, postconfig)[var]
-
-def get_test_layer():
- layers = get_bb_var("BBLAYERS").split()
- testlayer = None
- for l in layers:
- if '~' in l:
- l = os.path.expanduser(l)
- if "/meta-selftest" in l and os.path.isdir(l):
- testlayer = l
- break
- return testlayer
-
-def create_temp_layer(templayerdir, templayername, priority=999, recipepathspec='recipes-*/*'):
- os.makedirs(os.path.join(templayerdir, 'conf'))
- with open(os.path.join(templayerdir, 'conf', 'layer.conf'), 'w') as f:
- f.write('BBPATH .= ":${LAYERDIR}"\n')
- f.write('BBFILES += "${LAYERDIR}/%s/*.bb \\' % recipepathspec)
- f.write(' ${LAYERDIR}/%s/*.bbappend"\n' % recipepathspec)
- f.write('BBFILE_COLLECTIONS += "%s"\n' % templayername)
- f.write('BBFILE_PATTERN_%s = "^${LAYERDIR}/"\n' % templayername)
- f.write('BBFILE_PRIORITY_%s = "%d"\n' % (templayername, priority))
- f.write('BBFILE_PATTERN_IGNORE_EMPTY_%s = "1"\n' % templayername)
- f.write('LAYERSERIES_COMPAT_%s = "${LAYERSERIES_COMPAT_core}"\n' % templayername)
-
-@contextlib.contextmanager
-def runqemu(pn, ssh=True, runqemuparams='', image_fstype=None, launch_cmd=None, qemuparams=None, overrides={}, discard_writes=True):
- """
- launch_cmd means directly run the command, don't need set rootfs or env vars.
- """
-
- import bb.tinfoil
- import bb.build
-
- # Need a non-'BitBake' logger to capture the runner output
- targetlogger = logging.getLogger('TargetRunner')
- targetlogger.setLevel(logging.DEBUG)
- handler = logging.StreamHandler(sys.stdout)
- targetlogger.addHandler(handler)
-
- tinfoil = bb.tinfoil.Tinfoil()
- tinfoil.prepare(config_only=False, quiet=True)
- try:
- tinfoil.logger.setLevel(logging.WARNING)
- import oeqa.targetcontrol
- tinfoil.config_data.setVar("TEST_LOG_DIR", "${WORKDIR}/testimage")
- tinfoil.config_data.setVar("TEST_QEMUBOOT_TIMEOUT", "1000")
- # Tell QemuTarget() whether need find rootfs/kernel or not
- if launch_cmd:
- tinfoil.config_data.setVar("FIND_ROOTFS", '0')
- else:
- tinfoil.config_data.setVar("FIND_ROOTFS", '1')
-
- recipedata = tinfoil.parse_recipe(pn)
- for key, value in overrides.items():
- recipedata.setVar(key, value)
-
- logdir = recipedata.getVar("TEST_LOG_DIR")
-
- qemu = oeqa.targetcontrol.QemuTarget(recipedata, targetlogger, image_fstype)
- finally:
- # We need to shut down tinfoil early here in case we actually want
- # to run tinfoil-using utilities with the running QEMU instance.
- # Luckily QemuTarget doesn't need it after the constructor.
- tinfoil.shutdown()
-
- try:
- qemu.deploy()
- try:
- qemu.start(params=qemuparams, ssh=ssh, runqemuparams=runqemuparams, launch_cmd=launch_cmd, discard_writes=discard_writes)
- except bb.build.FuncFailed:
- raise Exception('Failed to start QEMU - see the logs in %s' % logdir)
-
- yield qemu
-
- finally:
- try:
- qemu.stop()
- except:
- pass
- targetlogger.removeHandler(handler)
-
-def updateEnv(env_file):
- """
- Source a file and update environment.
- """
-
- cmd = ". %s; env -0" % env_file
- result = runCmd(cmd)
-
- for line in result.output.split("\0"):
- (key, _, value) = line.partition("=")
- os.environ[key] = value
OpenPOWER on IntegriCloud