1
0
mirror of https://github.com/kellyjonbrazil/jc.git synced 2025-06-23 00:29:59 +02:00

now storing state within the objects to make building the schema (later) easier.

This commit is contained in:
Kelly Brazil
2021-10-09 20:05:55 -07:00
parent 6d4a469127
commit 9ad0cd9dae

View File

@ -35,6 +35,7 @@ Examples:
""" """
import jc.utils import jc.utils
from jc.parsers.universal import sparse_table_parse from jc.parsers.universal import sparse_table_parse
from rich import print
class info(): class info():
@ -77,14 +78,17 @@ class _LsUsb():
def __init__(self): def __init__(self):
self.raw_output = [] self.raw_output = []
self.output_line = {} self.output_line = {}
self.section = '' self.section = ''
self.old_section = '' self.bus_idx = -1
self.section_depth = 0 self.interface_descriptor_idx = -1
self.old_section_depth = 0 self.endpoint_descriptor_idx = -1
self.device_descriptor_list = [] self.device_descriptor_list = []
self.configuration_descriptor_list = [] self.configuration_descriptor_list = []
self.interface_association_list = [] self.interface_association_list = []
self.interface_descriptor_list = [] self.interface_descriptor_list = []
self.interface_descriptor_attribute_list = []
self.cdc_header_list = [] self.cdc_header_list = []
self.cdc_call_management_list = [] self.cdc_call_management_list = []
self.cdc_acm_list = [] self.cdc_acm_list = []
@ -94,6 +98,7 @@ class _LsUsb():
self.report_descriptors_list = [] self.report_descriptors_list = []
self.hub_descriptor_list = [] self.hub_descriptor_list = []
self.hub_port_status_list = [] self.hub_port_status_list = []
self.device_status_list = []
@staticmethod @staticmethod
def _count_indent(line): def _count_indent(line):
@ -112,306 +117,202 @@ class _LsUsb():
# jc.parsers.universal.sparse_table_parse(). Pad end of string to be at least len of 25 # jc.parsers.universal.sparse_table_parse(). Pad end of string to be at least len of 25
section_header = 'key val description' section_header = 'key val description'
line_obj = [section_header, line.strip() + (' ' * 25)] temp_obj = [section_header, line.strip() + (' ' * 25)]
line_obj = sparse_table_parse(line_obj) temp_obj = sparse_table_parse(temp_obj)
# line_obj[0].update({'indent': indent, 'depth': self.section_depth}) temp_obj = temp_obj[0]
return line_obj[0] line_obj = {
temp_obj['key']: {
'value': temp_obj['val'],
'description': temp_obj['description'],
'indent': indent,
'bus_idx': self.bus_idx,
'interface_descriptor_idx': self.interface_descriptor_idx,
'endpoint_descriptor_idx': self.endpoint_descriptor_idx
}
}
def _new_section(self, new_section): return line_obj
self.old_section = self.section
self.section = new_section
def _new_depth(self, new_depth): def _populate_schema(self):
self.old_section_depth = self.section_depth
self.section_depth = new_depth
def _populate_schema(self, final=False):
""" """
Append list entries and reset lists on section change Schema:
Depth assignments: ['bus'] = {}
['bus']['device_descriptor'] = {}
Bus 0 ['bus']['device_descriptor']['configuration_descriptor'] = {}
Device Descriptor: 2 ['bus']['device_descriptor']['configuration_descriptor']['interface_association'] = {}
<attrs> 3 ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptor'] = {}
Configuration Descriptor: 4 ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptors'] = []
<attrs> 5 ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['attributes'] = {}
Interface Association: 6 ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_header'] = {}
<attrs> 7 ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_call_management'] = {}
Interface Descriptor: 6 ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_adcm'] = {}
<attrs> 7 ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_union'] = {}
CDC Header: 8 ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['hid_device_descriptor'] = {}
<attrs> 9 ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['hid_device_descriptor']['report_descriptors'] = {}
CDC Call Management: 8 ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['endpoint_descriptors'] = []
<attrs> 9 ['bus']['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['endpoint_descriptors'][0]['attributes'] = {}
CDC ACM: 8 ['bus']['hub_descriptor'] = {}
<attrs> 9 ['bus']['hub_descriptor']['hub_port_status'] = {}
CDC Union: 8 ['bus']['device_status'] = {}
<attrs> 9
HID Device Descriptor: 10
<attrs> 11
Report Descriptors: 12
<attrs> 13
Endpoint Descriptor: 8
<attrs> 9
Hub Descriptor: 2
<attrs> 3
Hub Port Status: 4
<attrs> 5
Device Status: 2
<attrs> 3
""" """
if (self.section != self.old_section and self.section_depth < self.old_section_depth) or final: pass
# in decending depth order # if self.output_line:
# self.raw_output.append(self.output_line)
if self.report_descriptors_list: # self.output_line = {}
self.report_descriptors_list.append({'report_descriptor': self.report_descriptors_list}) # # self._reset_lists()
self.report_descriptors_list = [] # self.bus_idx += 1
if not final:
return
if self.hid_device_descriptor_list: # line_split = line.strip().split(maxsplit=6)
self.endpoint_descriptor_list.append({'hid_device_descriptor': self.hid_device_descriptor_list}) # self.output_line.update(
self.hid_device_descriptor_list = [] # {
if not final: # 'bus': line_split[1],
return # 'device': line_split[3][:-1],
# 'id': line_split[5],
# 'description': (line_split[6:7] or [None])[0] # way to get a list item or None
# }
# )
if self.cdc_header_list: # line_split = line.strip().split(':', maxsplit=1)
self.interface_descriptor_list.append({'cdc_header': self.cdc_header_list}) # self.output_line.update(
self.cdc_header_list = [] # {
# 'device_status':
if self.cdc_call_management_list: # {
self.interface_descriptor_list.append({'cdc_call_management': self.cdc_call_management_list}) # 'value': line_split[1].strip()
self.cdc_call_management_list = [] # }
# }
if self.cdc_acm_list: # )
self.interface_descriptor_list.append({'cdc_acm': self.cdc_acm_list})
self.cdc_acm_list = []
if self.cdc_union_list:
self.interface_descriptor_list.append({'cdc_union': self.cdc_union_list})
self.cdc_union_list = []
if self.endpoint_descriptor_list:
self.interface_descriptor_list.append({'endpoint_descriptor': self.endpoint_descriptor_list})
self.endpoint_descriptor_list = []
if not final:
return
if self.interface_association_list:
self.output_line['device_descriptor']['configuration_descriptor']['interface_association']['attributes'].append(self.interface_association_list)
self.interface_association_list = []
if self.interface_descriptor_list:
if 'interface_descriptor_list' not in self.output_line['device_descriptor']['configuration_descriptor']:
self.output_line['device_descriptor']['configuration_descriptor']['interface_descriptor'] = []
self.output_line['device_descriptor']['configuration_descriptor']['interface_descriptor'].append(self.interface_descriptor_list)
if not final:
return
if self.configuration_descriptor_list:
self.output_line['device_descriptor']['configuration_descriptor']['attributes'].append(self.configuration_descriptor_list)
self.configuration_descriptor_list = []
if self.hub_port_status_list:
if 'hub_descriptor' not in self.output_line:
self.output_line['hub_descriptor'] = {}
self.output_line['hub_descriptor']['hub_port_status'] = []
self.output_line['hub_descriptor']['hub_port_status'].append(self.hub_port_status_list)
self.hub_port_status_list = []
if not final:
return
if self.device_descriptor_list:
self.output_line['device_descriptor']['attributes'].append(self.device_descriptor_list)
self.device_descriptor_list = []
if self.hub_descriptor_list:
if 'hub_descriptor' not in self.output_line:
self.output_line['hub_descriptor'] = {}
if 'attributes' not in self.output_line['hub_descriptor']:
self.output_line['hub_descriptor']['attributes'] = []
self.output_line['hub_descriptor']['attributes'].append(self.hub_descriptor_list)
self.hub_descriptor_list = []
if not final:
return
def _set_sections(self, line): def _set_sections(self, line):
# ignore blank lines
if not line:
self.section = ''
return True
if line.startswith('Bus '): if line.startswith('Bus '):
self._new_section('bus') self.section = 'bus'
self._new_depth(0) self.bus_idx += 1
self.interface_descriptor_idx = -1
if self.output_line: self.endpoint_descriptor_idx = -1
self.raw_output.append(self.output_line)
self.output_line = {}
line_split = line.strip().split(maxsplit=6)
self.output_line.update(
{
'bus': line_split[1],
'device': line_split[3][:-1],
'id': line_split[5],
'description': (line_split[6:7] or [None])[0] # way to get a list item or None
}
)
return True return True
if line.startswith('Device Descriptor:'): if line.startswith('Device Descriptor:'):
self._new_section('device_descriptor') self.section = 'device_descriptor'
self._new_depth(2)
return True return True
if line.startswith(' Configuration Descriptor:'): if line.startswith(' Configuration Descriptor:'):
self._new_section('configuration_descriptor') self.section = 'configuration_descriptor'
self._new_depth(4)
return True return True
if line.startswith(' Interface Association:'): if line.startswith(' Interface Association:'):
self._new_section('interface_association') self.section = 'interface_association'
self._new_depth(6)
return True return True
if line.startswith(' Interface Descriptor:'): if line.startswith(' Interface Descriptor:'):
self._new_section('interface_descriptor') self.section = 'interface_descriptor'
self._new_depth(6) self.interface_descriptor_idx += 1
self.endpoint_descriptor_idx = -1
return True return True
if line.startswith(' CDC Header:'): if line.startswith(' CDC Header:'):
self._new_section('cdc_header') self.section = 'cdc_header'
self._new_depth(8)
return True return True
if line.startswith(' CDC Call Management:'): if line.startswith(' CDC Call Management:'):
self._new_section('cdc_call_management') self.section = 'cdc_call_management'
self._new_depth(8)
return True return True
if line.startswith(' CDC ACM:'): if line.startswith(' CDC ACM:'):
self._new_section('cdc_acm') self.section = 'cdc_acm'
self._new_depth(8)
return True return True
if line.startswith(' CDC Union:'): if line.startswith(' CDC Union:'):
self._new_section('cdc_union') self.section = 'cdc_union'
self._new_depth(8)
return True return True
if line.startswith(' Endpoint Descriptor:'): if line.startswith(' Endpoint Descriptor:'):
self._new_section('endpoint_descriptor') self.section = 'endpoint_descriptor'
self._new_depth(8) self.endpoint_descriptor_idx += 1
return True return True
if line.startswith(' HID Device Descriptor:'): if line.startswith(' HID Device Descriptor:'):
self._new_section('hid_device_descriptor') self.section = 'hid_device_descriptor'
self._new_depth(10)
return True return True
if line.startswith(' Report Descriptors:'): if line.startswith(' Report Descriptors:'):
self._new_section('report_descriptors') self.section = 'report_descriptors'
self._new_depth(12)
return True return True
if line.startswith('Hub Descriptor:'): if line.startswith('Hub Descriptor:'):
self._new_section('hub_descriptor') self.section = 'hub_descriptor'
self._new_depth(2)
return True return True
if line.startswith(' Hub Port Status:'): if line.startswith(' Hub Port Status:'):
self._new_section('hub_port_status') self.section = 'hub_port_status'
self._new_depth(4)
return True return True
if line.startswith('Device Status:'): if line.startswith('Device Status:'):
self._new_section('device_status') self.section = 'device_status'
self._new_depth(2)
line_split = line.strip().split(':', maxsplit=1)
self.output_line.update(
{
'device_status':
{
'value': line_split[1].strip()
}
}
)
return True return True
def _populate_lists(self, line): def _populate_lists(self, line):
if self.section == 'device_descriptor' and line.startswith(' '): if self.section == 'device_descriptor' and line.startswith(' '):
self._new_depth(3)
self.device_descriptor_list.append(self._add_attributes(line)) self.device_descriptor_list.append(self._add_attributes(line))
if 'device_descriptor' not in self.output_line:
self.output_line['device_descriptor'] = {}
self.output_line['device_descriptor']['attributes'] = []
return True return True
if self.section == 'configuration_descriptor' and line.startswith(' '): if self.section == 'configuration_descriptor' and line.startswith(' '):
self._new_depth(5)
self.configuration_descriptor_list.append(self._add_attributes(line)) self.configuration_descriptor_list.append(self._add_attributes(line))
if 'configuration_descriptor' not in self.output_line['device_descriptor']:
self.output_line['device_descriptor']['configuration_descriptor'] = {}
self.output_line['device_descriptor']['configuration_descriptor']['attributes'] = []
return True return True
if self.section == 'interface_association' and line.startswith(' '): if self.section == 'interface_association' and line.startswith(' '):
self._new_depth(7)
self.interface_association_list.append(self._add_attributes(line)) self.interface_association_list.append(self._add_attributes(line))
if 'interface_association' not in self.output_line['device_descriptor']['configuration_descriptor']:
self.output_line['device_descriptor']['configuration_descriptor']['interface_association'] = {}
self.output_line['device_descriptor']['configuration_descriptor']['interface_association']['attributes'] = []
return True return True
if self.section == 'interface_descriptor' and line.startswith(' '): if self.section == 'interface_descriptor' and line.startswith(' '):
self._new_depth(7)
self.interface_descriptor_list.append(self._add_attributes(line)) self.interface_descriptor_list.append(self._add_attributes(line))
return True return True
if self.section == 'cdc_header' and line.startswith(' '): if self.section == 'cdc_header' and line.startswith(' '):
self._new_depth(9)
self.cdc_header_list.append(self._add_attributes(line)) self.cdc_header_list.append(self._add_attributes(line))
return True return True
if self.section == 'cdc_call_management' and line.startswith(' '): if self.section == 'cdc_call_management' and line.startswith(' '):
self._new_depth(9)
self.cdc_call_management_list.append(self._add_attributes(line)) self.cdc_call_management_list.append(self._add_attributes(line))
return True return True
if self.section == 'cdc_acm' and line.startswith(' '): if self.section == 'cdc_acm' and line.startswith(' '):
self._new_depth(9)
self.cdc_acm_list.append(self._add_attributes(line)) self.cdc_acm_list.append(self._add_attributes(line))
return True return True
if self.section == 'cdc_union' and line.startswith(' '): if self.section == 'cdc_union' and line.startswith(' '):
self._new_depth(9)
self.cdc_union_list.append(self._add_attributes(line)) self.cdc_union_list.append(self._add_attributes(line))
return True return True
if self.section == 'hid_device_descriptor' and line.startswith(' '): if self.section == 'hid_device_descriptor' and line.startswith(' '):
self._new_depth(11)
self.report_descriptors_list.append(self._add_attributes(line)) self.report_descriptors_list.append(self._add_attributes(line))
return True return True
if self.section == 'report_descriptors' and line.startswith(' '): if self.section == 'report_descriptors' and line.startswith(' '):
self._new_depth(13)
self.report_descriptors_list.append(self._add_attributes(line)) self.report_descriptors_list.append(self._add_attributes(line))
return True return True
if self.section == 'endpoint_descriptor' and line.startswith(' '): if self.section == 'endpoint_descriptor' and line.startswith(' '):
self._new_depth(9)
self.endpoint_descriptor_list.append(self._add_attributes(line)) self.endpoint_descriptor_list.append(self._add_attributes(line))
return True return True
if self.section == 'hub_descriptor' and line.startswith(' '): if self.section == 'hub_descriptor' and line.startswith(' '):
self._new_depth(3)
self.hub_descriptor_list.append(self._add_attributes(line)) self.hub_descriptor_list.append(self._add_attributes(line))
return True return True
if self.section == 'hub_port_status' and line.startswith(' '): if self.section == 'hub_port_status' and line.startswith(' '):
self._new_depth(5)
self.hub_port_status_list.append(self._add_attributes(line)) self.hub_port_status_list.append(self._add_attributes(line))
return True return True
if self.section == 'device_status' and line.startswith(' '):
self.device_status_list.append(self._add_attributes(line))
return True
def parse(data, raw=False, quiet=False): def parse(data, raw=False, quiet=False):
""" """
@ -434,15 +335,23 @@ def parse(data, raw=False, quiet=False):
s = _LsUsb() s = _LsUsb()
for line in data.splitlines(): for line in data.splitlines():
print(f'section: {s.section}, depth: {s.section_depth}') print(f'''
# populate schema when backing out of nodes {s.section=}
s._populate_schema() {s.device_descriptor_list=}
{s.configuration_descriptor_list=}
# ignore blank lines {s.interface_association_list=}
if not line: {s.interface_descriptor_list=}
s._new_section('') {s.cdc_header_list=}
s._new_depth(0) {s.cdc_call_management_list=}
continue {s.cdc_acm_list=}
{s.cdc_union_list=}
{s.endpoint_descriptor_list=}
{s.hid_device_descriptor_list=}
{s.report_descriptors_list=}
{s.hub_descriptor_list=}
{s.hub_port_status_list=}
{s.device_status_list=}
''')
# sections # sections
if s._set_sections(line): if s._set_sections(line):
@ -451,10 +360,6 @@ def parse(data, raw=False, quiet=False):
# create section lists and schema # create section lists and schema
if s._populate_lists(line): if s._populate_lists(line):
continue continue
# get final list entries
s._new_depth(-1)
s._populate_schema(final=True)
# output the raw object # output the raw object
if s.output_line: if s.output_line: