diff --git a/jc/parsers/lsusb.py b/jc/parsers/lsusb.py index 3df8674c..bafddd44 100644 --- a/jc/parsers/lsusb.py +++ b/jc/parsers/lsusb.py @@ -73,28 +73,344 @@ def _process(proc_data): return proc_data -def _count_indent(line): - indent = 0 - for char in line: - if char == ' ': - indent += 1 - continue - else: - break - return indent +class _LsUsb(): + def __init__(self): + self.raw_output = [] + self.output_line = {} + self.section = '' + self.old_section = '' + self.section_depth = 0 + self.old_section_depth = 0 + self.device_descriptor_list = [] + self.configuration_descriptor_list = [] + self.interface_association_list = [] + self.interface_descriptor_list = [] + self.cdc_header_list = [] + self.cdc_call_management_list = [] + self.cdc_acm_list = [] + self.cdc_union_list = [] + self.endpoint_descriptor_list = [] + self.hid_device_descriptor_list = [] + self.report_descriptors_list = [] + self.hub_descriptor_list = [] + self.hub_port_status_list = [] + @staticmethod + def _count_indent(line): + indent = 0 + for char in line: + if char == ' ': + indent += 1 + continue + else: + break + return indent -def _add_attributes(line): - indent = _count_indent(line) - # section header is formatted with the correct spacing to be used with jc.parsers.universal.sparse_table_parse() - # pad end of string to be at least len of 25 - section_header = 'key val description' + def _add_attributes(self, line): + indent = self._count_indent(line) + # Section header is formatted with the correct spacing to be used with + # jc.parsers.universal.sparse_table_parse(). Pad end of string to be at least len of 25 + section_header = 'key val description' - line_obj = [section_header, line.strip() + (' ' * 25)] - line_obj = sparse_table_parse(line_obj) - line_obj[0].update({'indent': indent}) + line_obj = [section_header, line.strip() + (' ' * 25)] + line_obj = sparse_table_parse(line_obj) + # line_obj[0].update({'indent': indent, 'depth': self.section_depth}) - return line_obj[0] + return line_obj[0] + + def _new_section(self, new_section): + self.old_section = self.section + self.section = new_section + + def _new_depth(self, new_depth): + self.old_section_depth = self.section_depth + self.section_depth = new_depth + + def _populate_schema(self, final=False): + """ + Append list entries and reset lists on section change + Depth assignments: + + Bus 0 + Device Descriptor: 2 + 3 + Configuration Descriptor: 4 + 5 + Interface Association: 6 + 7 + Interface Descriptor: 6 + 7 + CDC Header: 8 + 9 + CDC Call Management: 8 + 9 + CDC ACM: 8 + 9 + CDC Union: 8 + 9 + HID Device Descriptor: 10 + 11 + Report Descriptors: 12 + 13 + Endpoint Descriptor: 8 + 9 + Hub Descriptor: 2 + 3 + Hub Port Status: 4 + 5 + Device Status: 2 + 3 + """ + if (self.section != self.old_section and self.section_depth < self.old_section_depth) or final: + # in decending depth order + + if self.report_descriptors_list: + self.report_descriptors_list.append({'report_descriptor': self.report_descriptors_list}) + self.report_descriptors_list = [] + if not final: + return + + if self.hid_device_descriptor_list: + self.endpoint_descriptor_list.append({'hid_device_descriptor': self.hid_device_descriptor_list}) + self.hid_device_descriptor_list = [] + if not final: + return + + if self.cdc_header_list: + self.interface_descriptor_list.append({'cdc_header': self.cdc_header_list}) + self.cdc_header_list = [] + + if self.cdc_call_management_list: + self.interface_descriptor_list.append({'cdc_call_management': self.cdc_call_management_list}) + self.cdc_call_management_list = [] + + if self.cdc_acm_list: + self.interface_descriptor_list.append({'cdc_acm': self.cdc_acm_list}) + self.cdc_acm_list = [] + + if self.cdc_union_list: + self.interface_descriptor_list.append({'cdc_union': self.cdc_union_list}) + self.cdc_union_list = [] + + if self.endpoint_descriptor_list: + self.interface_descriptor_list.append({'endpoint_descriptor': self.endpoint_descriptor_list}) + self.endpoint_descriptor_list = [] + if not final: + return + + if self.interface_association_list: + self.output_line['device_descriptor']['configuration_descriptor']['interface_association']['attributes'].append(self.interface_association_list) + self.interface_association_list = [] + + if self.interface_descriptor_list: + if 'interface_descriptor_list' not in self.output_line['device_descriptor']['configuration_descriptor']: + self.output_line['device_descriptor']['configuration_descriptor']['interface_descriptor'] = [] + self.output_line['device_descriptor']['configuration_descriptor']['interface_descriptor'].append(self.interface_descriptor_list) + if not final: + return + + if self.configuration_descriptor_list: + self.output_line['device_descriptor']['configuration_descriptor']['attributes'].append(self.configuration_descriptor_list) + self.configuration_descriptor_list = [] + + if self.hub_port_status_list: + if 'hub_descriptor' not in self.output_line: + self.output_line['hub_descriptor'] = {} + self.output_line['hub_descriptor']['hub_port_status'] = [] + self.output_line['hub_descriptor']['hub_port_status'].append(self.hub_port_status_list) + self.hub_port_status_list = [] + if not final: + return + + if self.device_descriptor_list: + self.output_line['device_descriptor']['attributes'].append(self.device_descriptor_list) + self.device_descriptor_list = [] + + if self.hub_descriptor_list: + if 'hub_descriptor' not in self.output_line: + self.output_line['hub_descriptor'] = {} + if 'attributes' not in self.output_line['hub_descriptor']: + self.output_line['hub_descriptor']['attributes'] = [] + self.output_line['hub_descriptor']['attributes'].append(self.hub_descriptor_list) + self.hub_descriptor_list = [] + if not final: + return + + def _set_sections(self, line): + if line.startswith('Bus '): + self._new_section('bus') + self._new_depth(0) + + if self.output_line: + self.raw_output.append(self.output_line) + + self.output_line = {} + + line_split = line.strip().split(maxsplit=6) + self.output_line.update( + { + 'bus': line_split[1], + 'device': line_split[3][:-1], + 'id': line_split[5], + 'description': (line_split[6:7] or [None])[0] # way to get a list item or None + } + ) + return True + + if line.startswith('Device Descriptor:'): + self._new_section('device_descriptor') + self._new_depth(2) + return True + + if line.startswith(' Configuration Descriptor:'): + self._new_section('configuration_descriptor') + self._new_depth(4) + return True + + if line.startswith(' Interface Association:'): + self._new_section('interface_association') + self._new_depth(6) + return True + + if line.startswith(' Interface Descriptor:'): + self._new_section('interface_descriptor') + self._new_depth(6) + return True + + if line.startswith(' CDC Header:'): + self._new_section('cdc_header') + self._new_depth(8) + return True + + if line.startswith(' CDC Call Management:'): + self._new_section('cdc_call_management') + self._new_depth(8) + return True + + if line.startswith(' CDC ACM:'): + self._new_section('cdc_acm') + self._new_depth(8) + return True + + if line.startswith(' CDC Union:'): + self._new_section('cdc_union') + self._new_depth(8) + return True + + if line.startswith(' Endpoint Descriptor:'): + self._new_section('endpoint_descriptor') + self._new_depth(8) + return True + + if line.startswith(' HID Device Descriptor:'): + self._new_section('hid_device_descriptor') + self._new_depth(10) + return True + + if line.startswith(' Report Descriptors:'): + self._new_section('report_descriptors') + self._new_depth(12) + return True + + if line.startswith('Hub Descriptor:'): + self._new_section('hub_descriptor') + self._new_depth(2) + return True + + if line.startswith(' Hub Port Status:'): + self._new_section('hub_port_status') + self._new_depth(4) + return True + + if line.startswith('Device Status:'): + self._new_section('device_status') + self._new_depth(2) + + line_split = line.strip().split(':', maxsplit=1) + self.output_line.update( + { + 'device_status': + { + 'value': line_split[1].strip() + } + } + ) + return True + + def _populate_lists(self, line): + if self.section == 'device_descriptor' and line.startswith(' '): + self._new_depth(3) + self.device_descriptor_list.append(self._add_attributes(line)) + if 'device_descriptor' not in self.output_line: + self.output_line['device_descriptor'] = {} + self.output_line['device_descriptor']['attributes'] = [] + return True + + if self.section == 'configuration_descriptor' and line.startswith(' '): + self._new_depth(5) + self.configuration_descriptor_list.append(self._add_attributes(line)) + if 'configuration_descriptor' not in self.output_line['device_descriptor']: + self.output_line['device_descriptor']['configuration_descriptor'] = {} + self.output_line['device_descriptor']['configuration_descriptor']['attributes'] = [] + return True + + if self.section == 'interface_association' and line.startswith(' '): + self._new_depth(7) + self.interface_association_list.append(self._add_attributes(line)) + if 'interface_association' not in self.output_line['device_descriptor']['configuration_descriptor']: + self.output_line['device_descriptor']['configuration_descriptor']['interface_association'] = {} + self.output_line['device_descriptor']['configuration_descriptor']['interface_association']['attributes'] = [] + return True + + if self.section == 'interface_descriptor' and line.startswith(' '): + self._new_depth(7) + self.interface_descriptor_list.append(self._add_attributes(line)) + return True + + if self.section == 'cdc_header' and line.startswith(' '): + self._new_depth(9) + self.cdc_header_list.append(self._add_attributes(line)) + return True + + if self.section == 'cdc_call_management' and line.startswith(' '): + self._new_depth(9) + self.cdc_call_management_list.append(self._add_attributes(line)) + return True + + if self.section == 'cdc_acm' and line.startswith(' '): + self._new_depth(9) + self.cdc_acm_list.append(self._add_attributes(line)) + return True + + if self.section == 'cdc_union' and line.startswith(' '): + self._new_depth(9) + self.cdc_union_list.append(self._add_attributes(line)) + return True + + if self.section == 'hid_device_descriptor' and line.startswith(' '): + self._new_depth(11) + self.report_descriptors_list.append(self._add_attributes(line)) + return True + + if self.section == 'report_descriptors' and line.startswith(' '): + self._new_depth(13) + self.report_descriptors_list.append(self._add_attributes(line)) + return True + + if self.section == 'endpoint_descriptor' and line.startswith(' '): + self._new_depth(9) + self.endpoint_descriptor_list.append(self._add_attributes(line)) + return True + + if self.section == 'hub_descriptor' and line.startswith(' '): + self._new_depth(3) + self.hub_descriptor_list.append(self._add_attributes(line)) + return True + + if self.section == 'hub_port_status' and line.startswith(' '): + self._new_depth(5) + self.hub_port_status_list.append(self._add_attributes(line)) + return True def parse(data, raw=False, quiet=False): @@ -114,327 +430,34 @@ def parse(data, raw=False, quiet=False): if not quiet: jc.utils.compatibility(__name__, info.compatible) - raw_output = [] - output_line = {} - section = '' - device_descriptor_list = [] - configuration_descriptor_list = [] - interface_association_list = [] - interface_descriptor_list = [] - cdc_header_list = [] - cdc_call_management_list = [] - cdc_acm_list = [] - cdc_union_list = [] - endpoint_descriptor_list = [] - hid_device_descriptor_list = [] - report_descriptors_list = [] - hub_descriptor_list = [] - hub_port_status_list = [] - if jc.utils.has_data(data): + s = _LsUsb() for line in data.splitlines(): - # blank line: new object + print(f'section: {s.section}, depth: {s.section_depth}') + # populate schema when backing out of nodes + s._populate_schema() + + # ignore blank lines if not line: - if endpoint_descriptor_list: - interface_descriptor_list.append({'endpoint_descriptor': endpoint_descriptor_list}) - - if report_descriptors_list: - pass - - if output_line: - raw_output.append(output_line) - - output_line = {} - device_descriptor_list = [] - configuration_descriptor_list = [] - interface_association_list = [] - interface_descriptor_list = [] - cdc_header_list = [] - cdc_call_management_list = [] - cdc_acm_list = [] - cdc_union_list = [] - endpoint_descriptor_list = [] - hid_device_descriptor_list = [] - report_descriptors_list = [] - hub_descriptor_list = [] - hub_port_status_list = [] - section = '' + s._new_section('') + s._new_depth(0) continue # sections - if line.startswith('Bus '): - if output_line: - raw_output.append(output_line) - - output_line = {} - device_descriptor_list = [] - configuration_descriptor_list = [] - interface_association_list = [] - interface_descriptor_list = [] - cdc_header_list = [] - cdc_call_management_list = [] - cdc_acm_list = [] - cdc_union_list = [] - endpoint_descriptor_list = [] - hid_device_descriptor_list = [] - report_descriptors_list = [] - hub_descriptor_list = [] - hub_port_status_list = [] - section = 'bus' - - line_split = line.strip().split(maxsplit=6) - output_line.update( - { - 'bus': line_split[1], - 'device': line_split[3][:-1], - 'id': line_split[5], - 'description': (line_split[6:7] or [None])[0] # way to get a list item or None - } - ) + if s._set_sections(line): continue - - if line.startswith('Device Descriptor:'): - section = 'device_descriptor' - device_descriptor_list = [] - configuration_descriptor_list = [] - interface_association_list = [] - interface_descriptor_list = [] - cdc_header_list = [] - cdc_call_management_list = [] - cdc_acm_list = [] - cdc_union_list = [] - endpoint_descriptor_list = [] - hid_device_descriptor_list = [] - report_descriptors_list = [] - hub_descriptor_list = [] - hub_port_status_list = [] - continue - - if line.startswith(' Configuration Descriptor:'): - section = 'configuration_descriptor' - configuration_descriptor_list = [] - interface_descriptor_list = [] - continue - - if line.startswith(' Interface Association:'): - section = 'interface_association' - interface_association_list = [] - continue - - if line.startswith(' Interface Descriptor:'): - section = 'interface_descriptor' - - if cdc_header_list: - interface_descriptor_list.append({'cdc_header': cdc_header_list}) - - if cdc_call_management_list: - interface_descriptor_list.append({'cdc_call_management': cdc_call_management_list}) - - if cdc_acm_list: - interface_descriptor_list.append({'cdc_acm': cdc_acm_list}) - - if cdc_union_list: - interface_descriptor_list.append({'cdc_union': cdc_union_list}) - - if endpoint_descriptor_list: - interface_descriptor_list.append({'endpoint_descriptor': endpoint_descriptor_list}) - - if interface_descriptor_list: - if 'interface_descriptor' not in output_line['device_descriptor']['configuration_descriptor']: - output_line['device_descriptor']['configuration_descriptor']['interface_descriptor'] = [] - output_line['device_descriptor']['configuration_descriptor']['interface_descriptor'].append(interface_descriptor_list) - - cdc_header_list = [] - cdc_call_management_list = [] - cdc_acm_list = [] - cdc_union_list = [] - endpoint_descriptor_list = [] - interface_descriptor_list = [] - continue - - if line.startswith(' CDC Header:'): - section = 'cdc_header' - cdc_header_list = [] - continue - - if line.startswith(' CDC Call Management:'): - section = 'cdc_call_management' - cdc_call_management_list = [] - continue - - if line.startswith(' CDC ACM:'): - section = 'cdc_acm' - cdc_acm_list = [] - continue - - if line.startswith(' CDC Union:'): - section = 'cdc_union' - cdc_union_list = [] - continue - - if line.startswith(' Endpoint Descriptor:'): - section = 'endpoint_descriptor' - if endpoint_descriptor_list: - interface_descriptor_list.append({'endpoint_descriptor': endpoint_descriptor_list}) - endpoint_descriptor_list = [] - continue - - if line.startswith(' HID Device Descriptor:'): - section = 'hid_device_descriptor' - hid_device_descriptor_list = [] - continue - - if line.startswith(' Report Descriptors:'): - section = 'report_descriptors' - report_descriptors_list = [] - continue - - if line.startswith('Hub Descriptor:'): - section = 'hub_descriptor' - hub_descriptor_list = [] - continue - - if line.startswith(' Hub Port Status:'): - section = 'hub_port_status' - hub_port_status_list = [] - continue - - if line.startswith('Device Status:'): - section = 'device_status' - line_split = line.strip().split(':', maxsplit=1) - output_line.update( - { - 'device_status': - { - 'value': line_split[1].strip() - } - } - ) - continue - + # create section lists and schema - if section == 'device_descriptor' and line.startswith(' '): - device_descriptor_list.append(_add_attributes(line)) - if 'device_descriptor' not in output_line: - output_line['device_descriptor'] = {} - output_line['device_descriptor']['attributes'] = device_descriptor_list + if s._populate_lists(line): continue - - if section == 'configuration_descriptor' and line.startswith(' '): - configuration_descriptor_list.append(_add_attributes(line)) - if 'configuration_descriptor' not in output_line['device_descriptor']: - output_line['device_descriptor']['configuration_descriptor'] = {} - output_line['device_descriptor']['configuration_descriptor']['attributes'] = configuration_descriptor_list - continue - - if section == 'interface_association' and line.startswith(' '): - interface_association_list.append(_add_attributes(line)) - if 'interface_association' not in output_line['device_descriptor']['configuration_descriptor']: - output_line['device_descriptor']['configuration_descriptor']['interface_association'] = {} - output_line['device_descriptor']['configuration_descriptor']['interface_association']['attributes'] = interface_association_list - continue - - if section == 'report_descriptors' and line.startswith(' '): - report_descriptors_list.append(_add_attributes(line)) - continue - - if section == 'hid_device_descriptor' and line.startswith(' '): - if report_descriptors_list: - hid_device_descriptor_list.append({'report_descriptors': report_descriptors_list}) - - report_descriptors_list = [] - hid_device_descriptor_list.append(_add_attributes(line)) - continue - - if section == 'cdc_header' and line.startswith(' '): - cdc_header_list.append(_add_attributes(line)) - continue - - if section == 'cdc_call_management' and line.startswith(' '): - cdc_call_management_list.append(_add_attributes(line)) - continue - - if section == 'cdc_acm' and line.startswith(' '): - cdc_acm_list.append(_add_attributes(line)) - continue - - if section == 'cdc_union' and line.startswith(' '): - cdc_union_list.append(_add_attributes(line)) - continue - - if section == 'endpoint_descriptor' and line.startswith(' '): - if hid_device_descriptor_list: - endpoint_descriptor_list.append({'hid_device_descriptor': hid_device_descriptor_list}) - - hid_device_descriptor_list = [] - endpoint_descriptor_list.append(_add_attributes(line)) - continue - - if section == 'interface_descriptor' and line.startswith(' '): - interface_descriptor_list.append(_add_attributes(line)) - - if cdc_header_list: - interface_descriptor_list.append({'cdc_header': cdc_header_list}) - - if cdc_call_management_list: - interface_descriptor_list.append({'cdc_call_management': cdc_call_management_list}) - - if cdc_acm_list: - interface_descriptor_list.append({'cdc_acm': cdc_acm_list}) - - if cdc_union_list: - interface_descriptor_list.append({'cdc_union': cdc_union_list}) - - if endpoint_descriptor_list: - interface_descriptor_list.append({'endpoint_descriptor': endpoint_descriptor_list}) - - if interface_descriptor_list: - if 'interface_descriptor_list' not in output_line['device_descriptor']['configuration_descriptor']: - output_line['device_descriptor']['configuration_descriptor']['interface_descriptor'] = [] - output_line['device_descriptor']['configuration_descriptor']['interface_descriptor'].append(interface_descriptor_list) - - cdc_header_list = [] - cdc_call_management_list = [] - cdc_acm_list = [] - cdc_union_list = [] - endpoint_descriptor_list = [] - continue - - if section == 'hub_descriptor' and line.startswith(' '): - hub_descriptor_list.append(_add_attributes(line)) - if 'hub_descriptor' not in output_line: - output_line['hub_descriptor'] = {} - output_line['hub_descriptor']['attributes'] = hub_descriptor_list - continue - - if section == 'hub_port_status' and line.startswith(' '): - hub_port_status_list.append(_add_attributes(line)) - output_line['hub_descriptor']['hub_port_status'] = hub_port_status_list - continue - + # get final list entries - if cdc_header_list: - interface_descriptor_list.append({'cdc_header': cdc_header_list}) + s._new_depth(-1) + s._populate_schema(final=True) - if cdc_call_management_list: - interface_descriptor_list.append({'cdc_call_management': cdc_call_management_list}) + # output the raw object + if s.output_line: + s.raw_output.append(s.output_line) - if cdc_acm_list: - interface_descriptor_list.append({'cdc_acm': cdc_acm_list}) - - if cdc_union_list: - interface_descriptor_list.append({'cdc_union': cdc_union_list}) - - if endpoint_descriptor_list: - interface_descriptor_list.append({'endpoint_descriptor': endpoint_descriptor_list}) - - if interface_descriptor_list: - if 'interface_descriptor_list' not in output_line['device_descriptor']['configuration_descriptor']: - output_line['device_descriptor']['configuration_descriptor']['interface_descriptor'] = [] - output_line['device_descriptor']['configuration_descriptor']['interface_descriptor'].append(interface_descriptor_list) - - if output_line: - raw_output.append(output_line) - - return raw_output if raw else _process(raw_output) + return s.raw_output if raw else _process(s.raw_output)