From 0bdef95d3fe34e0ca6ce47beca8bfd7635dff77b Mon Sep 17 00:00:00 2001 From: Leonel Gonzalez Date: Tue, 18 Apr 2017 08:17:49 -0500 Subject: Converts value to expected type Fixes openbmc/openbmc#1160 Change-Id: I5c36f5bc065a0b2fdac56c2ee92b0ed54258f238 Signed-off-by: Leonel Gonzalez --- module/obmc/wsgi/apps/rest_dbus.py | 95 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) (limited to 'module') diff --git a/module/obmc/wsgi/apps/rest_dbus.py b/module/obmc/wsgi/apps/rest_dbus.py index c51e4f9..605956c 100644 --- a/module/obmc/wsgi/apps/rest_dbus.py +++ b/module/obmc/wsgi/apps/rest_dbus.py @@ -27,6 +27,7 @@ import spwd import grp import crypt import tempfile +import re DBUS_UNKNOWN_INTERFACE = 'org.freedesktop.UnknownInterface' DBUS_UNKNOWN_INTERFACE_ERROR = 'org.freedesktop.DBus.Error.UnknownInterface' @@ -45,6 +46,85 @@ def valid_user(session, *a, **kw): abort(401, 'Login required') +def get_type_signature_by_introspection(bus, service, object_path, + property_name): + obj = bus.get_object(service, object_path) + iface = dbus.Interface(obj, 'org.freedesktop.DBus.Introspectable') + xml_string = iface.Introspect() + for child in ElementTree.fromstring(xml_string): + # Iterate over each interfaces's properties to find + # matching property_name, and return its signature string + if child.tag == 'interface': + for i in child.iter(): + if ('name' in i.attrib) and \ + (i.attrib['name'] == property_name): + type_signature = i.attrib['type'] + return type_signature + + +def split_struct_signature(signature): + struct_regex = r'(b|y|n|i|x|q|u|t|d|s|a\(.+?\)|\(.+?\))|a\{.+?\}+?' + struct_matches = re.findall(struct_regex, signature) + return struct_matches + + +def convert_type(signature, value): + # Basic Types + converted_value = None + converted_container = None + basic_types = {'b': bool, 'y': dbus.Byte, 'n': dbus.Int16, 'i': int, + 'x': long, 'q': dbus.UInt16, 'u': dbus.UInt32, + 't': dbus.UInt64, 'd': float, 's': str} + array_matches = re.match(r'a\((\S+)\)', signature) + struct_matches = re.match(r'\((\S+)\)', signature) + dictionary_matches = re.match(r'a{(\S+)}', signature) + if signature in basic_types: + converted_value = basic_types[signature](value) + return converted_value + # Array + if array_matches: + element_type = array_matches.group(1) + converted_container = list() + # Test if value is a list + # to avoid iterating over each character in a string. + # Iterate over each item and convert type + if isinstance(value, list): + for i in value: + converted_element = convert_type(element_type, i) + converted_container.append(converted_element) + # Convert non-sequence to expected type, and append to list + else: + converted_element = convert_type(element_type, value) + converted_container.append(converted_element) + return converted_container + # Struct + if struct_matches: + element_types = struct_matches.group(1) + split_element_types = split_struct_signature(element_types) + converted_container = list() + # Test if value is a list + if isinstance(value, list): + for index, val in enumerate(value): + converted_element = convert_type(split_element_types[index], + value[index]) + converted_container.append(converted_element) + else: + converted_element = convert_type(element_types, value) + converted_container.append(converted_element) + return tuple(converted_container) + # Dictionary + if dictionary_matches: + element_types = dictionary_matches.group(1) + split_element_types = split_struct_signature(element_types) + converted_container = dict() + # Convert each element of dict + for key, val in value.iteritems(): + converted_key = convert_type(split_element_types[0], key) + converted_val = convert_type(split_element_types[1], val) + converted_container[converted_key] = converted_val + return converted_container + + class UserInGroup: ''' Authorization plugin callback that checks that the user is logged in and a member of a group. ''' @@ -313,6 +393,21 @@ class PropertyHandler(RouteHandler): abort(400, str(e)) except dbus.exceptions.DBusException, e: if e.get_dbus_name() == DBUS_INVALID_ARGS: + bus_name = properties_iface.bus_name + expected_type = get_type_signature_by_introspection(self.bus, + bus_name, + path, + prop) + if not expected_type: + abort(403, "Failed to get expected type: %s" % str(e)) + converted_value = None + try: + converted_value = convert_type(expected_type, value) + self.do_put(path, prop, converted_value) + return + except Exception as ex: + abort(403, "Failed to convert %s to type %s" % + (value, expected_type)) abort(403, str(e)) raise -- cgit v1.2.1