From 9ad0cd9dae8822235c37159541ce72471eb4263e Mon Sep 17 00:00:00 2001 From: Kelly Brazil Date: Sat, 9 Oct 2021 20:05:55 -0700 Subject: [PATCH] now storing state within the objects to make building the schema (later) easier. --- jc/parsers/lsusb.py | 321 ++++++++++++++++---------------------------- 1 file changed, 113 insertions(+), 208 deletions(-) diff --git a/jc/parsers/lsusb.py b/jc/parsers/lsusb.py index bafddd44..9b9d5ef8 100644 --- a/jc/parsers/lsusb.py +++ b/jc/parsers/lsusb.py @@ -35,6 +35,7 @@ Examples: """ import jc.utils from jc.parsers.universal import sparse_table_parse +from rich import print class info(): @@ -77,14 +78,17 @@ class _LsUsb(): def __init__(self): self.raw_output = [] self.output_line = {} + self.section = '' - self.old_section = '' - self.section_depth = 0 - self.old_section_depth = 0 + self.bus_idx = -1 + self.interface_descriptor_idx = -1 + self.endpoint_descriptor_idx = -1 + self.device_descriptor_list = [] self.configuration_descriptor_list = [] self.interface_association_list = [] self.interface_descriptor_list = [] + self.interface_descriptor_attribute_list = [] self.cdc_header_list = [] self.cdc_call_management_list = [] self.cdc_acm_list = [] @@ -94,6 +98,7 @@ class _LsUsb(): self.report_descriptors_list = [] self.hub_descriptor_list = [] self.hub_port_status_list = [] + self.device_status_list = [] @staticmethod def _count_indent(line): @@ -112,306 +117,202 @@ class _LsUsb(): # jc.parsers.universal.sparse_table_parse(). Pad end of string to be at least len of 25 section_header = 'key val description' - line_obj = [section_header, line.strip() + (' ' * 25)] - line_obj = sparse_table_parse(line_obj) - # line_obj[0].update({'indent': indent, 'depth': self.section_depth}) + temp_obj = [section_header, line.strip() + (' ' * 25)] + temp_obj = sparse_table_parse(temp_obj) + temp_obj = temp_obj[0] - return line_obj[0] + line_obj = { + temp_obj['key']: { + 'value': temp_obj['val'], + 'description': temp_obj['description'], + 'indent': indent, + 'bus_idx': self.bus_idx, + 'interface_descriptor_idx': self.interface_descriptor_idx, + 'endpoint_descriptor_idx': self.endpoint_descriptor_idx + } + } - def _new_section(self, new_section): - self.old_section = self.section - self.section = new_section + return line_obj - def _new_depth(self, new_depth): - self.old_section_depth = self.section_depth - self.section_depth = new_depth - - def _populate_schema(self, final=False): + def _populate_schema(self): """ - Append list entries and reset lists on section change - Depth assignments: - - Bus 0 - Device Descriptor: 2 - 3 - Configuration Descriptor: 4 - 5 - Interface Association: 6 - 7 - Interface Descriptor: 6 - 7 - CDC Header: 8 - 9 - CDC Call Management: 8 - 9 - CDC ACM: 8 - 9 - CDC Union: 8 - 9 - HID Device Descriptor: 10 - 11 - Report Descriptors: 12 - 13 - Endpoint Descriptor: 8 - 9 - Hub Descriptor: 2 - 3 - Hub Port Status: 4 - 5 - Device Status: 2 - 3 + Schema: + ['bus'] = {} + ['bus']['device_descriptor'] = {} + ['bus']['device_descriptor']['configuration_descriptor'] = {} + ['bus']['device_descriptor']['configuration_descriptor']['interface_association'] = {} + ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptor'] = {} + ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptors'] = [] + ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['attributes'] = {} + ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_header'] = {} + ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_call_management'] = {} + ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_adcm'] = {} + ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_union'] = {} + ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['hid_device_descriptor'] = {} + ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['hid_device_descriptor']['report_descriptors'] = {} + ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['endpoint_descriptors'] = [] + ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['endpoint_descriptors'][0]['attributes'] = {} + ['bus']['hub_descriptor'] = {} + ['bus']['hub_descriptor']['hub_port_status'] = {} + ['bus']['device_status'] = {} """ - if (self.section != self.old_section and self.section_depth < self.old_section_depth) or final: - # in decending depth order + pass + # if self.output_line: + # self.raw_output.append(self.output_line) - if self.report_descriptors_list: - self.report_descriptors_list.append({'report_descriptor': self.report_descriptors_list}) - self.report_descriptors_list = [] - if not final: - return + # self.output_line = {} + # # self._reset_lists() + # self.bus_idx += 1 - if self.hid_device_descriptor_list: - self.endpoint_descriptor_list.append({'hid_device_descriptor': self.hid_device_descriptor_list}) - self.hid_device_descriptor_list = [] - if not final: - return + # line_split = line.strip().split(maxsplit=6) + # self.output_line.update( + # { + # 'bus': line_split[1], + # 'device': line_split[3][:-1], + # 'id': line_split[5], + # 'description': (line_split[6:7] or [None])[0] # way to get a list item or None + # } + # ) - if self.cdc_header_list: - self.interface_descriptor_list.append({'cdc_header': self.cdc_header_list}) - self.cdc_header_list = [] - - if self.cdc_call_management_list: - self.interface_descriptor_list.append({'cdc_call_management': self.cdc_call_management_list}) - self.cdc_call_management_list = [] - - if self.cdc_acm_list: - self.interface_descriptor_list.append({'cdc_acm': self.cdc_acm_list}) - self.cdc_acm_list = [] - - if self.cdc_union_list: - self.interface_descriptor_list.append({'cdc_union': self.cdc_union_list}) - self.cdc_union_list = [] - - if self.endpoint_descriptor_list: - self.interface_descriptor_list.append({'endpoint_descriptor': self.endpoint_descriptor_list}) - self.endpoint_descriptor_list = [] - if not final: - return - - if self.interface_association_list: - self.output_line['device_descriptor']['configuration_descriptor']['interface_association']['attributes'].append(self.interface_association_list) - self.interface_association_list = [] - - if self.interface_descriptor_list: - if 'interface_descriptor_list' not in self.output_line['device_descriptor']['configuration_descriptor']: - self.output_line['device_descriptor']['configuration_descriptor']['interface_descriptor'] = [] - self.output_line['device_descriptor']['configuration_descriptor']['interface_descriptor'].append(self.interface_descriptor_list) - if not final: - return - - if self.configuration_descriptor_list: - self.output_line['device_descriptor']['configuration_descriptor']['attributes'].append(self.configuration_descriptor_list) - self.configuration_descriptor_list = [] - - if self.hub_port_status_list: - if 'hub_descriptor' not in self.output_line: - self.output_line['hub_descriptor'] = {} - self.output_line['hub_descriptor']['hub_port_status'] = [] - self.output_line['hub_descriptor']['hub_port_status'].append(self.hub_port_status_list) - self.hub_port_status_list = [] - if not final: - return - - if self.device_descriptor_list: - self.output_line['device_descriptor']['attributes'].append(self.device_descriptor_list) - self.device_descriptor_list = [] - - if self.hub_descriptor_list: - if 'hub_descriptor' not in self.output_line: - self.output_line['hub_descriptor'] = {} - if 'attributes' not in self.output_line['hub_descriptor']: - self.output_line['hub_descriptor']['attributes'] = [] - self.output_line['hub_descriptor']['attributes'].append(self.hub_descriptor_list) - self.hub_descriptor_list = [] - if not final: - return + # line_split = line.strip().split(':', maxsplit=1) + # self.output_line.update( + # { + # 'device_status': + # { + # 'value': line_split[1].strip() + # } + # } + # ) def _set_sections(self, line): + # ignore blank lines + if not line: + self.section = '' + return True + if line.startswith('Bus '): - self._new_section('bus') - self._new_depth(0) - - if self.output_line: - self.raw_output.append(self.output_line) - - self.output_line = {} - - line_split = line.strip().split(maxsplit=6) - self.output_line.update( - { - 'bus': line_split[1], - 'device': line_split[3][:-1], - 'id': line_split[5], - 'description': (line_split[6:7] or [None])[0] # way to get a list item or None - } - ) + self.section = 'bus' + self.bus_idx += 1 + self.interface_descriptor_idx = -1 + self.endpoint_descriptor_idx = -1 return True if line.startswith('Device Descriptor:'): - self._new_section('device_descriptor') - self._new_depth(2) + self.section = 'device_descriptor' return True if line.startswith(' Configuration Descriptor:'): - self._new_section('configuration_descriptor') - self._new_depth(4) + self.section = 'configuration_descriptor' return True if line.startswith(' Interface Association:'): - self._new_section('interface_association') - self._new_depth(6) + self.section = 'interface_association' return True if line.startswith(' Interface Descriptor:'): - self._new_section('interface_descriptor') - self._new_depth(6) + self.section = 'interface_descriptor' + self.interface_descriptor_idx += 1 + self.endpoint_descriptor_idx = -1 return True if line.startswith(' CDC Header:'): - self._new_section('cdc_header') - self._new_depth(8) + self.section = 'cdc_header' return True if line.startswith(' CDC Call Management:'): - self._new_section('cdc_call_management') - self._new_depth(8) + self.section = 'cdc_call_management' return True if line.startswith(' CDC ACM:'): - self._new_section('cdc_acm') - self._new_depth(8) + self.section = 'cdc_acm' return True if line.startswith(' CDC Union:'): - self._new_section('cdc_union') - self._new_depth(8) + self.section = 'cdc_union' return True if line.startswith(' Endpoint Descriptor:'): - self._new_section('endpoint_descriptor') - self._new_depth(8) + self.section = 'endpoint_descriptor' + self.endpoint_descriptor_idx += 1 return True if line.startswith(' HID Device Descriptor:'): - self._new_section('hid_device_descriptor') - self._new_depth(10) + self.section = 'hid_device_descriptor' return True if line.startswith(' Report Descriptors:'): - self._new_section('report_descriptors') - self._new_depth(12) + self.section = 'report_descriptors' return True if line.startswith('Hub Descriptor:'): - self._new_section('hub_descriptor') - self._new_depth(2) + self.section = 'hub_descriptor' return True if line.startswith(' Hub Port Status:'): - self._new_section('hub_port_status') - self._new_depth(4) + self.section = 'hub_port_status' return True if line.startswith('Device Status:'): - self._new_section('device_status') - self._new_depth(2) - - line_split = line.strip().split(':', maxsplit=1) - self.output_line.update( - { - 'device_status': - { - 'value': line_split[1].strip() - } - } - ) + self.section = 'device_status' return True def _populate_lists(self, line): if self.section == 'device_descriptor' and line.startswith(' '): - self._new_depth(3) self.device_descriptor_list.append(self._add_attributes(line)) - if 'device_descriptor' not in self.output_line: - self.output_line['device_descriptor'] = {} - self.output_line['device_descriptor']['attributes'] = [] return True if self.section == 'configuration_descriptor' and line.startswith(' '): - self._new_depth(5) self.configuration_descriptor_list.append(self._add_attributes(line)) - if 'configuration_descriptor' not in self.output_line['device_descriptor']: - self.output_line['device_descriptor']['configuration_descriptor'] = {} - self.output_line['device_descriptor']['configuration_descriptor']['attributes'] = [] return True if self.section == 'interface_association' and line.startswith(' '): - self._new_depth(7) self.interface_association_list.append(self._add_attributes(line)) - if 'interface_association' not in self.output_line['device_descriptor']['configuration_descriptor']: - self.output_line['device_descriptor']['configuration_descriptor']['interface_association'] = {} - self.output_line['device_descriptor']['configuration_descriptor']['interface_association']['attributes'] = [] return True if self.section == 'interface_descriptor' and line.startswith(' '): - self._new_depth(7) self.interface_descriptor_list.append(self._add_attributes(line)) return True if self.section == 'cdc_header' and line.startswith(' '): - self._new_depth(9) self.cdc_header_list.append(self._add_attributes(line)) return True if self.section == 'cdc_call_management' and line.startswith(' '): - self._new_depth(9) self.cdc_call_management_list.append(self._add_attributes(line)) return True if self.section == 'cdc_acm' and line.startswith(' '): - self._new_depth(9) self.cdc_acm_list.append(self._add_attributes(line)) return True if self.section == 'cdc_union' and line.startswith(' '): - self._new_depth(9) self.cdc_union_list.append(self._add_attributes(line)) return True if self.section == 'hid_device_descriptor' and line.startswith(' '): - self._new_depth(11) self.report_descriptors_list.append(self._add_attributes(line)) return True if self.section == 'report_descriptors' and line.startswith(' '): - self._new_depth(13) self.report_descriptors_list.append(self._add_attributes(line)) return True if self.section == 'endpoint_descriptor' and line.startswith(' '): - self._new_depth(9) self.endpoint_descriptor_list.append(self._add_attributes(line)) return True if self.section == 'hub_descriptor' and line.startswith(' '): - self._new_depth(3) self.hub_descriptor_list.append(self._add_attributes(line)) return True if self.section == 'hub_port_status' and line.startswith(' '): - self._new_depth(5) self.hub_port_status_list.append(self._add_attributes(line)) return True + if self.section == 'device_status' and line.startswith(' '): + self.device_status_list.append(self._add_attributes(line)) + return True + def parse(data, raw=False, quiet=False): """ @@ -434,15 +335,23 @@ def parse(data, raw=False, quiet=False): s = _LsUsb() for line in data.splitlines(): - print(f'section: {s.section}, depth: {s.section_depth}') - # populate schema when backing out of nodes - s._populate_schema() - - # ignore blank lines - if not line: - s._new_section('') - s._new_depth(0) - continue + print(f''' +{s.section=} +{s.device_descriptor_list=} +{s.configuration_descriptor_list=} +{s.interface_association_list=} +{s.interface_descriptor_list=} +{s.cdc_header_list=} +{s.cdc_call_management_list=} +{s.cdc_acm_list=} +{s.cdc_union_list=} +{s.endpoint_descriptor_list=} +{s.hid_device_descriptor_list=} +{s.report_descriptors_list=} +{s.hub_descriptor_list=} +{s.hub_port_status_list=} +{s.device_status_list=} +''') # sections if s._set_sections(line): @@ -451,10 +360,6 @@ def parse(data, raw=False, quiet=False): # create section lists and schema if s._populate_lists(line): continue - - # get final list entries - s._new_depth(-1) - s._populate_schema(final=True) # output the raw object if s.output_line: