summaryrefslogtreecommitdiffstats
path: root/poky/meta/lib/oeqa/utils/package_manager.py
diff options
context:
space:
mode:
Diffstat (limited to 'poky/meta/lib/oeqa/utils/package_manager.py')
-rw-r--r--poky/meta/lib/oeqa/utils/package_manager.py211
1 files changed, 211 insertions, 0 deletions
diff --git a/poky/meta/lib/oeqa/utils/package_manager.py b/poky/meta/lib/oeqa/utils/package_manager.py
new file mode 100644
index 000000000..afd5b8e75
--- /dev/null
+++ b/poky/meta/lib/oeqa/utils/package_manager.py
@@ -0,0 +1,211 @@
+import os
+import json
+import shutil
+
+from oeqa.core.utils.test import getCaseFile, getCaseMethod
+
+def get_package_manager(d, root_path):
+ """
+ Returns an OE package manager that can install packages in root_path.
+ """
+ from oe.package_manager import RpmPM, OpkgPM, DpkgPM
+
+ pkg_class = d.getVar("IMAGE_PKGTYPE")
+ if pkg_class == "rpm":
+ pm = RpmPM(d,
+ root_path,
+ d.getVar('TARGET_VENDOR'),
+ filterbydependencies=False)
+ pm.create_configs()
+
+ elif pkg_class == "ipk":
+ pm = OpkgPM(d,
+ root_path,
+ d.getVar("IPKGCONF_TARGET"),
+ d.getVar("ALL_MULTILIB_PACKAGE_ARCHS"))
+
+ elif pkg_class == "deb":
+ pm = DpkgPM(d,
+ root_path,
+ d.getVar('PACKAGE_ARCHS'),
+ d.getVar('DPKG_ARCH'))
+
+ pm.write_index()
+ pm.update()
+
+ return pm
+
+def find_packages_to_extract(test_suite):
+ """
+ Returns packages to extract required by runtime tests.
+ """
+ from oeqa.core.utils.test import getSuiteCasesFiles
+
+ needed_packages = {}
+ files = getSuiteCasesFiles(test_suite)
+
+ for f in set(files):
+ json_file = _get_json_file(f)
+ if json_file:
+ needed_packages.update(_get_needed_packages(json_file))
+
+ return needed_packages
+
+def _get_json_file(module_path):
+ """
+ Returns the path of the JSON file for a module, empty if doesn't exitst.
+ """
+
+ json_file = '%s.json' % module_path.rsplit('.', 1)[0]
+ if os.path.isfile(module_path) and os.path.isfile(json_file):
+ return json_file
+ else:
+ return ''
+
+def _get_needed_packages(json_file, test=None):
+ """
+ Returns a dict with needed packages based on a JSON file.
+
+ If a test is specified it will return the dict just for that test.
+ """
+ needed_packages = {}
+
+ with open(json_file) as f:
+ test_packages = json.load(f)
+ for key,value in test_packages.items():
+ needed_packages[key] = value
+
+ if test:
+ if test in needed_packages:
+ needed_packages = needed_packages[test]
+ else:
+ needed_packages = {}
+
+ return needed_packages
+
+def extract_packages(d, needed_packages):
+ """
+ Extract packages that will be needed during runtime.
+ """
+
+ import bb
+ import oe.path
+
+ extracted_path = d.getVar('TEST_EXTRACTED_DIR')
+
+ for key,value in needed_packages.items():
+ packages = ()
+ if isinstance(value, dict):
+ packages = (value, )
+ elif isinstance(value, list):
+ packages = value
+ else:
+ bb.fatal('Failed to process needed packages for %s; '
+ 'Value must be a dict or list' % key)
+
+ for package in packages:
+ pkg = package['pkg']
+ rm = package.get('rm', False)
+ extract = package.get('extract', True)
+
+ if extract:
+ #logger.debug(1, 'Extracting %s' % pkg)
+ dst_dir = os.path.join(extracted_path, pkg)
+ # Same package used for more than one test,
+ # don't need to extract again.
+ if os.path.exists(dst_dir):
+ continue
+
+ # Extract package and copy it to TEST_EXTRACTED_DIR
+ pkg_dir = _extract_in_tmpdir(d, pkg)
+ oe.path.copytree(pkg_dir, dst_dir)
+ shutil.rmtree(pkg_dir)
+
+ else:
+ #logger.debug(1, 'Copying %s' % pkg)
+ _copy_package(d, pkg)
+
+def _extract_in_tmpdir(d, pkg):
+ """"
+ Returns path to a temp directory where the package was
+ extracted without dependencies.
+ """
+
+ from oeqa.utils.package_manager import get_package_manager
+
+ pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
+ pm = get_package_manager(d, pkg_path)
+ extract_dir = pm.extract(pkg)
+ shutil.rmtree(pkg_path)
+
+ return extract_dir
+
+def _copy_package(d, pkg):
+ """
+ Copy the RPM, DEB or IPK package to dst_dir
+ """
+
+ from oeqa.utils.package_manager import get_package_manager
+
+ pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
+ dst_dir = d.getVar('TEST_PACKAGED_DIR')
+ pm = get_package_manager(d, pkg_path)
+ pkg_info = pm.package_info(pkg)
+ file_path = pkg_info[pkg]['filepath']
+ shutil.copy2(file_path, dst_dir)
+ shutil.rmtree(pkg_path)
+
+def install_package(test_case):
+ """
+ Installs package in DUT if required.
+ """
+ needed_packages = test_needs_package(test_case)
+ if needed_packages:
+ _install_uninstall_packages(needed_packages, test_case, True)
+
+def uninstall_package(test_case):
+ """
+ Uninstalls package in DUT if required.
+ """
+ needed_packages = test_needs_package(test_case)
+ if needed_packages:
+ _install_uninstall_packages(needed_packages, test_case, False)
+
+def test_needs_package(test_case):
+ """
+ Checks if a test case requires to install/uninstall packages.
+ """
+ test_file = getCaseFile(test_case)
+ json_file = _get_json_file(test_file)
+
+ if json_file:
+ test_method = getCaseMethod(test_case)
+ needed_packages = _get_needed_packages(json_file, test_method)
+ if needed_packages:
+ return needed_packages
+
+ return None
+
+def _install_uninstall_packages(needed_packages, test_case, install=True):
+ """
+ Install/Uninstall packages in the DUT without using a package manager
+ """
+
+ if isinstance(needed_packages, dict):
+ packages = [needed_packages]
+ elif isinstance(needed_packages, list):
+ packages = needed_packages
+
+ for package in packages:
+ pkg = package['pkg']
+ rm = package.get('rm', False)
+ extract = package.get('extract', True)
+ src_dir = os.path.join(test_case.tc.extract_dir, pkg)
+
+ # Install package
+ if install and extract:
+ test_case.tc.target.copyDirTo(src_dir, '/')
+
+ # Uninstall package
+ elif not install and rm:
+ test_case.tc.target.deleteDirStructure(src_dir, '/')
OpenPOWER on IntegriCloud