diff options
author | Patrick Williams <patrick@stwcx.xyz> | 2016-08-17 14:31:25 -0500 |
---|---|---|
committer | Patrick Williams <patrick@stwcx.xyz> | 2016-08-22 16:43:26 +0000 |
commit | 60f9d69e016b11c468c98ea75ba0a60c44afbbc4 (patch) | |
tree | ecb49581a9e41a37943c22cd9ef3f63451b20ee7 /import-layers/yocto-poky/bitbake/lib/prserv | |
parent | e18c61205e0234b03697129c20cc69c9b3940efc (diff) | |
download | blackbird-openbmc-60f9d69e016b11c468c98ea75ba0a60c44afbbc4.tar.gz blackbird-openbmc-60f9d69e016b11c468c98ea75ba0a60c44afbbc4.zip |
yocto-poky: Move to import-layers subdir
We are going to import additional layers, so create a subdir to
hold all of the layers that we import with git-subtree.
Change-Id: I6f732153a22be8ca663035c518837e3cc5ec0799
Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Diffstat (limited to 'import-layers/yocto-poky/bitbake/lib/prserv')
-rw-r--r-- | import-layers/yocto-poky/bitbake/lib/prserv/__init__.py | 14 | ||||
-rw-r--r-- | import-layers/yocto-poky/bitbake/lib/prserv/db.py | 276 | ||||
-rw-r--r-- | import-layers/yocto-poky/bitbake/lib/prserv/serv.py | 471 |
3 files changed, 761 insertions, 0 deletions
diff --git a/import-layers/yocto-poky/bitbake/lib/prserv/__init__.py b/import-layers/yocto-poky/bitbake/lib/prserv/__init__.py new file mode 100644 index 000000000..c3cb73ad9 --- /dev/null +++ b/import-layers/yocto-poky/bitbake/lib/prserv/__init__.py @@ -0,0 +1,14 @@ +__version__ = "1.0.0" + +import os, time +import sys,logging + +def init_logger(logfile, loglevel): + numeric_level = getattr(logging, loglevel.upper(), None) + if not isinstance(numeric_level, int): + raise ValueError('Invalid log level: %s' % loglevel) + FORMAT = '%(asctime)-15s %(message)s' + logging.basicConfig(level=numeric_level, filename=logfile, format=FORMAT) + +class NotFoundError(Exception): + pass diff --git a/import-layers/yocto-poky/bitbake/lib/prserv/db.py b/import-layers/yocto-poky/bitbake/lib/prserv/db.py new file mode 100644 index 000000000..2a8618417 --- /dev/null +++ b/import-layers/yocto-poky/bitbake/lib/prserv/db.py @@ -0,0 +1,276 @@ +import logging +import os.path +import errno +import prserv +import time + +try: + import sqlite3 +except ImportError: + from pysqlite2 import dbapi2 as sqlite3 + +logger = logging.getLogger("BitBake.PRserv") + +sqlversion = sqlite3.sqlite_version_info +if sqlversion[0] < 3 or (sqlversion[0] == 3 and sqlversion[1] < 3): + raise Exception("sqlite3 version 3.3.0 or later is required.") + +# +# "No History" mode - for a given query tuple (version, pkgarch, checksum), +# the returned value will be the largest among all the values of the same +# (version, pkgarch). This means the PR value returned can NOT be decremented. +# +# "History" mode - Return a new higher value for previously unseen query +# tuple (version, pkgarch, checksum), otherwise return historical value. +# Value can decrement if returning to a previous build. +# + +class PRTable(object): + def __init__(self, conn, table, nohist): + self.conn = conn + self.nohist = nohist + self.dirty = False + if nohist: + self.table = "%s_nohist" % table + else: + self.table = "%s_hist" % table + + self._execute("CREATE TABLE IF NOT EXISTS %s \ + (version TEXT NOT NULL, \ + pkgarch TEXT NOT NULL, \ + checksum TEXT NOT NULL, \ + value INTEGER, \ + PRIMARY KEY (version, pkgarch, checksum));" % self.table) + + def _execute(self, *query): + """Execute a query, waiting to acquire a lock if necessary""" + start = time.time() + end = start + 20 + while True: + try: + return self.conn.execute(*query) + except sqlite3.OperationalError as exc: + if 'is locked' in str(exc) and end > time.time(): + continue + raise exc + + def sync(self): + self.conn.commit() + self._execute("BEGIN EXCLUSIVE TRANSACTION") + + def sync_if_dirty(self): + if self.dirty: + self.sync() + self.dirty = False + + def _getValueHist(self, version, pkgarch, checksum): + data=self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table, + (version, pkgarch, checksum)) + row=data.fetchone() + if row != None: + return row[0] + else: + #no value found, try to insert + try: + self._execute("INSERT INTO %s VALUES (?, ?, ?, (select ifnull(max(value)+1,0) from %s where version=? AND pkgarch=?));" + % (self.table,self.table), + (version,pkgarch, checksum,version, pkgarch)) + except sqlite3.IntegrityError as exc: + logger.error(str(exc)) + + self.dirty = True + + data=self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table, + (version, pkgarch, checksum)) + row=data.fetchone() + if row != None: + return row[0] + else: + raise prserv.NotFoundError + + def _getValueNohist(self, version, pkgarch, checksum): + data=self._execute("SELECT value FROM %s \ + WHERE version=? AND pkgarch=? AND checksum=? AND \ + value >= (select max(value) from %s where version=? AND pkgarch=?);" + % (self.table, self.table), + (version, pkgarch, checksum, version, pkgarch)) + row=data.fetchone() + if row != None: + return row[0] + else: + #no value found, try to insert + try: + self._execute("INSERT OR REPLACE INTO %s VALUES (?, ?, ?, (select ifnull(max(value)+1,0) from %s where version=? AND pkgarch=?));" + % (self.table,self.table), + (version, pkgarch, checksum, version, pkgarch)) + except sqlite3.IntegrityError as exc: + logger.error(str(exc)) + self.conn.rollback() + + self.dirty = True + + data=self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table, + (version, pkgarch, checksum)) + row=data.fetchone() + if row != None: + return row[0] + else: + raise prserv.NotFoundError + + def getValue(self, version, pkgarch, checksum): + if self.nohist: + return self._getValueNohist(version, pkgarch, checksum) + else: + return self._getValueHist(version, pkgarch, checksum) + + def _importHist(self, version, pkgarch, checksum, value): + val = None + data = self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table, + (version, pkgarch, checksum)) + row = data.fetchone() + if row != None: + val=row[0] + else: + #no value found, try to insert + try: + self._execute("INSERT INTO %s VALUES (?, ?, ?, ?);" % (self.table), + (version, pkgarch, checksum, value)) + except sqlite3.IntegrityError as exc: + logger.error(str(exc)) + + self.dirty = True + + data = self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table, + (version, pkgarch, checksum)) + row = data.fetchone() + if row != None: + val = row[0] + return val + + def _importNohist(self, version, pkgarch, checksum, value): + try: + #try to insert + self._execute("INSERT INTO %s VALUES (?, ?, ?, ?);" % (self.table), + (version, pkgarch, checksum,value)) + except sqlite3.IntegrityError as exc: + #already have the record, try to update + try: + self._execute("UPDATE %s SET value=? WHERE version=? AND pkgarch=? AND checksum=? AND value<?" + % (self.table), + (value,version,pkgarch,checksum,value)) + except sqlite3.IntegrityError as exc: + logger.error(str(exc)) + + self.dirty = True + + data = self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=? AND value>=?;" % self.table, + (version,pkgarch,checksum,value)) + row=data.fetchone() + if row != None: + return row[0] + else: + return None + + def importone(self, version, pkgarch, checksum, value): + if self.nohist: + return self._importNohist(version, pkgarch, checksum, value) + else: + return self._importHist(version, pkgarch, checksum, value) + + def export(self, version, pkgarch, checksum, colinfo): + metainfo = {} + #column info + if colinfo: + metainfo['tbl_name'] = self.table + metainfo['core_ver'] = prserv.__version__ + metainfo['col_info'] = [] + data = self._execute("PRAGMA table_info(%s);" % self.table) + for row in data: + col = {} + col['name'] = row['name'] + col['type'] = row['type'] + col['notnull'] = row['notnull'] + col['dflt_value'] = row['dflt_value'] + col['pk'] = row['pk'] + metainfo['col_info'].append(col) + + #data info + datainfo = [] + + if self.nohist: + sqlstmt = "SELECT T1.version, T1.pkgarch, T1.checksum, T1.value FROM %s as T1, \ + (SELECT version,pkgarch,max(value) as maxvalue FROM %s GROUP BY version,pkgarch) as T2 \ + WHERE T1.version=T2.version AND T1.pkgarch=T2.pkgarch AND T1.value=T2.maxvalue " % (self.table, self.table) + else: + sqlstmt = "SELECT * FROM %s as T1 WHERE 1=1 " % self.table + sqlarg = [] + where = "" + if version: + where += "AND T1.version=? " + sqlarg.append(str(version)) + if pkgarch: + where += "AND T1.pkgarch=? " + sqlarg.append(str(pkgarch)) + if checksum: + where += "AND T1.checksum=? " + sqlarg.append(str(checksum)) + + sqlstmt += where + ";" + + if len(sqlarg): + data = self._execute(sqlstmt, tuple(sqlarg)) + else: + data = self._execute(sqlstmt) + for row in data: + if row['version']: + col = {} + col['version'] = row['version'] + col['pkgarch'] = row['pkgarch'] + col['checksum'] = row['checksum'] + col['value'] = row['value'] + datainfo.append(col) + return (metainfo, datainfo) + + def dump_db(self, fd): + writeCount = 0 + for line in self.conn.iterdump(): + writeCount = writeCount + len(line) + 1 + fd.write(line) + fd.write('\n') + return writeCount + +class PRData(object): + """Object representing the PR database""" + def __init__(self, filename, nohist=True): + self.filename=os.path.abspath(filename) + self.nohist=nohist + #build directory hierarchy + try: + os.makedirs(os.path.dirname(self.filename)) + except OSError as e: + if e.errno != errno.EEXIST: + raise e + self.connection=sqlite3.connect(self.filename, isolation_level="EXCLUSIVE", check_same_thread = False) + self.connection.row_factory=sqlite3.Row + self.connection.execute("pragma synchronous = off;") + self.connection.execute("PRAGMA journal_mode = WAL;") + self._tables={} + + def disconnect(self): + self.connection.close() + + def __getitem__(self,tblname): + if not isinstance(tblname, basestring): + raise TypeError("tblname argument must be a string, not '%s'" % + type(tblname)) + if tblname in self._tables: + return self._tables[tblname] + else: + tableobj = self._tables[tblname] = PRTable(self.connection, tblname, self.nohist) + return tableobj + + def __delitem__(self, tblname): + if tblname in self._tables: + del self._tables[tblname] + logger.info("drop table %s" % (tblname)) + self.connection.execute("DROP TABLE IF EXISTS %s;" % tblname) diff --git a/import-layers/yocto-poky/bitbake/lib/prserv/serv.py b/import-layers/yocto-poky/bitbake/lib/prserv/serv.py new file mode 100644 index 000000000..affccd9b3 --- /dev/null +++ b/import-layers/yocto-poky/bitbake/lib/prserv/serv.py @@ -0,0 +1,471 @@ +import os,sys,logging +import signal, time +from SimpleXMLRPCServer import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler +import threading +import Queue +import socket +import StringIO + +try: + import sqlite3 +except ImportError: + from pysqlite2 import dbapi2 as sqlite3 + +import bb.server.xmlrpc +import prserv +import prserv.db +import errno + +logger = logging.getLogger("BitBake.PRserv") + +if sys.hexversion < 0x020600F0: + print("Sorry, python 2.6 or later is required.") + sys.exit(1) + +class Handler(SimpleXMLRPCRequestHandler): + def _dispatch(self,method,params): + try: + value=self.server.funcs[method](*params) + except: + import traceback + traceback.print_exc() + raise + return value + +PIDPREFIX = "/tmp/PRServer_%s_%s.pid" +singleton = None + + +class PRServer(SimpleXMLRPCServer): + def __init__(self, dbfile, logfile, interface, daemon=True): + ''' constructor ''' + try: + SimpleXMLRPCServer.__init__(self, interface, + logRequests=False, allow_none=True) + except socket.error: + ip=socket.gethostbyname(interface[0]) + port=interface[1] + msg="PR Server unable to bind to %s:%s\n" % (ip, port) + sys.stderr.write(msg) + raise PRServiceConfigError + + self.dbfile=dbfile + self.daemon=daemon + self.logfile=logfile + self.working_thread=None + self.host, self.port = self.socket.getsockname() + self.pidfile=PIDPREFIX % (self.host, self.port) + + self.register_function(self.getPR, "getPR") + self.register_function(self.quit, "quit") + self.register_function(self.ping, "ping") + self.register_function(self.export, "export") + self.register_function(self.dump_db, "dump_db") + self.register_function(self.importone, "importone") + self.register_introspection_functions() + + self.requestqueue = Queue.Queue() + self.handlerthread = threading.Thread(target = self.process_request_thread) + self.handlerthread.daemon = False + + def process_request_thread(self): + """Same as in BaseServer but as a thread. + + In addition, exception handling is done here. + + """ + iter_count = 1 + # 60 iterations between syncs or sync if dirty every ~30 seconds + iterations_between_sync = 60 + + bb.utils.set_process_name("PRServ Handler") + + while not self.quit: + try: + (request, client_address) = self.requestqueue.get(True, 30) + except Queue.Empty: + self.table.sync_if_dirty() + continue + try: + self.finish_request(request, client_address) + self.shutdown_request(request) + iter_count = (iter_count + 1) % iterations_between_sync + if iter_count == 0: + self.table.sync_if_dirty() + except: + self.handle_error(request, client_address) + self.shutdown_request(request) + self.table.sync() + self.table.sync_if_dirty() + + def sigint_handler(self, signum, stack): + if self.table: + self.table.sync() + + def sigterm_handler(self, signum, stack): + if self.table: + self.table.sync() + self.quit=True + + def process_request(self, request, client_address): + self.requestqueue.put((request, client_address)) + + def export(self, version=None, pkgarch=None, checksum=None, colinfo=True): + try: + return self.table.export(version, pkgarch, checksum, colinfo) + except sqlite3.Error as exc: + logger.error(str(exc)) + return None + + def dump_db(self): + """ + Returns a script (string) that reconstructs the state of the + entire database at the time this function is called. The script + language is defined by the backing database engine, which is a + function of server configuration. + Returns None if the database engine does not support dumping to + script or if some other error is encountered in processing. + """ + buff = StringIO.StringIO() + try: + self.table.sync() + self.table.dump_db(buff) + return buff.getvalue() + except Exception as exc: + logger.error(str(exc)) + return None + finally: + buff.close() + + def importone(self, version, pkgarch, checksum, value): + return self.table.importone(version, pkgarch, checksum, value) + + def ping(self): + return not self.quit + + def getinfo(self): + return (self.host, self.port) + + def getPR(self, version, pkgarch, checksum): + try: + return self.table.getValue(version, pkgarch, checksum) + except prserv.NotFoundError: + logger.error("can not find value for (%s, %s)",version, checksum) + return None + except sqlite3.Error as exc: + logger.error(str(exc)) + return None + + def quit(self): + self.quit=True + return + + def work_forever(self,): + self.quit = False + self.timeout = 0.5 + + bb.utils.set_process_name("PRServ") + + # DB connection must be created after all forks + self.db = prserv.db.PRData(self.dbfile) + self.table = self.db["PRMAIN"] + + logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" % + (self.dbfile, self.host, self.port, str(os.getpid()))) + + self.handlerthread.start() + while not self.quit: + self.handle_request() + self.handlerthread.join() + self.db.disconnect() + logger.info("PRServer: stopping...") + self.server_close() + return + + def start(self): + if self.daemon: + pid = self.daemonize() + else: + pid = self.fork() + + # Ensure both the parent sees this and the child from the work_forever log entry above + logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" % + (self.dbfile, self.host, self.port, str(pid))) + + def delpid(self): + os.remove(self.pidfile) + + def daemonize(self): + """ + See Advanced Programming in the UNIX, Sec 13.3 + """ + try: + pid = os.fork() + if pid > 0: + os.waitpid(pid, 0) + #parent return instead of exit to give control + return pid + except OSError as e: + raise Exception("%s [%d]" % (e.strerror, e.errno)) + + os.setsid() + """ + fork again to make sure the daemon is not session leader, + which prevents it from acquiring controlling terminal + """ + try: + pid = os.fork() + if pid > 0: #parent + os._exit(0) + except OSError as e: + raise Exception("%s [%d]" % (e.strerror, e.errno)) + + self.cleanup_handles() + os._exit(0) + + def fork(self): + try: + pid = os.fork() + if pid > 0: + return pid + except OSError as e: + raise Exception("%s [%d]" % (e.strerror, e.errno)) + + bb.utils.signal_on_parent_exit("SIGTERM") + self.cleanup_handles() + os._exit(0) + + def cleanup_handles(self): + signal.signal(signal.SIGINT, self.sigint_handler) + signal.signal(signal.SIGTERM, self.sigterm_handler) + os.chdir("/") + + sys.stdout.flush() + sys.stderr.flush() + si = file('/dev/null', 'r') + so = file(self.logfile, 'a+') + se = so + os.dup2(si.fileno(),sys.stdin.fileno()) + os.dup2(so.fileno(),sys.stdout.fileno()) + os.dup2(se.fileno(),sys.stderr.fileno()) + + # Clear out all log handlers prior to the fork() to avoid calling + # event handlers not part of the PRserver + for logger_iter in logging.Logger.manager.loggerDict.keys(): + logging.getLogger(logger_iter).handlers = [] + + # Ensure logging makes it to the logfile + streamhandler = logging.StreamHandler() + streamhandler.setLevel(logging.DEBUG) + formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s") + streamhandler.setFormatter(formatter) + logger.addHandler(streamhandler) + + # write pidfile + pid = str(os.getpid()) + pf = file(self.pidfile, 'w') + pf.write("%s\n" % pid) + pf.close() + + self.work_forever() + self.delpid() + +class PRServSingleton(object): + def __init__(self, dbfile, logfile, interface): + self.dbfile = dbfile + self.logfile = logfile + self.interface = interface + self.host = None + self.port = None + + def start(self): + self.prserv = PRServer(self.dbfile, self.logfile, self.interface, daemon=False) + self.prserv.start() + self.host, self.port = self.prserv.getinfo() + + def getinfo(self): + return (self.host, self.port) + +class PRServerConnection(object): + def __init__(self, host, port): + if is_local_special(host, port): + host, port = singleton.getinfo() + self.host = host + self.port = port + self.connection, self.transport = bb.server.xmlrpc._create_server(self.host, self.port) + + def terminate(self): + try: + logger.info("Terminating PRServer...") + self.connection.quit() + except Exception as exc: + sys.stderr.write("%s\n" % str(exc)) + + def getPR(self, version, pkgarch, checksum): + return self.connection.getPR(version, pkgarch, checksum) + + def ping(self): + return self.connection.ping() + + def export(self,version=None, pkgarch=None, checksum=None, colinfo=True): + return self.connection.export(version, pkgarch, checksum, colinfo) + + def dump_db(self): + return self.connection.dump_db() + + def importone(self, version, pkgarch, checksum, value): + return self.connection.importone(version, pkgarch, checksum, value) + + def getinfo(self): + return self.host, self.port + +def start_daemon(dbfile, host, port, logfile): + ip = socket.gethostbyname(host) + pidfile = PIDPREFIX % (ip, port) + try: + pf = file(pidfile,'r') + pid = int(pf.readline().strip()) + pf.close() + except IOError: + pid = None + + if pid: + sys.stderr.write("pidfile %s already exist. Daemon already running?\n" + % pidfile) + return 1 + + server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port)) + server.start() + + # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with + # the one the server actually is listening, so at least warn the user about it + _,rport = server.getinfo() + if port != rport: + sys.stdout.write("Server is listening at port %s instead of %s\n" + % (rport,port)) + return 0 + +def stop_daemon(host, port): + import glob + ip = socket.gethostbyname(host) + pidfile = PIDPREFIX % (ip, port) + try: + pf = file(pidfile,'r') + pid = int(pf.readline().strip()) + pf.close() + except IOError: + pid = None + + if not pid: + # when server starts at port=0 (i.e. localhost:0), server actually takes another port, + # so at least advise the user which ports the corresponding server is listening + ports = [] + portstr = "" + for pf in glob.glob(PIDPREFIX % (ip,'*')): + bn = os.path.basename(pf) + root, _ = os.path.splitext(bn) + ports.append(root.split('_')[-1]) + if len(ports): + portstr = "Wrong port? Other ports listening at %s: %s" % (host, ' '.join(ports)) + + sys.stderr.write("pidfile %s does not exist. Daemon not running? %s\n" + % (pidfile,portstr)) + return 1 + + try: + PRServerConnection(ip, port).terminate() + except: + logger.critical("Stop PRService %s:%d failed" % (host,port)) + + try: + if pid: + wait_timeout = 0 + print("Waiting for pr-server to exit.") + while is_running(pid) and wait_timeout < 50: + time.sleep(0.1) + wait_timeout += 1 + + if is_running(pid): + print("Sending SIGTERM to pr-server.") + os.kill(pid,signal.SIGTERM) + time.sleep(0.1) + + if os.path.exists(pidfile): + os.remove(pidfile) + + except OSError as e: + err = str(e) + if err.find("No such process") <= 0: + raise e + + return 0 + +def is_running(pid): + try: + os.kill(pid, 0) + except OSError as err: + if err.errno == errno.ESRCH: + return False + return True + +def is_local_special(host, port): + if host.strip().upper() == 'localhost'.upper() and (not port): + return True + else: + return False + +class PRServiceConfigError(Exception): + pass + +def auto_start(d): + global singleton + + host_params = filter(None, (d.getVar('PRSERV_HOST', True) or '').split(':')) + if not host_params: + return None + + if len(host_params) != 2: + logger.critical('\n'.join(['PRSERV_HOST: incorrect format', + 'Usage: PRSERV_HOST = "<hostname>:<port>"'])) + raise PRServiceConfigError + + if is_local_special(host_params[0], int(host_params[1])) and not singleton: + import bb.utils + cachedir = (d.getVar("PERSISTENT_DIR", True) or d.getVar("CACHE", True)) + if not cachedir: + logger.critical("Please set the 'PERSISTENT_DIR' or 'CACHE' variable") + raise PRServiceConfigError + bb.utils.mkdirhier(cachedir) + dbfile = os.path.join(cachedir, "prserv.sqlite3") + logfile = os.path.join(cachedir, "prserv.log") + singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), ("localhost",0)) + singleton.start() + if singleton: + host, port = singleton.getinfo() + else: + host = host_params[0] + port = int(host_params[1]) + + try: + connection = PRServerConnection(host,port) + connection.ping() + realhost, realport = connection.getinfo() + return str(realhost) + ":" + str(realport) + + except Exception: + logger.critical("PRservice %s:%d not available" % (host, port)) + raise PRServiceConfigError + +def auto_shutdown(d=None): + global singleton + if singleton: + host, port = singleton.getinfo() + try: + PRServerConnection(host, port).terminate() + except: + logger.critical("Stop PRService %s:%d failed" % (host,port)) + singleton = None + +def ping(host, port): + conn=PRServerConnection(host, port) + return conn.ping() |