1
0
mirror of https://github.com/kellyjonbrazil/jc.git synced 2025-06-19 00:17:51 +02:00

add nested_dict. start work on hub_port_status and device_status

This commit is contained in:
Kelly Brazil
2021-10-15 11:43:15 -07:00
parent 6078a411ef
commit 2dbe56456b

View File

@ -92,12 +92,10 @@ Schema:
} }
}, },
"device_status": { "device_status": {
"attribute": {
"value": string, "value": string,
"description": string "description": string
} }
} }
}
] ]
Examples: Examples:
@ -149,10 +147,19 @@ def _process(proc_data):
return proc_data return proc_data
class _NestedDict(dict):
# for ease of creating/updating nested dictionary structures
# https://stackoverflow.com/questions/5369723/multi-level-defaultdict-with-variable-depth
# https://ohuiginn.net/mt/2010/07/nested_dictionaries_in_python.html
def __getitem__(self, key):
if key in self: return self.get(key)
return self.setdefault(key, _NestedDict())
class _LsUsb(): class _LsUsb():
def __init__(self): def __init__(self):
self.raw_output = [] self.raw_output = []
self.output_line = {} self.output_line = _NestedDict()
self.section = '' self.section = ''
self.bus_idx = -1 self.bus_idx = -1
@ -218,6 +225,25 @@ class _LsUsb():
return line_obj return line_obj
def _add_hub_port_status_attributes(self, line):
return self._add_attributes(line)
# indent = self._count_indent(line)
# line_obj = {
# temp_obj['key']: {
# 'value': temp_obj['val'],
# 'description': temp_obj['description'],
# '_state': {
# 'indent': indent,
# 'bus_idx': self.bus_idx,
# 'interface_descriptor_idx': self.interface_descriptor_idx,
# 'endpoint_descriptor_idx': self.endpoint_descriptor_idx
# }
# }
# }
def _add_device_status_attributes(self, line):
return self._add_attributes(line)
def _set_sections(self, line): def _set_sections(self, line):
# ignore blank lines # ignore blank lines
if not line: if not line:
@ -308,7 +334,6 @@ class _LsUsb():
'report_descriptors': self.report_descriptors_list, 'report_descriptors': self.report_descriptors_list,
'endpoint_descriptor': self.endpoint_descriptor_list, 'endpoint_descriptor': self.endpoint_descriptor_list,
'hub_descriptor': self.hub_descriptor_list, 'hub_descriptor': self.hub_descriptor_list,
'hub_port_status': self.hub_port_status_list,
'device_status': self.device_status_list 'device_status': self.device_status_list
} }
@ -317,6 +342,15 @@ class _LsUsb():
section_list_map[self.section].append(self._add_attributes(line)) section_list_map[self.section].append(self._add_attributes(line))
return True return True
# special handling of these sections
if line.startswith(' ') and self.section == 'hub_port_status':
self.hub_port_status_list.append(self._add_hub_port_status_attributes(line))
return True
if line.startswith(' ') and self.section == 'device_status':
self.device_status_list.append(self._add_device_status_attributes(line))
return True
def _populate_schema(self): def _populate_schema(self):
""" """
Schema: Schema:
@ -342,7 +376,7 @@ class _LsUsb():
if self.output_line: if self.output_line:
self.raw_output.append(self.output_line) self.raw_output.append(self.output_line)
self.output_line = {} self.output_line = _NestedDict()
del item['_state'] del item['_state']
self.output_line.update(item) self.output_line.update(item)
@ -350,24 +384,18 @@ class _LsUsb():
for dd in self.device_descriptor_list: for dd in self.device_descriptor_list:
keyname = tuple(dd.keys())[0] keyname = tuple(dd.keys())[0]
if '_state' in dd[keyname] and dd[keyname]['_state']['bus_idx'] == idx: if '_state' in dd[keyname] and dd[keyname]['_state']['bus_idx'] == idx:
if 'device_descriptor' not in self.output_line:
self.output_line['device_descriptor'] = {}
self.output_line['device_descriptor'].update(dd) self.output_line['device_descriptor'].update(dd)
del self.output_line['device_descriptor'][keyname]['_state'] del self.output_line['device_descriptor'][keyname]['_state']
for cd in self.configuration_descriptor_list: for cd in self.configuration_descriptor_list:
keyname = tuple(cd.keys())[0] keyname = tuple(cd.keys())[0]
if '_state' in cd[keyname] and cd[keyname]['_state']['bus_idx'] == idx: if '_state' in cd[keyname] and cd[keyname]['_state']['bus_idx'] == idx:
if 'configuration_descriptor' not in self.output_line['device_descriptor']:
self.output_line['device_descriptor']['configuration_descriptor'] = {}
self.output_line['device_descriptor']['configuration_descriptor'].update(cd) self.output_line['device_descriptor']['configuration_descriptor'].update(cd)
del self.output_line['device_descriptor']['configuration_descriptor'][keyname]['_state'] del self.output_line['device_descriptor']['configuration_descriptor'][keyname]['_state']
for ia in self.interface_association_list: for ia in self.interface_association_list:
keyname = tuple(ia.keys())[0] keyname = tuple(ia.keys())[0]
if '_state' in ia[keyname] and ia[keyname]['_state']['bus_idx'] == idx: if '_state' in ia[keyname] and ia[keyname]['_state']['bus_idx'] == idx:
if 'interface_association' not in self.output_line['device_descriptor']['configuration_descriptor']:
self.output_line['device_descriptor']['configuration_descriptor']['interface_association'] = {}
self.output_line['device_descriptor']['configuration_descriptor']['interface_association'].update(ia) self.output_line['device_descriptor']['configuration_descriptor']['interface_association'].update(ia)
del self.output_line['device_descriptor']['configuration_descriptor']['interface_association'][keyname]['_state'] del self.output_line['device_descriptor']['configuration_descriptor']['interface_association'][keyname]['_state']
@ -388,7 +416,7 @@ class _LsUsb():
# create the interface descriptor object # create the interface descriptor object
if i_desc_iters > -1: if i_desc_iters > -1:
for iface_idx in range(i_desc_iters + 1): for iface_idx in range(i_desc_iters + 1):
i_desc_obj = {} i_desc_obj = _NestedDict()
for iface_attrs in self.interface_descriptor_list: for iface_attrs in self.interface_descriptor_list:
keyname = tuple(iface_attrs.keys())[0] keyname = tuple(iface_attrs.keys())[0]
if '_state' in iface_attrs[keyname] and iface_attrs[keyname]['_state']['bus_idx'] == idx and iface_attrs[keyname]['_state']['interface_descriptor_idx'] == iface_idx: if '_state' in iface_attrs[keyname] and iface_attrs[keyname]['_state']['bus_idx'] == idx and iface_attrs[keyname]['_state']['interface_descriptor_idx'] == iface_idx:
@ -399,48 +427,36 @@ class _LsUsb():
for ch in self.cdc_header_list: for ch in self.cdc_header_list:
keyname = tuple(ch.keys())[0] keyname = tuple(ch.keys())[0]
if '_state' in ch[keyname] and ch[keyname]['_state']['bus_idx'] == idx and ch[keyname]['_state']['interface_descriptor_idx'] == iface_idx: if '_state' in ch[keyname] and ch[keyname]['_state']['bus_idx'] == idx and ch[keyname]['_state']['interface_descriptor_idx'] == iface_idx:
if 'cdc_header' not in i_desc_obj:
i_desc_obj['cdc_header'] = {}
i_desc_obj['cdc_header'].update(ch) i_desc_obj['cdc_header'].update(ch)
del i_desc_obj['cdc_header'][keyname]['_state'] del i_desc_obj['cdc_header'][keyname]['_state']
for ccm in self.cdc_call_management_list: for ccm in self.cdc_call_management_list:
keyname = tuple(ccm.keys())[0] keyname = tuple(ccm.keys())[0]
if '_state' in ccm[keyname] and ccm[keyname]['_state']['bus_idx'] == idx and ccm[keyname]['_state']['interface_descriptor_idx'] == iface_idx: if '_state' in ccm[keyname] and ccm[keyname]['_state']['bus_idx'] == idx and ccm[keyname]['_state']['interface_descriptor_idx'] == iface_idx:
if 'cdc_call_management' not in i_desc_obj:
i_desc_obj['cdc_call_management'] = {}
i_desc_obj['cdc_call_management'].update(ccm) i_desc_obj['cdc_call_management'].update(ccm)
del i_desc_obj['cdc_call_management'][keyname]['_state'] del i_desc_obj['cdc_call_management'][keyname]['_state']
for ca in self.cdc_acm_list: for ca in self.cdc_acm_list:
keyname = tuple(ca.keys())[0] keyname = tuple(ca.keys())[0]
if '_state' in ca[keyname] and ca[keyname]['_state']['bus_idx'] == idx and ca[keyname]['_state']['interface_descriptor_idx'] == iface_idx: if '_state' in ca[keyname] and ca[keyname]['_state']['bus_idx'] == idx and ca[keyname]['_state']['interface_descriptor_idx'] == iface_idx:
if 'cdc_acm' not in i_desc_obj:
i_desc_obj['cdc_acm'] = {}
i_desc_obj['cdc_acm'].update(ca) i_desc_obj['cdc_acm'].update(ca)
del i_desc_obj['cdc_acm'][keyname]['_state'] del i_desc_obj['cdc_acm'][keyname]['_state']
for cu in self.cdc_union_list: for cu in self.cdc_union_list:
keyname = tuple(cu.keys())[0] keyname = tuple(cu.keys())[0]
if '_state' in cu[keyname] and cu[keyname]['_state']['bus_idx'] == idx and cu[keyname]['_state']['interface_descriptor_idx'] == iface_idx: if '_state' in cu[keyname] and cu[keyname]['_state']['bus_idx'] == idx and cu[keyname]['_state']['interface_descriptor_idx'] == iface_idx:
if 'cdc_union' not in i_desc_obj:
i_desc_obj['cdc_union'] = {}
i_desc_obj['cdc_union'].update(cu) i_desc_obj['cdc_union'].update(cu)
del i_desc_obj['cdc_union'][keyname]['_state'] del i_desc_obj['cdc_union'][keyname]['_state']
for hd in self.hid_device_descriptor_list: for hd in self.hid_device_descriptor_list:
keyname = tuple(hd.keys())[0] keyname = tuple(hd.keys())[0]
if '_state' in hd[keyname] and hd[keyname]['_state']['bus_idx'] == idx and hd[keyname]['_state']['interface_descriptor_idx'] == iface_idx: if '_state' in hd[keyname] and hd[keyname]['_state']['bus_idx'] == idx and hd[keyname]['_state']['interface_descriptor_idx'] == iface_idx:
if 'hid_device_descriptor' not in i_desc_obj:
i_desc_obj['hid_device_descriptor'] = {}
i_desc_obj['hid_device_descriptor'].update(hd) i_desc_obj['hid_device_descriptor'].update(hd)
del i_desc_obj['hid_device_descriptor'][keyname]['_state'] del i_desc_obj['hid_device_descriptor'][keyname]['_state']
for rd in self.report_descriptors_list: for rd in self.report_descriptors_list:
keyname = tuple(rd.keys())[0] keyname = tuple(rd.keys())[0]
if '_state' in rd[keyname] and rd[keyname]['_state']['bus_idx'] == idx and rd[keyname]['_state']['interface_descriptor_idx'] == iface_idx: if '_state' in rd[keyname] and rd[keyname]['_state']['bus_idx'] == idx and rd[keyname]['_state']['interface_descriptor_idx'] == iface_idx:
if 'report_descriptors' not in i_desc_obj['hid_device_descriptor']:
i_desc_obj['hid_device_descriptor']['report_descriptors'] = {}
i_desc_obj['hid_device_descriptor']['report_descriptors'].update(rd) i_desc_obj['hid_device_descriptor']['report_descriptors'].update(rd)
del i_desc_obj['hid_device_descriptor']['report_descriptors'][keyname]['_state'] del i_desc_obj['hid_device_descriptor']['report_descriptors'][keyname]['_state']
@ -476,24 +492,18 @@ class _LsUsb():
for hd in self.hub_descriptor_list: for hd in self.hub_descriptor_list:
keyname = tuple(hd.keys())[0] keyname = tuple(hd.keys())[0]
if '_state' in hd[keyname] and hd[keyname]['_state']['bus_idx'] == idx: if '_state' in hd[keyname] and hd[keyname]['_state']['bus_idx'] == idx:
if 'hub_descriptor' not in self.output_line:
self.output_line['hub_descriptor'] = {}
self.output_line['hub_descriptor'].update(hd) self.output_line['hub_descriptor'].update(hd)
del self.output_line['hub_descriptor'][keyname]['_state'] del self.output_line['hub_descriptor'][keyname]['_state']
for hps in self.hub_port_status_list: for hps in self.hub_port_status_list:
keyname = tuple(hps.keys())[0] keyname = tuple(hps.keys())[0]
if '_state' in hps[keyname] and hps[keyname]['_state']['bus_idx'] == idx: if '_state' in hps[keyname] and hps[keyname]['_state']['bus_idx'] == idx:
if 'hub_port_status' not in self.output_line['hub_descriptor']:
self.output_line['hub_descriptor']['hub_port_status'] = {}
self.output_line['hub_descriptor']['hub_port_status'].update(hps) self.output_line['hub_descriptor']['hub_port_status'].update(hps)
del self.output_line['hub_descriptor']['hub_port_status'][keyname]['_state'] del self.output_line['hub_descriptor']['hub_port_status'][keyname]['_state']
for ds in self.device_status_list: for ds in self.device_status_list:
keyname = tuple(ds.keys())[0] keyname = tuple(ds.keys())[0]
if '_state' in ds[keyname] and ds[keyname]['_state']['bus_idx'] == idx: if '_state' in ds[keyname] and ds[keyname]['_state']['bus_idx'] == idx:
if 'device_status' not in self.output_line:
self.output_line['device_status'] = {}
self.output_line['device_status'].update(ds) self.output_line['device_status'].update(ds)
del self.output_line['device_status'][keyname]['_state'] del self.output_line['device_status'][keyname]['_state']
@ -527,6 +537,13 @@ def parse(data, raw=False, quiet=False):
if s._populate_lists(line): if s._populate_lists(line):
continue continue
# populate the schema
s._populate_schema()
# output the raw object
if s.output_line:
s.raw_output.append(s.output_line)
# print(f''' # print(f'''
# {s.section=} # {s.section=}
# {s.bus_list=} # {s.bus_list=}
@ -546,11 +563,4 @@ def parse(data, raw=False, quiet=False):
# {s.device_status_list=} # {s.device_status_list=}
# ''') # ''')
# populate the schema
s._populate_schema()
# output the raw object
if s.output_line:
s.raw_output.append(s.output_line)
return s.raw_output if raw else _process(s.raw_output) return s.raw_output if raw else _process(s.raw_output)