From 2dbe56456bbea2bdfce7eacbf8b9f10a1b249f55 Mon Sep 17 00:00:00 2001 From: Kelly Brazil Date: Fri, 15 Oct 2021 11:43:15 -0700 Subject: [PATCH] add nested_dict. start work on hub_port_status and device_status --- jc/parsers/lsusb.py | 88 +++++++++++++++++++++++++-------------------- 1 file changed, 49 insertions(+), 39 deletions(-) diff --git a/jc/parsers/lsusb.py b/jc/parsers/lsusb.py index 5e8baac4..c7339302 100644 --- a/jc/parsers/lsusb.py +++ b/jc/parsers/lsusb.py @@ -92,10 +92,8 @@ Schema: } }, "device_status": { - "attribute": { - "value": string, - "description": string - } + "value": string, + "description": string } } ] @@ -149,10 +147,19 @@ def _process(proc_data): return proc_data +class _NestedDict(dict): + # for ease of creating/updating nested dictionary structures + # https://stackoverflow.com/questions/5369723/multi-level-defaultdict-with-variable-depth + # https://ohuiginn.net/mt/2010/07/nested_dictionaries_in_python.html + def __getitem__(self, key): + if key in self: return self.get(key) + return self.setdefault(key, _NestedDict()) + + class _LsUsb(): def __init__(self): self.raw_output = [] - self.output_line = {} + self.output_line = _NestedDict() self.section = '' self.bus_idx = -1 @@ -218,6 +225,25 @@ class _LsUsb(): return line_obj + def _add_hub_port_status_attributes(self, line): + return self._add_attributes(line) + # indent = self._count_indent(line) + # line_obj = { + # temp_obj['key']: { + # 'value': temp_obj['val'], + # 'description': temp_obj['description'], + # '_state': { + # 'indent': indent, + # 'bus_idx': self.bus_idx, + # 'interface_descriptor_idx': self.interface_descriptor_idx, + # 'endpoint_descriptor_idx': self.endpoint_descriptor_idx + # } + # } + # } + + def _add_device_status_attributes(self, line): + return self._add_attributes(line) + def _set_sections(self, line): # ignore blank lines if not line: @@ -308,7 +334,6 @@ class _LsUsb(): 'report_descriptors': self.report_descriptors_list, 'endpoint_descriptor': self.endpoint_descriptor_list, 'hub_descriptor': self.hub_descriptor_list, - 'hub_port_status': self.hub_port_status_list, 'device_status': self.device_status_list } @@ -317,6 +342,15 @@ class _LsUsb(): section_list_map[self.section].append(self._add_attributes(line)) return True + # special handling of these sections + if line.startswith(' ') and self.section == 'hub_port_status': + self.hub_port_status_list.append(self._add_hub_port_status_attributes(line)) + return True + + if line.startswith(' ') and self.section == 'device_status': + self.device_status_list.append(self._add_device_status_attributes(line)) + return True + def _populate_schema(self): """ Schema: @@ -342,7 +376,7 @@ class _LsUsb(): if self.output_line: self.raw_output.append(self.output_line) - self.output_line = {} + self.output_line = _NestedDict() del item['_state'] self.output_line.update(item) @@ -350,24 +384,18 @@ class _LsUsb(): for dd in self.device_descriptor_list: keyname = tuple(dd.keys())[0] if '_state' in dd[keyname] and dd[keyname]['_state']['bus_idx'] == idx: - if 'device_descriptor' not in self.output_line: - self.output_line['device_descriptor'] = {} self.output_line['device_descriptor'].update(dd) del self.output_line['device_descriptor'][keyname]['_state'] for cd in self.configuration_descriptor_list: keyname = tuple(cd.keys())[0] if '_state' in cd[keyname] and cd[keyname]['_state']['bus_idx'] == idx: - if 'configuration_descriptor' not in self.output_line['device_descriptor']: - self.output_line['device_descriptor']['configuration_descriptor'] = {} self.output_line['device_descriptor']['configuration_descriptor'].update(cd) del self.output_line['device_descriptor']['configuration_descriptor'][keyname]['_state'] for ia in self.interface_association_list: keyname = tuple(ia.keys())[0] if '_state' in ia[keyname] and ia[keyname]['_state']['bus_idx'] == idx: - if 'interface_association' not in self.output_line['device_descriptor']['configuration_descriptor']: - self.output_line['device_descriptor']['configuration_descriptor']['interface_association'] = {} self.output_line['device_descriptor']['configuration_descriptor']['interface_association'].update(ia) del self.output_line['device_descriptor']['configuration_descriptor']['interface_association'][keyname]['_state'] @@ -388,7 +416,7 @@ class _LsUsb(): # create the interface descriptor object if i_desc_iters > -1: for iface_idx in range(i_desc_iters + 1): - i_desc_obj = {} + i_desc_obj = _NestedDict() for iface_attrs in self.interface_descriptor_list: keyname = tuple(iface_attrs.keys())[0] if '_state' in iface_attrs[keyname] and iface_attrs[keyname]['_state']['bus_idx'] == idx and iface_attrs[keyname]['_state']['interface_descriptor_idx'] == iface_idx: @@ -399,48 +427,36 @@ class _LsUsb(): for ch in self.cdc_header_list: keyname = tuple(ch.keys())[0] if '_state' in ch[keyname] and ch[keyname]['_state']['bus_idx'] == idx and ch[keyname]['_state']['interface_descriptor_idx'] == iface_idx: - if 'cdc_header' not in i_desc_obj: - i_desc_obj['cdc_header'] = {} i_desc_obj['cdc_header'].update(ch) del i_desc_obj['cdc_header'][keyname]['_state'] for ccm in self.cdc_call_management_list: keyname = tuple(ccm.keys())[0] if '_state' in ccm[keyname] and ccm[keyname]['_state']['bus_idx'] == idx and ccm[keyname]['_state']['interface_descriptor_idx'] == iface_idx: - if 'cdc_call_management' not in i_desc_obj: - i_desc_obj['cdc_call_management'] = {} i_desc_obj['cdc_call_management'].update(ccm) del i_desc_obj['cdc_call_management'][keyname]['_state'] for ca in self.cdc_acm_list: keyname = tuple(ca.keys())[0] if '_state' in ca[keyname] and ca[keyname]['_state']['bus_idx'] == idx and ca[keyname]['_state']['interface_descriptor_idx'] == iface_idx: - if 'cdc_acm' not in i_desc_obj: - i_desc_obj['cdc_acm'] = {} i_desc_obj['cdc_acm'].update(ca) del i_desc_obj['cdc_acm'][keyname]['_state'] for cu in self.cdc_union_list: keyname = tuple(cu.keys())[0] if '_state' in cu[keyname] and cu[keyname]['_state']['bus_idx'] == idx and cu[keyname]['_state']['interface_descriptor_idx'] == iface_idx: - if 'cdc_union' not in i_desc_obj: - i_desc_obj['cdc_union'] = {} i_desc_obj['cdc_union'].update(cu) del i_desc_obj['cdc_union'][keyname]['_state'] for hd in self.hid_device_descriptor_list: keyname = tuple(hd.keys())[0] if '_state' in hd[keyname] and hd[keyname]['_state']['bus_idx'] == idx and hd[keyname]['_state']['interface_descriptor_idx'] == iface_idx: - if 'hid_device_descriptor' not in i_desc_obj: - i_desc_obj['hid_device_descriptor'] = {} i_desc_obj['hid_device_descriptor'].update(hd) del i_desc_obj['hid_device_descriptor'][keyname]['_state'] for rd in self.report_descriptors_list: keyname = tuple(rd.keys())[0] if '_state' in rd[keyname] and rd[keyname]['_state']['bus_idx'] == idx and rd[keyname]['_state']['interface_descriptor_idx'] == iface_idx: - if 'report_descriptors' not in i_desc_obj['hid_device_descriptor']: - i_desc_obj['hid_device_descriptor']['report_descriptors'] = {} i_desc_obj['hid_device_descriptor']['report_descriptors'].update(rd) del i_desc_obj['hid_device_descriptor']['report_descriptors'][keyname]['_state'] @@ -476,24 +492,18 @@ class _LsUsb(): for hd in self.hub_descriptor_list: keyname = tuple(hd.keys())[0] if '_state' in hd[keyname] and hd[keyname]['_state']['bus_idx'] == idx: - if 'hub_descriptor' not in self.output_line: - self.output_line['hub_descriptor'] = {} self.output_line['hub_descriptor'].update(hd) del self.output_line['hub_descriptor'][keyname]['_state'] for hps in self.hub_port_status_list: keyname = tuple(hps.keys())[0] if '_state' in hps[keyname] and hps[keyname]['_state']['bus_idx'] == idx: - if 'hub_port_status' not in self.output_line['hub_descriptor']: - self.output_line['hub_descriptor']['hub_port_status'] = {} self.output_line['hub_descriptor']['hub_port_status'].update(hps) del self.output_line['hub_descriptor']['hub_port_status'][keyname]['_state'] for ds in self.device_status_list: keyname = tuple(ds.keys())[0] if '_state' in ds[keyname] and ds[keyname]['_state']['bus_idx'] == idx: - if 'device_status' not in self.output_line: - self.output_line['device_status'] = {} self.output_line['device_status'].update(ds) del self.output_line['device_status'][keyname]['_state'] @@ -527,6 +537,13 @@ def parse(data, raw=False, quiet=False): if s._populate_lists(line): continue + # populate the schema + s._populate_schema() + + # output the raw object + if s.output_line: + s.raw_output.append(s.output_line) + # print(f''' # {s.section=} # {s.bus_list=} @@ -546,11 +563,4 @@ def parse(data, raw=False, quiet=False): # {s.device_status_list=} # ''') - # populate the schema - s._populate_schema() - - # output the raw object - if s.output_line: - s.raw_output.append(s.output_line) - return s.raw_output if raw else _process(s.raw_output)