summaryrefslogtreecommitdiffstats
path: root/import-layers/yocto-poky/bitbake/bin/bitbake-worker
diff options
context:
space:
mode:
Diffstat (limited to 'import-layers/yocto-poky/bitbake/bin/bitbake-worker')
-rwxr-xr-ximport-layers/yocto-poky/bitbake/bin/bitbake-worker447
1 files changed, 447 insertions, 0 deletions
diff --git a/import-layers/yocto-poky/bitbake/bin/bitbake-worker b/import-layers/yocto-poky/bitbake/bin/bitbake-worker
new file mode 100755
index 000000000..767a1c033
--- /dev/null
+++ b/import-layers/yocto-poky/bitbake/bin/bitbake-worker
@@ -0,0 +1,447 @@
+#!/usr/bin/env python
+
+import os
+import sys
+import warnings
+sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
+from bb import fetch2
+import logging
+import bb
+import select
+import errno
+import signal
+from multiprocessing import Lock
+
+# Users shouldn't be running this code directly
+if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
+ print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
+ sys.exit(1)
+
+profiling = False
+if sys.argv[1].startswith("decafbadbad"):
+ profiling = True
+ try:
+ import cProfile as profile
+ except:
+ import profile
+
+# Unbuffer stdout to avoid log truncation in the event
+# of an unorderly exit as well as to provide timely
+# updates to log files for use with tail
+try:
+ if sys.stdout.name == '<stdout>':
+ sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
+except:
+ pass
+
+logger = logging.getLogger("BitBake")
+
+try:
+ import cPickle as pickle
+except ImportError:
+ import pickle
+ bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")
+
+
+worker_pipe = sys.stdout.fileno()
+bb.utils.nonblockingfd(worker_pipe)
+# Need to guard against multiprocessing being used in child processes
+# and multiple processes trying to write to the parent at the same time
+worker_pipe_lock = None
+
+handler = bb.event.LogHandler()
+logger.addHandler(handler)
+
+if 0:
+ # Code to write out a log file of all events passing through the worker
+ logfilename = "/tmp/workerlogfile"
+ format_str = "%(levelname)s: %(message)s"
+ conlogformat = bb.msg.BBLogFormatter(format_str)
+ consolelog = logging.FileHandler(logfilename)
+ bb.msg.addDefaultlogFilter(consolelog)
+ consolelog.setFormatter(conlogformat)
+ logger.addHandler(consolelog)
+
+worker_queue = ""
+
+def worker_fire(event, d):
+ data = "<event>" + pickle.dumps(event) + "</event>"
+ worker_fire_prepickled(data)
+
+def worker_fire_prepickled(event):
+ global worker_queue
+
+ worker_queue = worker_queue + event
+ worker_flush()
+
+def worker_flush():
+ global worker_queue, worker_pipe
+
+ if not worker_queue:
+ return
+
+ try:
+ written = os.write(worker_pipe, worker_queue)
+ worker_queue = worker_queue[written:]
+ except (IOError, OSError) as e:
+ if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
+ raise
+
+def worker_child_fire(event, d):
+ global worker_pipe
+ global worker_pipe_lock
+
+ data = "<event>" + pickle.dumps(event) + "</event>"
+ try:
+ worker_pipe_lock.acquire()
+ worker_pipe.write(data)
+ worker_pipe_lock.release()
+ except IOError:
+ sigterm_handler(None, None)
+ raise
+
+bb.event.worker_fire = worker_fire
+
+lf = None
+#lf = open("/tmp/workercommandlog", "w+")
+def workerlog_write(msg):
+ if lf:
+ lf.write(msg)
+ lf.flush()
+
+def sigterm_handler(signum, frame):
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ os.killpg(0, signal.SIGTERM)
+ sys.exit()
+
+def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False):
+ # We need to setup the environment BEFORE the fork, since
+ # a fork() or exec*() activates PSEUDO...
+
+ envbackup = {}
+ fakeenv = {}
+ umask = None
+
+ taskdep = workerdata["taskdeps"][fn]
+ if 'umask' in taskdep and taskname in taskdep['umask']:
+ # umask might come in as a number or text string..
+ try:
+ umask = int(taskdep['umask'][taskname],8)
+ except TypeError:
+ umask = taskdep['umask'][taskname]
+
+ # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
+ if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run:
+ envvars = (workerdata["fakerootenv"][fn] or "").split()
+ for key, value in (var.split('=') for var in envvars):
+ envbackup[key] = os.environ.get(key)
+ os.environ[key] = value
+ fakeenv[key] = value
+
+ fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
+ for p in fakedirs:
+ bb.utils.mkdirhier(p)
+ logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
+ (fn, taskname, ', '.join(fakedirs)))
+ else:
+ envvars = (workerdata["fakerootnoenv"][fn] or "").split()
+ for key, value in (var.split('=') for var in envvars):
+ envbackup[key] = os.environ.get(key)
+ os.environ[key] = value
+ fakeenv[key] = value
+
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+ try:
+ pipein, pipeout = os.pipe()
+ pipein = os.fdopen(pipein, 'rb', 4096)
+ pipeout = os.fdopen(pipeout, 'wb', 0)
+ pid = os.fork()
+ except OSError as e:
+ bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))
+
+ if pid == 0:
+ def child():
+ global worker_pipe
+ global worker_pipe_lock
+ pipein.close()
+
+ signal.signal(signal.SIGTERM, sigterm_handler)
+ # Let SIGHUP exit as SIGTERM
+ signal.signal(signal.SIGHUP, sigterm_handler)
+ bb.utils.signal_on_parent_exit("SIGTERM")
+
+ # Save out the PID so that the event can include it the
+ # events
+ bb.event.worker_pid = os.getpid()
+ bb.event.worker_fire = worker_child_fire
+ worker_pipe = pipeout
+ worker_pipe_lock = Lock()
+
+ # Make the child the process group leader and ensure no
+ # child process will be controlled by the current terminal
+ # This ensures signals sent to the controlling terminal like Ctrl+C
+ # don't stop the child processes.
+ os.setsid()
+ # No stdin
+ newsi = os.open(os.devnull, os.O_RDWR)
+ os.dup2(newsi, sys.stdin.fileno())
+
+ if umask:
+ os.umask(umask)
+
+ data.setVar("BB_WORKERCONTEXT", "1")
+ data.setVar("BB_TASKDEPDATA", taskdepdata)
+ data.setVar("BUILDNAME", workerdata["buildname"])
+ data.setVar("DATE", workerdata["date"])
+ data.setVar("TIME", workerdata["time"])
+ bb.parse.siggen.set_taskdata(workerdata["sigdata"])
+ ret = 0
+ try:
+ the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
+ the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])
+
+ bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN", True), taskname.replace("do_", "")))
+
+ # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
+ # successfully. We also need to unset anything from the environment which shouldn't be there
+ exports = bb.data.exported_vars(the_data)
+ bb.utils.empty_environment()
+ for e, v in exports:
+ os.environ[e] = v
+ for e in fakeenv:
+ os.environ[e] = fakeenv[e]
+ the_data.setVar(e, fakeenv[e])
+ the_data.setVarFlag(e, 'export', "1")
+
+ if quieterrors:
+ the_data.setVarFlag(taskname, "quieterrors", "1")
+
+ except Exception as exc:
+ if not quieterrors:
+ logger.critical(str(exc))
+ os._exit(1)
+ try:
+ if cfg.dry_run:
+ return 0
+ return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
+ except:
+ os._exit(1)
+ if not profiling:
+ os._exit(child())
+ else:
+ profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
+ prof = profile.Profile()
+ try:
+ ret = profile.Profile.runcall(prof, child)
+ finally:
+ prof.dump_stats(profname)
+ bb.utils.process_profilelog(profname)
+ os._exit(ret)
+ else:
+ for key, value in envbackup.iteritems():
+ if value is None:
+ del os.environ[key]
+ else:
+ os.environ[key] = value
+
+ return pid, pipein, pipeout
+
+class runQueueWorkerPipe():
+ """
+ Abstraction for a pipe between a worker thread and the worker server
+ """
+ def __init__(self, pipein, pipeout):
+ self.input = pipein
+ if pipeout:
+ pipeout.close()
+ bb.utils.nonblockingfd(self.input)
+ self.queue = ""
+
+ def read(self):
+ start = len(self.queue)
+ try:
+ self.queue = self.queue + self.input.read(102400)
+ except (OSError, IOError) as e:
+ if e.errno != errno.EAGAIN:
+ raise
+
+ end = len(self.queue)
+ index = self.queue.find("</event>")
+ while index != -1:
+ worker_fire_prepickled(self.queue[:index+8])
+ self.queue = self.queue[index+8:]
+ index = self.queue.find("</event>")
+ return (end > start)
+
+ def close(self):
+ while self.read():
+ continue
+ if len(self.queue) > 0:
+ print("Warning, worker child left partial message: %s" % self.queue)
+ self.input.close()
+
+normalexit = False
+
+class BitbakeWorker(object):
+ def __init__(self, din):
+ self.input = din
+ bb.utils.nonblockingfd(self.input)
+ self.queue = ""
+ self.cookercfg = None
+ self.databuilder = None
+ self.data = None
+ self.build_pids = {}
+ self.build_pipes = {}
+
+ signal.signal(signal.SIGTERM, self.sigterm_exception)
+ # Let SIGHUP exit as SIGTERM
+ signal.signal(signal.SIGHUP, self.sigterm_exception)
+ if "beef" in sys.argv[1]:
+ bb.utils.set_process_name("Worker (Fakeroot)")
+ else:
+ bb.utils.set_process_name("Worker")
+
+ def sigterm_exception(self, signum, stackframe):
+ if signum == signal.SIGTERM:
+ bb.warn("Worker received SIGTERM, shutting down...")
+ elif signum == signal.SIGHUP:
+ bb.warn("Worker received SIGHUP, shutting down...")
+ self.handle_finishnow(None)
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ os.kill(os.getpid(), signal.SIGTERM)
+
+ def serve(self):
+ while True:
+ (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
+ if self.input in ready:
+ try:
+ r = self.input.read()
+ if len(r) == 0:
+ # EOF on pipe, server must have terminated
+ self.sigterm_exception(signal.SIGTERM, None)
+ self.queue = self.queue + r
+ except (OSError, IOError):
+ pass
+ if len(self.queue):
+ self.handle_item("cookerconfig", self.handle_cookercfg)
+ self.handle_item("workerdata", self.handle_workerdata)
+ self.handle_item("runtask", self.handle_runtask)
+ self.handle_item("finishnow", self.handle_finishnow)
+ self.handle_item("ping", self.handle_ping)
+ self.handle_item("quit", self.handle_quit)
+
+ for pipe in self.build_pipes:
+ self.build_pipes[pipe].read()
+ if len(self.build_pids):
+ self.process_waitpid()
+ worker_flush()
+
+
+ def handle_item(self, item, func):
+ if self.queue.startswith("<" + item + ">"):
+ index = self.queue.find("</" + item + ">")
+ while index != -1:
+ func(self.queue[(len(item) + 2):index])
+ self.queue = self.queue[(index + len(item) + 3):]
+ index = self.queue.find("</" + item + ">")
+
+ def handle_cookercfg(self, data):
+ self.cookercfg = pickle.loads(data)
+ self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
+ self.databuilder.parseBaseConfiguration()
+ self.data = self.databuilder.data
+
+ def handle_workerdata(self, data):
+ self.workerdata = pickle.loads(data)
+ bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
+ bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
+ bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
+ bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
+ self.data.setVar("PRSERV_HOST", self.workerdata["prhost"])
+
+ def handle_ping(self, _):
+ workerlog_write("Handling ping\n")
+
+ logger.warn("Pong from bitbake-worker!")
+
+ def handle_quit(self, data):
+ workerlog_write("Handling quit\n")
+
+ global normalexit
+ normalexit = True
+ sys.exit(0)
+
+ def handle_runtask(self, data):
+ fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data)
+ workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
+
+ pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors)
+
+ self.build_pids[pid] = task
+ self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)
+
+ def process_waitpid(self):
+ """
+ Return none is there are no processes awaiting result collection, otherwise
+ collect the process exit codes and close the information pipe.
+ """
+ try:
+ pid, status = os.waitpid(-1, os.WNOHANG)
+ if pid == 0 or os.WIFSTOPPED(status):
+ return None
+ except OSError:
+ return None
+
+ workerlog_write("Exit code of %s for pid %s\n" % (status, pid))
+
+ if os.WIFEXITED(status):
+ status = os.WEXITSTATUS(status)
+ elif os.WIFSIGNALED(status):
+ # Per shell conventions for $?, when a process exits due to
+ # a signal, we return an exit code of 128 + SIGNUM
+ status = 128 + os.WTERMSIG(status)
+
+ task = self.build_pids[pid]
+ del self.build_pids[pid]
+
+ self.build_pipes[pid].close()
+ del self.build_pipes[pid]
+
+ worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")
+
+ def handle_finishnow(self, _):
+ if self.build_pids:
+ logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
+ for k, v in self.build_pids.iteritems():
+ try:
+ os.kill(-k, signal.SIGTERM)
+ os.waitpid(-1, 0)
+ except:
+ pass
+ for pipe in self.build_pipes:
+ self.build_pipes[pipe].read()
+
+try:
+ worker = BitbakeWorker(sys.stdin)
+ if not profiling:
+ worker.serve()
+ else:
+ profname = "profile-worker.log"
+ prof = profile.Profile()
+ try:
+ profile.Profile.runcall(prof, worker.serve)
+ finally:
+ prof.dump_stats(profname)
+ bb.utils.process_profilelog(profname)
+except BaseException as e:
+ if not normalexit:
+ import traceback
+ sys.stderr.write(traceback.format_exc())
+ sys.stderr.write(str(e))
+while len(worker_queue):
+ worker_flush()
+workerlog_write("exitting")
+sys.exit(0)
+
OpenPOWER on IntegriCloud