summaryrefslogtreecommitdiffstats
path: root/import-layers/yocto-poky/meta/lib/oeqa/selftest/cases/runcmd.py
diff options
context:
space:
mode:
Diffstat (limited to 'import-layers/yocto-poky/meta/lib/oeqa/selftest/cases/runcmd.py')
-rw-r--r--import-layers/yocto-poky/meta/lib/oeqa/selftest/cases/runcmd.py134
1 files changed, 0 insertions, 134 deletions
diff --git a/import-layers/yocto-poky/meta/lib/oeqa/selftest/cases/runcmd.py b/import-layers/yocto-poky/meta/lib/oeqa/selftest/cases/runcmd.py
deleted file mode 100644
index d76d7063c..000000000
--- a/import-layers/yocto-poky/meta/lib/oeqa/selftest/cases/runcmd.py
+++ /dev/null
@@ -1,134 +0,0 @@
-from oeqa.selftest.case import OESelftestTestCase
-from oeqa.utils.commands import runCmd
-from oeqa.utils import CommandError
-from oeqa.core.decorator.oeid import OETestID
-
-import subprocess
-import threading
-import time
-import signal
-
-class MemLogger(object):
- def __init__(self):
- self.info_msgs = []
- self.error_msgs = []
-
- def info(self, msg):
- self.info_msgs.append(msg)
-
- def error(self, msg):
- self.error_msgs.append(msg)
-
-class RunCmdTests(OESelftestTestCase):
- """ Basic tests for runCmd() utility function """
-
- # The delta is intentionally smaller than the timeout, to detect cases where
- # we incorrectly apply the timeout more than once.
- TIMEOUT = 2
- DELTA = 1
-
- @OETestID(1916)
- def test_result_okay(self):
- result = runCmd("true")
- self.assertEqual(result.status, 0)
-
- @OETestID(1915)
- def test_result_false(self):
- result = runCmd("false", ignore_status=True)
- self.assertEqual(result.status, 1)
-
- @OETestID(1917)
- def test_shell(self):
- # A shell is used for all string commands.
- result = runCmd("false; true", ignore_status=True)
- self.assertEqual(result.status, 0)
-
- @OETestID(1910)
- def test_no_shell(self):
- self.assertRaises(FileNotFoundError,
- runCmd, "false; true", shell=False)
-
- @OETestID(1906)
- def test_list_not_found(self):
- self.assertRaises(FileNotFoundError,
- runCmd, ["false; true"])
-
- @OETestID(1907)
- def test_list_okay(self):
- result = runCmd(["true"])
- self.assertEqual(result.status, 0)
-
- @OETestID(1913)
- def test_result_assertion(self):
- self.assertRaisesRegexp(AssertionError, "Command 'echo .* false' returned non-zero exit status 1:\nfoobar",
- runCmd, "echo foobar >&2; false", shell=True)
-
- @OETestID(1914)
- def test_result_exception(self):
- self.assertRaisesRegexp(CommandError, "Command 'echo .* false' returned non-zero exit status 1 with output: foobar",
- runCmd, "echo foobar >&2; false", shell=True, assert_error=False)
-
- @OETestID(1911)
- def test_output(self):
- result = runCmd("echo stdout; echo stderr >&2", shell=True)
- self.assertEqual("stdout\nstderr", result.output)
- self.assertEqual("", result.error)
-
- @OETestID(1912)
- def test_output_split(self):
- result = runCmd("echo stdout; echo stderr >&2", shell=True, stderr=subprocess.PIPE)
- self.assertEqual("stdout", result.output)
- self.assertEqual("stderr", result.error)
-
- @OETestID(1920)
- def test_timeout(self):
- numthreads = threading.active_count()
- start = time.time()
- # Killing a hanging process only works when not using a shell?!
- result = runCmd(['sleep', '60'], timeout=self.TIMEOUT, ignore_status=True)
- self.assertEqual(result.status, -signal.SIGTERM)
- end = time.time()
- self.assertLess(end - start, self.TIMEOUT + self.DELTA)
- self.assertEqual(numthreads, threading.active_count())
-
- @OETestID(1921)
- def test_timeout_split(self):
- numthreads = threading.active_count()
- start = time.time()
- # Killing a hanging process only works when not using a shell?!
- result = runCmd(['sleep', '60'], timeout=self.TIMEOUT, ignore_status=True, stderr=subprocess.PIPE)
- self.assertEqual(result.status, -signal.SIGTERM)
- end = time.time()
- self.assertLess(end - start, self.TIMEOUT + self.DELTA)
- self.assertEqual(numthreads, threading.active_count())
-
- @OETestID(1918)
- def test_stdin(self):
- numthreads = threading.active_count()
- result = runCmd("cat", data=b"hello world", timeout=self.TIMEOUT)
- self.assertEqual("hello world", result.output)
- self.assertEqual(numthreads, threading.active_count())
-
- @OETestID(1919)
- def test_stdin_timeout(self):
- numthreads = threading.active_count()
- start = time.time()
- result = runCmd(['sleep', '60'], data=b"hello world", timeout=self.TIMEOUT, ignore_status=True)
- self.assertEqual(result.status, -signal.SIGTERM)
- end = time.time()
- self.assertLess(end - start, self.TIMEOUT + self.DELTA)
- self.assertEqual(numthreads, threading.active_count())
-
- @OETestID(1908)
- def test_log(self):
- log = MemLogger()
- result = runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log)
- self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout", "stderr"], log.info_msgs)
- self.assertEqual([], log.error_msgs)
-
- @OETestID(1909)
- def test_log_split(self):
- log = MemLogger()
- result = runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log, stderr=subprocess.PIPE)
- self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout"], log.info_msgs)
- self.assertEqual(["stderr"], log.error_msgs)
OpenPOWER on IntegriCloud