diff --git a/CHANGELOG b/CHANGELOG index b907812b..26b97fc0 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -3,6 +3,8 @@ jc changelog 20230103 v1.22.5 - Add TOML parser - Update copyright dates +- Fix `lsusb` parser when extra hub port status information is output +- Refactor `lsusb` parser for more code reuse - Fix INI parser to include top-level values with no section header - Fix INI and Key/Value parsers to only remove one quotation mark from the beginning and end of values. diff --git a/jc/parsers/lsusb.py b/jc/parsers/lsusb.py index 65e6a675..a35a8339 100644 --- a/jc/parsers/lsusb.py +++ b/jc/parsers/lsusb.py @@ -321,7 +321,7 @@ class _root_obj: return False def _update_output(self, bus_idx, output_line): - """modifies output_line dictionary for the corresponding bus index and interface index. + """modifies output_line dictionary for the corresponding bus index. output_line is the self.output_line attribute from the _lsusb object.""" for item in self.list: keyname = tuple(item.keys())[0] @@ -354,6 +354,40 @@ class _descriptor_obj: return True return False + def _update_output(self, bus_idx, iface_idx, output_line): + """modifies output_line dictionary for the corresponding bus index and interface index. + output_line is the i_desc_obj object.""" + for item in self.list: + keyname = tuple(item.keys())[0] + if '_state' in item[keyname] and item[keyname]['_state']['bus_idx'] == bus_idx and item[keyname]['_state']['interface_descriptor_idx'] == iface_idx: + + # is this a top level value or an attribute? + if item[keyname]['_state']['attribute_value']: + last_item = item[keyname]['_state']['last_item'] + if 'attributes' not in output_line[f'{self.name}'][last_item]: + output_line[f'{self.name}'][last_item]['attributes'] = [] + + this_attribute = f'{keyname} {item[keyname].get("value", "")} {item[keyname].get("description", "")}'.strip() + output_line[f'{self.name}'][last_item]['attributes'].append(this_attribute) + continue + + output_line[f'{self.name}'].update(item) + del output_line[f'{self.name}'][keyname]['_state'] + + +class _descriptor_list: + def __init__(self, name): + self.name = name + self.list = [] + + def _entries_for_this_bus_and_interface_idx_exist(self, bus_idx, iface_idx): + """Returns true if there are object entries for the corresponding bus index and interface index""" + for item in self.list: + keyname = tuple(item.keys())[0] + if '_state' in item[keyname] and item[keyname]['_state']['bus_idx'] == bus_idx and item[keyname]['_state']['interface_descriptor_idx'] == iface_idx: + return True + return False + def _get_objects_list(self, bus_idx, iface_idx): """Returns a list of descriptor object dictionaries for the corresponding bus index and interface index""" object_collection = [] @@ -418,14 +452,14 @@ class _LsUsb(): self.interface_association = _root_obj('interface_association') self.interface_descriptor_list = [] self.interface_descriptor_attribute_list = [] - self.cdc_header_list = [] - self.cdc_call_management_list = [] - self.cdc_acm_list = [] - self.cdc_union_list = [] - self.endpoint_descriptors = _descriptor_obj('endpoint_descriptor') - self.videocontrol_interface_descriptors = _descriptor_obj('videocontrol_interface_descriptor') - self.videostreaming_interface_descriptors = _descriptor_obj('videostreaming_interface_descriptor') - self.hid_device_descriptor_list = [] + self.cdc_header = _descriptor_obj('cdc_header') + self.cdc_call_management = _descriptor_obj('cdc_call_management') + self.cdc_acm = _descriptor_obj('cdc_acm') + self.cdc_union = _descriptor_obj('cdc_union') + self.endpoint_descriptors = _descriptor_list('endpoint_descriptor') + self.videocontrol_interface_descriptors = _descriptor_list('videocontrol_interface_descriptor') + self.videostreaming_interface_descriptors = _descriptor_list('videostreaming_interface_descriptor') + self.hid_device_descriptor = _descriptor_obj('hid_device_descriptor') self.report_descriptors_list = [] self.hub_descriptor_list = [] self.hub_port_status_list = [] @@ -624,11 +658,11 @@ class _LsUsb(): 'configuration_descriptor': self.configuration_descriptor.list, 'interface_association': self.interface_association.list, 'interface_descriptor': self.interface_descriptor_list, - 'cdc_header': self.cdc_header_list, - 'cdc_call_management': self.cdc_call_management_list, - 'cdc_acm': self.cdc_acm_list, - 'cdc_union': self.cdc_union_list, - 'hid_device_descriptor': self.hid_device_descriptor_list, + 'cdc_header': self.cdc_header.list, + 'cdc_call_management': self.cdc_call_management.list, + 'cdc_acm': self.cdc_acm.list, + 'cdc_union': self.cdc_union.list, + 'hid_device_descriptor': self.hid_device_descriptor.list, 'report_descriptors': self.report_descriptors_list, 'videocontrol_interface_descriptor': self.videocontrol_interface_descriptors.list, 'videostreaming_interface_descriptor': self.videostreaming_interface_descriptors.list, @@ -733,98 +767,32 @@ class _LsUsb(): del iface_attrs[keyname]['_state'] i_desc_obj.update(iface_attrs) - # add other nodes to the object (cdc_header, endpoint descriptors, etc.) - for ch in self.cdc_header_list: - keyname = tuple(ch.keys())[0] - if '_state' in ch[keyname] and ch[keyname]['_state']['bus_idx'] == idx and ch[keyname]['_state']['interface_descriptor_idx'] == iface_idx: + # add cdc_header key if it doesn't exist and there are entries for this interface_descriptor + if self.cdc_header._entries_for_this_bus_and_interface_idx_exist(idx, iface_idx): + self.cdc_header._update_output(idx, iface_idx, i_desc_obj) - # is this a top level value or an attribute? - if ch[keyname]['_state']['attribute_value']: - last_item = ch[keyname]['_state']['last_item'] - if 'attributes' not in i_desc_obj['cdc_header'][last_item]: - i_desc_obj['cdc_header'][last_item]['attributes'] = [] + # add cdc_call_management key if it doesn't exist and there are entries for this interface_descriptor + if self.cdc_call_management._entries_for_this_bus_and_interface_idx_exist(idx, iface_idx): + self.cdc_call_management._update_output(idx, iface_idx, i_desc_obj) - this_attribute = f'{keyname} {ch[keyname].get("value", "")} {ch[keyname].get("description", "")}'.strip() - i_desc_obj['cdc_header'][last_item]['attributes'].append(this_attribute) - continue + # add cdc_acm key if it doesn't exist and there are entries for this interface_descriptor + if self.cdc_acm._entries_for_this_bus_and_interface_idx_exist(idx, iface_idx): + self.cdc_acm._update_output(idx, iface_idx, i_desc_obj) - i_desc_obj['cdc_header'].update(ch) - del i_desc_obj['cdc_header'][keyname]['_state'] + # add cdc_union key if it doesn't exist and there are entries for this interface_descriptor + if self.cdc_union._entries_for_this_bus_and_interface_idx_exist(idx, iface_idx): + self.cdc_union._update_output(idx, iface_idx, i_desc_obj) - for ccm in self.cdc_call_management_list: - keyname = tuple(ccm.keys())[0] - if '_state' in ccm[keyname] and ccm[keyname]['_state']['bus_idx'] == idx and ccm[keyname]['_state']['interface_descriptor_idx'] == iface_idx: + # add hid_device_descriptor key if it doesn't exist and there are entries for this interface_descriptor + if self.hid_device_descriptor._entries_for_this_bus_and_interface_idx_exist(idx, iface_idx): + self.hid_device_descriptor._update_output(idx, iface_idx, i_desc_obj) - # is this a top level value or an attribute? - if ccm[keyname]['_state']['attribute_value']: - last_item = ccm[keyname]['_state']['last_item'] - if 'attributes' not in i_desc_obj['cdc_call_management'][last_item]: - i_desc_obj['cdc_call_management'][last_item]['attributes'] = [] - - this_attribute = f'{keyname} {ccm[keyname].get("value", "")} {ccm[keyname].get("description", "")}'.strip() - i_desc_obj['cdc_call_management'][last_item]['attributes'].append(this_attribute) - continue - - i_desc_obj['cdc_call_management'].update(ccm) - del i_desc_obj['cdc_call_management'][keyname]['_state'] - - for ca in self.cdc_acm_list: - keyname = tuple(ca.keys())[0] - if '_state' in ca[keyname] and ca[keyname]['_state']['bus_idx'] == idx and ca[keyname]['_state']['interface_descriptor_idx'] == iface_idx: - - # is this a top level value or an attribute? - if ca[keyname]['_state']['attribute_value']: - last_item = ca[keyname]['_state']['last_item'] - if 'attributes' not in i_desc_obj['cdc_acm'][last_item]: - i_desc_obj['cdc_acm'][last_item]['attributes'] = [] - - this_attribute = f'{keyname} {ca[keyname].get("value", "")} {ca[keyname].get("description", "")}'.strip() - i_desc_obj['cdc_acm'][last_item]['attributes'].append(this_attribute) - continue - - i_desc_obj['cdc_acm'].update(ca) - del i_desc_obj['cdc_acm'][keyname]['_state'] - - for cu in self.cdc_union_list: - keyname = tuple(cu.keys())[0] - if '_state' in cu[keyname] and cu[keyname]['_state']['bus_idx'] == idx and cu[keyname]['_state']['interface_descriptor_idx'] == iface_idx: - - # is this a top level value or an attribute? - if cu[keyname]['_state']['attribute_value']: - last_item = cu[keyname]['_state']['last_item'] - if 'attributes' not in i_desc_obj['cdc_union'][last_item]: - i_desc_obj['cdc_union'][last_item]['attributes'] = [] - - this_attribute = f'{keyname} {cu[keyname].get("value", "")} {cu[keyname].get("description", "")}'.strip() - i_desc_obj['cdc_union'][last_item]['attributes'].append(this_attribute) - continue - - i_desc_obj['cdc_union'].update(cu) - del i_desc_obj['cdc_union'][keyname]['_state'] - - for hidd in self.hid_device_descriptor_list: - keyname = tuple(hidd.keys())[0] - if '_state' in hidd[keyname] and hidd[keyname]['_state']['bus_idx'] == idx and hidd[keyname]['_state']['interface_descriptor_idx'] == iface_idx: - - # is this a top level value or an attribute? - if hidd[keyname]['_state']['attribute_value']: - last_item = hidd[keyname]['_state']['last_item'] - if 'attributes' not in i_desc_obj['hid_device_descriptor'][last_item]: - i_desc_obj['hid_device_descriptor'][last_item]['attributes'] = [] - - this_attribute = f'{keyname} {hidd[keyname].get("value", "")} {hidd[keyname].get("description", "")}'.strip() - i_desc_obj['hid_device_descriptor'][last_item]['attributes'].append(this_attribute) - continue - - i_desc_obj['hid_device_descriptor'].update(hidd) - del i_desc_obj['hid_device_descriptor'][keyname]['_state'] - - # Not Implemented: Report Descriptors (need more samples) - # for rd in self.report_descriptors_list: - # keyname = tuple(rd.keys())[0] - # if '_state' in rd[keyname] and rd[keyname]['_state']['bus_idx'] == idx and rd[keyname]['_state']['interface_descriptor_idx'] == iface_idx: - # i_desc_obj['hid_device_descriptor']['report_descriptors'].update(rd) - # del i_desc_obj['hid_device_descriptor']['report_descriptors'][keyname]['_state'] + # Not Implemented: Report Descriptors (need more samples) + # for rd in self.report_descriptors_list: + # keyname = tuple(rd.keys())[0] + # if '_state' in rd[keyname] and rd[keyname]['_state']['bus_idx'] == idx and rd[keyname]['_state']['interface_descriptor_idx'] == iface_idx: + # i_desc_obj['hid_device_descriptor']['report_descriptors'].update(rd) + # del i_desc_obj['hid_device_descriptor']['report_descriptors'][keyname]['_state'] # add videocontrol_interface_descriptors key if it doesn't exist and there are entries for this interface_descriptor if self.videocontrol_interface_descriptors._entries_for_this_bus_and_interface_idx_exist(idx, iface_idx):