1
0
mirror of https://github.com/kellyjonbrazil/jc.git synced 2025-06-23 00:29:59 +02:00

fix lsusb for extra hub port info and add videocontrol and videostreaming sections

This commit is contained in:
Kelly Brazil
2023-01-04 18:27:51 -08:00
parent aeff94d272
commit 3a2ff61899
4 changed files with 4262 additions and 41 deletions

View File

@ -269,7 +269,7 @@ from jc.exceptions import ParseError
class info():
"""Provides parser metadata (version, author, etc.)"""
version = '1.2'
version = '1.3'
description = '`lsusb` command parser'
author = 'Kelly Brazil'
author_email = 'kellyjonbrazil@gmail.com'
@ -307,6 +307,54 @@ class _NestedDict(dict):
return self.setdefault(key, _NestedDict())
class _descriptor_obj:
def __init__(self, name):
self.name = name
self.list = []
def _entries_for_this_bus_and_interface_idx_exist(self, bus_idx, iface_idx):
"""Returns true if there are object entries for the corresponding bus index and interface index"""
for item in self.list:
keyname = tuple(item.keys())[0]
if '_state' in item[keyname] and item[keyname]['_state']['bus_idx'] == bus_idx and item[keyname]['_state']['interface_descriptor_idx'] == iface_idx:
return True
return False
def _get_objects_list(self, bus_idx, iface_idx):
"""Returns a list of descriptor object dictionaries for the corresponding bus index and interface index"""
object_collection = []
# find max number of items in this object that match the bus_idx and iface_idx
num_of_items = -1
for item in self.list:
keyname = tuple(item.keys())[0]
if '_state' in item[keyname] and item[keyname]['_state']['bus_idx'] == bus_idx and item[keyname]['_state']['interface_descriptor_idx'] == iface_idx:
num_of_items = item[keyname]['_state'][f'{self.name}_idx']
# create and return the collection of objects that match the bus_idx and iface_idx
if num_of_items > -1:
for obj_idx in range(num_of_items + 1):
this_object = {}
for item in self.list:
keyname = tuple(item.keys())[0]
if '_state' in item[keyname] and item[keyname]['_state']['bus_idx'] == bus_idx and item[keyname]['_state']['interface_descriptor_idx'] == iface_idx and item[keyname]['_state'][f'{self.name}_idx'] == obj_idx:
# is this a top level value or an attribute?
if item[keyname]['_state']['attribute_value']:
last_item = item[keyname]['_state']['last_item']
if 'attributes' not in this_object[last_item]:
this_object[last_item]['attributes'] = []
this_attribute = f'{keyname} {item[keyname].get("value", "")} {item[keyname].get("description", "")}'.strip()
this_object[last_item]['attributes'].append(this_attribute)
continue
this_object.update(item)
del item[keyname]['_state']
object_collection.append(this_object)
return object_collection
class _LsUsb():
def __init__(self):
self.raw_output = []
@ -314,9 +362,18 @@ class _LsUsb():
self.section = ''
self.old_section = ''
# section_header is formatted with the correct spacing to be used with
# jc.parsers.universal.sparse_table_parse(). Pad end of string to be at least len of 25
# this value changes for different sections (e.g. videocontrol & videostreaming)
self.normal_section_header = 'key val description'
self.larger_section_header = 'key val description'
self.bus_idx = -1
self.interface_descriptor_idx = -1
self.endpoint_descriptor_idx = -1
self.videocontrol_interface_descriptor_idx = -1
self.videostreaming_interface_descriptor_idx = -1
self.last_item = ''
self.last_indent = 0
self.attribute_value = False
@ -331,7 +388,9 @@ class _LsUsb():
self.cdc_call_management_list = []
self.cdc_acm_list = []
self.cdc_union_list = []
self.endpoint_descriptor_list = []
self.endpoint_descriptors = _descriptor_obj('endpoint_descriptor')
self.videocontrol_interface_descriptors = _descriptor_obj('videocontrol_interface_descriptor')
self.videostreaming_interface_descriptors = _descriptor_obj('videostreaming_interface_descriptor')
self.hid_device_descriptor_list = []
self.report_descriptors_list = []
self.hub_descriptor_list = []
@ -360,9 +419,9 @@ class _LsUsb():
else:
self.attribute_value = False
# section_header is formatted with the correct spacing to be used with
# jc.parsers.universal.sparse_table_parse(). Pad end of string to be at least len of 25
section_header = 'key val description'
section_header = self.normal_section_header
if self.section == 'videocontrol_interface_descriptor' or self.section == 'videostreaming_interface_descriptor':
section_header = self.larger_section_header
temp_obj = [section_header, line.strip() + (' ' * 25)]
temp_obj = sparse_table_parse(temp_obj)
@ -377,7 +436,9 @@ class _LsUsb():
'last_item': self.last_item,
'bus_idx': self.bus_idx,
'interface_descriptor_idx': self.interface_descriptor_idx,
'endpoint_descriptor_idx': self.endpoint_descriptor_idx
'endpoint_descriptor_idx': self.endpoint_descriptor_idx,
'videocontrol_interface_descriptor_idx': self.videocontrol_interface_descriptor_idx,
'videostreaming_interface_descriptor_idx': self.videostreaming_interface_descriptor_idx
}
}
}
@ -435,6 +496,8 @@ class _LsUsb():
self.bus_idx += 1
self.interface_descriptor_idx = -1
self.endpoint_descriptor_idx = -1
self.videocontrol_interface_descriptor_idx = -1
self.videostreaming_interface_descriptor_idx = -1
self.attribute_value = False
line_split = line.strip().split(maxsplit=6)
self.bus_list.append(
@ -465,6 +528,20 @@ class _LsUsb():
self.attribute_value = False
return True
# This section is a list, so need to update the index
if line.startswith(' VideoControl Interface Descriptor:'):
self.section = 'videocontrol_interface_descriptor'
self.videocontrol_interface_descriptor_idx += 1
self.attribute_value = False
return True
# This section is a list, so need to update the index
if line.startswith(' VideoStreaming Interface Descriptor:'):
self.section = 'videostreaming_interface_descriptor'
self.videostreaming_interface_descriptor_idx += 1
self.attribute_value = False
return True
# some device status information is displayed on the initial line so need to extract immediately
if line.startswith('Device Status:'):
self.section = 'device_status'
@ -517,7 +594,9 @@ class _LsUsb():
'cdc_union': self.cdc_union_list,
'hid_device_descriptor': self.hid_device_descriptor_list,
'report_descriptors': self.report_descriptors_list,
'endpoint_descriptor': self.endpoint_descriptor_list,
'videocontrol_interface_descriptor': self.videocontrol_interface_descriptors.list,
'videostreaming_interface_descriptor': self.videostreaming_interface_descriptors.list,
'endpoint_descriptor': self.endpoint_descriptors.list,
'hub_descriptor': self.hub_descriptor_list,
'device_qualifier': self.device_qualifier_list
}
@ -528,7 +607,7 @@ class _LsUsb():
return True
# special handling of these sections
if line.startswith(' ') and self.section == 'hub_port_status':
if line.startswith(' ') and not line.startswith(' ') and self.section == 'hub_port_status':
self.hub_port_status_list.append(self._add_hub_port_status_attributes(line))
return True
@ -547,6 +626,10 @@ class _LsUsb():
['device_descriptor']['configuration_descriptor']['interface_association'] = {}
['device_descriptor']['configuration_descriptor']['interface_descriptors'] = []
['device_descriptor']['configuration_descriptor']['interface_descriptors'][0] = {}
['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['videocontrol_interface_descriptors'] = []
['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['videocontrol_interface_descriptors'][0] = {}
['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['videostreaming_interface_descriptors'] = []
['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['videostreaming_interface_descriptors'][0] = {}
['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_header'] = {}
['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_call_management'] = {}
['device_descriptor']['configuration_descriptor']['interface_descriptors'][0]['cdc_acm'] = {}
@ -746,41 +829,26 @@ class _LsUsb():
# i_desc_obj['hid_device_descriptor']['report_descriptors'].update(rd)
# del i_desc_obj['hid_device_descriptor']['report_descriptors'][keyname]['_state']
# add endpoint_descriptor key if it doesn't exist and there are entries for this interface_descriptor
for endpoint_attrs in self.endpoint_descriptor_list:
keyname = tuple(endpoint_attrs.keys())[0]
if '_state' in endpoint_attrs[keyname] and endpoint_attrs[keyname]['_state']['bus_idx'] == idx and endpoint_attrs[keyname]['_state']['interface_descriptor_idx'] == iface_idx:
i_desc_obj['endpoint_descriptors'] = []
# add videocontrol_interface_descriptors key if it doesn't exist and there are entries for this interface_descriptor
if self.videocontrol_interface_descriptors._entries_for_this_bus_and_interface_idx_exist(idx, iface_idx):
i_desc_obj['videocontrol_interface_descriptors'] = []
i_desc_obj['videocontrol_interface_descriptors'].extend(
self.videocontrol_interface_descriptors._get_objects_list(idx, iface_idx)
)
# find max index for this endpoint_descriptor idx, then iterate over that range
e_desc_iters = -1
for endpoint_attrs in self.endpoint_descriptor_list:
keyname = tuple(endpoint_attrs.keys())[0]
if '_state' in endpoint_attrs[keyname] and endpoint_attrs[keyname]['_state']['bus_idx'] == idx and endpoint_attrs[keyname]['_state']['interface_descriptor_idx'] == iface_idx:
e_desc_iters = endpoint_attrs[keyname]['_state']['endpoint_descriptor_idx']
# add videostreaming_interface_descriptors key if it doesn't exist and there are entries for this interface_descriptor
if self.videostreaming_interface_descriptors._entries_for_this_bus_and_interface_idx_exist(idx, iface_idx):
i_desc_obj['videostreaming_interface_descriptors'] = []
i_desc_obj['videostreaming_interface_descriptors'].extend(
self.videostreaming_interface_descriptors._get_objects_list(idx, iface_idx)
)
# create the endpoint descriptor object
if e_desc_iters > -1:
for endpoint_idx in range(e_desc_iters + 1):
e_desc_obj = {}
for endpoint_attrs in self.endpoint_descriptor_list:
keyname = tuple(endpoint_attrs.keys())[0]
if '_state' in endpoint_attrs[keyname] and endpoint_attrs[keyname]['_state']['bus_idx'] == idx and endpoint_attrs[keyname]['_state']['interface_descriptor_idx'] == iface_idx and endpoint_attrs[keyname]['_state']['endpoint_descriptor_idx'] == endpoint_idx:
# is this a top level value or an attribute?
if endpoint_attrs[keyname]['_state']['attribute_value']:
last_item = endpoint_attrs[keyname]['_state']['last_item']
if 'attributes' not in e_desc_obj[last_item]:
e_desc_obj[last_item]['attributes'] = []
this_attribute = f'{keyname} {endpoint_attrs[keyname].get("value", "")} {endpoint_attrs[keyname].get("description", "")}'.strip()
e_desc_obj[last_item]['attributes'].append(this_attribute)
continue
e_desc_obj.update(endpoint_attrs)
del endpoint_attrs[keyname]['_state']
i_desc_obj['endpoint_descriptors'].append(e_desc_obj)
# add endpoint_descriptors key if it doesn't exist and there are entries for this interface_descriptor
if self.endpoint_descriptors._entries_for_this_bus_and_interface_idx_exist(idx, iface_idx):
i_desc_obj['endpoint_descriptors'] = []
i_desc_obj['endpoint_descriptors'].extend(
self.endpoint_descriptors._get_objects_list(idx, iface_idx)
)
# add the object to the list of interface descriptors
self.output_line['device_descriptor']['configuration_descriptor']['interface_descriptors'].append(i_desc_obj)

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

View File

@ -34,6 +34,9 @@ class MyTests(unittest.TestCase):
with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/generic/lsusb-binary-object-store.out'), 'r', encoding='utf-8') as f:
generic_lsusb_binary_object_store = f.read()
with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/generic/lsusb-extra-hub-port-status-info.out'), 'r', encoding='utf-8') as f:
generic_lsusb_extra_hub_port_status_info = f.read()
# output
with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/centos-7.7/lsusb.json'), 'r', encoding='utf-8') as f:
centos_7_7_lsusb_json = json.loads(f.read())
@ -56,6 +59,9 @@ class MyTests(unittest.TestCase):
with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/generic/lsusb-binary-object-store.json'), 'r', encoding='utf-8') as f:
generic_lsusb_binary_object_store_json = json.loads(f.read())
with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/generic/lsusb-extra-hub-port-status-info.json'), 'r', encoding='utf-8') as f:
generic_lsusb_extra_hub_port_status_info_json = json.loads(f.read())
def test_lsusb_nodata(self):
"""
@ -111,6 +117,12 @@ class MyTests(unittest.TestCase):
"""
self.assertEqual(jc.parsers.lsusb.parse(self.generic_lsusb_binary_object_store, quiet=True), self.generic_lsusb_binary_object_store_json)
def test_lsusb_extra_hub_port_status_info(self):
"""
Test 'lsusb -v' with extra information in the hub port status section
"""
self.assertEqual(jc.parsers.lsusb.parse(self.generic_lsusb_extra_hub_port_status_info, quiet=True), self.generic_lsusb_extra_hub_port_status_info_json)
if __name__ == '__main__':
unittest.main()