From 2d1d68e3007f76d4c34ac07ae822935ab6021e79 Mon Sep 17 00:00:00 2001 From: Kelly Brazil Date: Tue, 12 Oct 2021 09:57:47 -0700 Subject: [PATCH] simplify populate_lists. Add CDC lists --- jc/parsers/lsusb.py | 175 ++++++++++++++++++++++---------------------- 1 file changed, 87 insertions(+), 88 deletions(-) diff --git a/jc/parsers/lsusb.py b/jc/parsers/lsusb.py index f820d8e3..a1d54a4d 100644 --- a/jc/parsers/lsusb.py +++ b/jc/parsers/lsusb.py @@ -238,61 +238,27 @@ class _LsUsb(): return True def _populate_lists(self, line): - if self.section == 'device_descriptor' and line.startswith(' '): - self.device_descriptor_list.append(self._add_attributes(line)) - return True + section_list_map = { + 'device_descriptor': self.device_descriptor_list, + 'configuration_descriptor': self.configuration_descriptor_list, + 'interface_association': self.interface_association_list, + 'interface_descriptor': self.interface_descriptor_list, + 'cdc_header': self.cdc_header_list, + 'cdc_call_management': self.cdc_call_management_list, + 'cdc_acm': self.cdc_acm_list, + 'cdc_union': self.cdc_union_list, + 'hid_device_descriptor': self.hid_device_descriptor_list, + 'report_descriptors': self.report_descriptors_list, + 'endpoint_descriptor': self.endpoint_descriptor_list, + 'hub_descriptor': self.hub_descriptor_list, + 'hub_port_status': self.hub_port_status_list, + 'device_status': self.device_status_list + } - if self.section == 'configuration_descriptor' and line.startswith(' '): - self.configuration_descriptor_list.append(self._add_attributes(line)) - return True - - if self.section == 'interface_association' and line.startswith(' '): - self.interface_association_list.append(self._add_attributes(line)) - return True - - if self.section == 'interface_descriptor' and line.startswith(' '): - self.interface_descriptor_list.append(self._add_attributes(line)) - return True - - if self.section == 'cdc_header' and line.startswith(' '): - self.cdc_header_list.append(self._add_attributes(line)) - return True - - if self.section == 'cdc_call_management' and line.startswith(' '): - self.cdc_call_management_list.append(self._add_attributes(line)) - return True - - if self.section == 'cdc_acm' and line.startswith(' '): - self.cdc_acm_list.append(self._add_attributes(line)) - return True - - if self.section == 'cdc_union' and line.startswith(' '): - self.cdc_union_list.append(self._add_attributes(line)) - return True - - if self.section == 'hid_device_descriptor' and line.startswith(' '): - self.report_descriptors_list.append(self._add_attributes(line)) - return True - - if self.section == 'report_descriptors' and line.startswith(' '): - self.report_descriptors_list.append(self._add_attributes(line)) - return True - - if self.section == 'endpoint_descriptor' and line.startswith(' '): - self.endpoint_descriptor_list.append(self._add_attributes(line)) - return True - - if self.section == 'hub_descriptor' and line.startswith(' '): - self.hub_descriptor_list.append(self._add_attributes(line)) - return True - - if self.section == 'hub_port_status' and line.startswith(' '): - self.hub_port_status_list.append(self._add_attributes(line)) - return True - - if self.section == 'device_status' and line.startswith(' '): - self.device_status_list.append(self._add_attributes(line)) - return True + for sec in section_list_map.keys(): + if line.startswith(' ') and self.section == sec: + section_list_map[self.section].append(self._add_attributes(line)) + return True def _populate_schema(self): """ @@ -305,7 +271,7 @@ class _LsUsb(): ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['attributes'] = {} ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_header'] = {} ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_call_management'] = {} - ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_adcm'] = {} + ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_acm'] = {} ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_union'] = {} ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['hid_device_descriptor'] = {} ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['hid_device_descriptor']['report_descriptors'] = {} @@ -325,7 +291,7 @@ class _LsUsb(): self.output_line.update(item) for dd in self.device_descriptor_list: - keyname = list(dd.keys())[0] + keyname = tuple(dd.keys())[0] if '_state' in dd[keyname] and dd[keyname]['_state']['bus_idx'] == idx: if 'device_descriptor' not in self.output_line: self.output_line['device_descriptor'] = {} @@ -333,7 +299,7 @@ class _LsUsb(): del self.output_line['device_descriptor'][keyname]['_state'] for cd in self.configuration_descriptor_list: - keyname = list(cd.keys())[0] + keyname = tuple(cd.keys())[0] if '_state' in cd[keyname] and cd[keyname]['_state']['bus_idx'] == idx: if 'configuration_descriptor' not in self.output_line['device_descriptor']: self.output_line['device_descriptor']['configuration_descriptor'] = {} @@ -341,7 +307,7 @@ class _LsUsb(): del self.output_line['device_descriptor']['configuration_descriptor'][keyname]['_state'] for ia in self.interface_association_list: - keyname = list(ia.keys())[0] + keyname = tuple(ia.keys())[0] if '_state' in ia[keyname] and ia[keyname]['_state']['bus_idx'] == idx: if 'interface_association' not in self.output_line['device_descriptor']['configuration_descriptor']: self.output_line['device_descriptor']['configuration_descriptor']['interface_association'] = {} @@ -351,7 +317,7 @@ class _LsUsb(): # add interface_descriptor key if it doesn't exist and there are entries for this bus if self.interface_descriptor_list: for iface_attrs in self.interface_descriptor_list: - keyname = list(iface_attrs.keys())[0] + keyname = tuple(iface_attrs.keys())[0] if '_state' in iface_attrs[keyname] and iface_attrs[keyname]['_state']['bus_idx'] == idx: if 'interface_descriptors' not in self.output_line['device_descriptor']['configuration_descriptor']: self.output_line['device_descriptor']['configuration_descriptor']['interface_descriptors'] = [] @@ -359,7 +325,7 @@ class _LsUsb(): # find max index for this bus idx, then iterate over that range i_desc_iters = -1 for iface_attrs in self.interface_descriptor_list: - keyname = list(iface_attrs.keys())[0] + keyname = tuple(iface_attrs.keys())[0] if '_state' in iface_attrs[keyname] and iface_attrs[keyname]['_state']['bus_idx'] == idx: i_desc_iters = iface_attrs[keyname]['_state']['interface_descriptor_idx'] @@ -368,26 +334,59 @@ class _LsUsb(): for iface_idx in range(i_desc_iters + 1): i_desc_obj = {} for iface_attrs in self.interface_descriptor_list: - keyname = list(iface_attrs.keys())[0] + keyname = tuple(iface_attrs.keys())[0] if '_state' in iface_attrs[keyname] and iface_attrs[keyname]['_state']['bus_idx'] == idx and iface_attrs[keyname]['_state']['interface_descriptor_idx'] == iface_idx: del iface_attrs[keyname]['_state'] i_desc_obj.update(iface_attrs) + + # add other nodes to the object (cdc_header, endpoint descriptors, etc.) + for ch in self.cdc_header_list: + keyname = tuple(ch.keys())[0] + if '_state' in ch[keyname] and ch[keyname]['_state']['bus_idx'] == idx and ch[keyname]['_state']['interface_descriptor_idx'] == iface_idx: + if 'cdc_header' not in i_desc_obj: + i_desc_obj['cdc_header'] = {} + i_desc_obj['cdc_header'].update(ch) + del i_desc_obj['cdc_header'][keyname]['_state'] + + for ccm in self.cdc_call_management_list: + keyname = tuple(ccm.keys())[0] + if '_state' in ccm[keyname] and ccm[keyname]['_state']['bus_idx'] == idx and ccm[keyname]['_state']['interface_descriptor_idx'] == iface_idx: + if 'cdc_call_management' not in i_desc_obj: + i_desc_obj['cdc_call_management'] = {} + i_desc_obj['cdc_call_management'].update(ccm) + del i_desc_obj['cdc_call_management'][keyname]['_state'] + + for ca in self.cdc_acm_list: + keyname = tuple(ca.keys())[0] + if '_state' in ca[keyname] and ca[keyname]['_state']['bus_idx'] == idx and ca[keyname]['_state']['interface_descriptor_idx'] == iface_idx: + if 'cdc_acm' not in i_desc_obj: + i_desc_obj['cdc_acm'] = {} + i_desc_obj['cdc_acm'].update(ca) + del i_desc_obj['cdc_acm'][keyname]['_state'] + + for cu in self.cdc_union_list: + keyname = tuple(cu.keys())[0] + if '_state' in cu[keyname] and cu[keyname]['_state']['bus_idx'] == idx and cu[keyname]['_state']['interface_descriptor_idx'] == iface_idx: + if 'cdc_union' not in i_desc_obj: + i_desc_obj['cdc_union'] = {} + i_desc_obj['cdc_union'].update(cu) + del i_desc_obj['cdc_union'][keyname]['_state'] # add the object to the list of interface descriptors self.output_line['device_descriptor']['configuration_descriptor']['interface_descriptors'].append(i_desc_obj) - # ['device_descriptor']['configuration_descriptor']['interface_descriptors'] = [] - # ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_header'] = {} - # ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_call_management'] = {} - # ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_adcm'] = {} - # ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_union'] = {} - # ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['hid_device_descriptor'] = {} - # ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['hid_device_descriptor']['report_descriptors'] = {} - # ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['endpoint_descriptors'] = [] - # ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['endpoint_descriptors'][0]['attributes'] = {} + # ['device_descriptor']['configuration_descriptor']['interface_descriptors'] = [] + # ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_header'] = {} + # ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_call_management'] = {} + # ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_acm'] = {} + # ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_union'] = {} + # ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['hid_device_descriptor'] = {} + # ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['hid_device_descriptor']['report_descriptors'] = {} + # ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['endpoint_descriptors'] = [] + # ['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['endpoint_descriptors'][0]['attributes'] = {} - for endpoint_descriptor_idx in self.endpoint_descriptor_list: - pass + for endpoint_descriptor_idx in self.endpoint_descriptor_list: + pass def parse(data, raw=False, quiet=False): @@ -420,22 +419,22 @@ def parse(data, raw=False, quiet=False): continue # print(f''' -# # {s.section=} -# # {s.bus_list=} -# # {s.device_descriptor_list=} -# # {s.configuration_descriptor_list=} -# # {s.interface_association_list=} -# # {s.interface_descriptor_list=} -# # {s.cdc_header_list=} -# # {s.cdc_call_management_list=} -# # {s.cdc_acm_list=} -# # {s.cdc_union_list=} -# # {s.endpoint_descriptor_list=} -# # {s.hid_device_descriptor_list=} -# # {s.report_descriptors_list=} -# # {s.hub_descriptor_list=} -# # {s.hub_port_status_list=} -# # {s.device_status_list=} +# {s.section=} +# {s.bus_list=} +# {s.device_descriptor_list=} +# {s.configuration_descriptor_list=} +# {s.interface_association_list=} +# {s.interface_descriptor_list=} +# {s.cdc_header_list=} +# {s.cdc_call_management_list=} +# {s.cdc_acm_list=} +# {s.cdc_union_list=} +# {s.endpoint_descriptor_list=} +# {s.hid_device_descriptor_list=} +# {s.report_descriptors_list=} +# {s.hub_descriptor_list=} +# {s.hub_port_status_list=} +# {s.device_status_list=} # ''') # populate the schema