#!/usr/bin/python -u import gobject import dbus import dbus.service import dbus.mainloop.glib import os import os.path as path import sys from obmc.dbuslib.bindings import DbusProperties, DbusObjectManager, get_dbus from IPy import IP settings_file_path = os.path.join( sys.prefix, 'share', 'phosphor-settings') sys.path.insert(1, settings_file_path) import settings_file as s import re import obmc.mapper DBUS_NAME = 'org.openbmc.settings.Host' CONTROL_INTF = 'org.openbmc.Settings' def walk_nest(d, keys=()): """Arrange dictionary keys and values. Walk the dictionary and establish every possible path returned to and processed by 'create_object' below """ if isinstance(d, dict): for k, v in d.iteritems(): for rv in walk_nest(v, keys + (k, )): yield rv else: yield keys, d def create_object(settings): """Create and format objects. Parse dictionary file and return all objects and settings in the following format: {obj_name {settings}} """ bus = get_dbus() mapper = obmc.mapper.Mapper(bus) allobjects = {} queries = {} for compound_key, val in walk_nest(settings): obj_name = compound_key[0].lower() obj_name = obj_name.replace(".", "/") obj_name = "/" + obj_name + "0" for i in compound_key[2:len(compound_key) - 2]: obj_name = obj_name + "/" + i setting = compound_key[len(compound_key) - 2] attribute = compound_key[len(compound_key) - 1] if settings.get(compound_key[0], {}).get('query', {}): q = queries.setdefault(obj_name, {}) s = q.setdefault( setting, {'name': None, 'type': None, 'default': None}) else: o = allobjects.setdefault(obj_name, {}) s = o.setdefault( setting, {'name': None, 'type': None, 'default': None}) s[attribute] = val for settings in queries.itervalues(): for setting in settings.itervalues(): if setting['type'] is not 'instance_query': continue paths = mapper.get_subtree_paths(setting['subtree'], 0, retries=10, interval=0.1) if setting['keyregex'] == 'host': # Always create at least one host object. paths = set(paths + ['/org/openbmc/control/host0']) for path in paths: m = re.search(setting['matchregex'], path) if not m: continue allobjects.setdefault( "/org/openbmc/settings/" + m.group(1), settings) return allobjects class HostSettingsObject(DbusProperties, DbusObjectManager): def __init__(self, bus, name, settings, path): super(HostSettingsObject, self).__init__( conn=bus, object_path=name, validator=self.input_validator) self.bus = bus self.path = path self.name = name self.settings = settings self.fname = name[name.rfind("/") + 1:] + '-' # Needed to ignore the validation on default networkconfig values as # opposed to user giving the same. self.adminmode = True if not os.path.exists(path): os.mkdir(path) # Listen to changes in the property values and sync them to the BMC bus.add_signal_receiver( self.settings_signal_handler, dbus_interface="org.freedesktop.DBus.Properties", signal_name="PropertiesChanged", path=name) # Create the dbus properties for setting in settings.itervalues(): if setting['type'] is 'instance_query': continue self.set_settings_property( setting['name'], setting['type'], setting['default'], self.fname) # Done with consuming factory settings. self.adminmode = False def get_bmc_value(self, name, fname): try: with open(path.join(self.path, fname + name), 'r') as f: return f.read() except (IOError): pass return None # Create dbus properties based on bmc value. # This will be either a value previously set, # or the default file value if the BMC value # does not exist. def set_settings_property(self, attr_name, attr_type, value, fname): default = value # Read from file. bmcv = self.get_bmc_value(attr_name, fname) if bmcv: value = bmcv # Perform type mapping. type_map = {"i": int, "s": str, "b": bool}[attr_type] real_value = type_map(value) real_default = type_map(default) try: self.Set(DBUS_NAME, attr_name, real_value) except ValueError: print("Persistent value for {} is invalid: {}{} had '{}', " "using '{}'.".format(attr_name, fname, attr_name, value, default)) self.Set(DBUS_NAME, attr_name, real_default) self.set_system_settings(attr_name, real_default, fname) # Save the settings to the BMC. This will write the settings value in # individual files named by the property name to the BMC. def set_system_settings(self, name, value, fname): bmcv = self.get_bmc_value(name, fname) if bmcv != value: filepath = path.join(self.path, fname + name) with open(filepath, 'w') as f: f.write(str(value)) # Signal handler for when one ore more settings properties were updated. # This will sync the changes to the BMC. def settings_signal_handler( self, interface_name, changed_properties, invalidated_properties): for name, value in changed_properties.items(): self.set_system_settings(name, value, self.fname) # Placeholder signal. Needed to register the settings interface. @dbus.service.signal(DBUS_NAME, signature='s') def SettingsUpdated(self, sname): pass def validate_regex(self, regex, value): if not re.compile(regex).search(value): raise ValueError("Invalid input. Data does not satisfy regex") def validate_range(self, min, max, value): if value not in range(min, max): raise ValueError("Invalid input. Data not in allowed range") def validate_list(self, lst, value): if value not in map(lambda val: val, lst): raise ValueError("Invalid input. Data not in allowed values") # validate host network configuration # need "ipaddress=,prefix=,gateway=,mac=,addr_type=" # Must be able to handle any order def validate_net_config(self, value): if self.adminmode: return # Need all of these to be given by the user. user_config = [] all_config = ['ipaddress', 'prefix', 'gateway', 'mac', 'addr_type'] # This has a hard data format mentioned above so no blanks allowed. if value.count(" ") or value.count("=") != 5: raise ValueError("Invalid Network Data. No white spaces allowed") config = value.split(',') for key_value in config: key, value = key_value.split('=') if not key or not value: raise ValueError("Invalid key or Data") # Add the current key seen so we can compare at the end to see # if all values have been given user_config.append(key.lower()) if key.lower() == 'ipaddress' or key.lower() == 'gateway': IP(value) elif key.lower() == 'mac': regex = r'([a-fA-F0-9]{2}[:|\-]?){6}' self.validate_regex(regex, value) elif key.lower() == 'prefix': self.validate_range(0, 33, int(value)) elif key.lower() == 'addr_type': allowed = ["STATIC", "DYNAMIC"] self.validate_list(allowed, value) # Did user pass everything ?? if set(all_config) - set(user_config): raise ValueError( "Invalid Network Data. All information is mandatory") # Validate to see if the changes are in order def input_validator(self, iface, proprty, value): # User entered key is not present shk = None for attr in self.settings.itervalues(): if attr['name'] == proprty: shk = attr if shk is None: raise KeyError("Invalid Property") validation = shk.get('validation', None) if validation == 'list': self.validate_list(shk['allowed'], value) elif validation == 'range': self.validate_range(shk['min'], shk['max'] + 1, value) elif validation == 'regex': self.validate_regex(shk['regex'], value) elif validation == 'custom': getattr(self, shk['method'])(value) if __name__ == '__main__': dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = get_dbus() allobjects = create_object(s.SETTINGS) lastobject = None objs = [] for o, settings in allobjects.iteritems(): objs.append(HostSettingsObject(bus, o, settings, "/var/lib/obmc/")) objs[-1].unmask_signals() mainloop = gobject.MainLoop() name = dbus.service.BusName(DBUS_NAME, bus) print "Running HostSettingsService" mainloop.run() # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4