From 9776dd1082ec8ff1d49ffb83c41c277102522fed Mon Sep 17 00:00:00 2001 From: Thomas Vincent Date: Mon, 19 Dec 2022 22:13:58 -0500 Subject: [PATCH] add new iwconfig parser --- jc/parsers/iwconfig.py | 170 ++++++++++++++++++++++ tests/fixtures/ubuntu-20.10/iwconfig.json | 29 ++++ tests/fixtures/ubuntu-20.10/iwconfig.out | 9 ++ tests/test_iwconfig.py | 33 +++++ 4 files changed, 241 insertions(+) create mode 100644 jc/parsers/iwconfig.py create mode 100644 tests/fixtures/ubuntu-20.10/iwconfig.json create mode 100644 tests/fixtures/ubuntu-20.10/iwconfig.out create mode 100644 tests/test_iwconfig.py diff --git a/jc/parsers/iwconfig.py b/jc/parsers/iwconfig.py new file mode 100644 index 00000000..c474c03a --- /dev/null +++ b/jc/parsers/iwconfig.py @@ -0,0 +1,170 @@ +"""jc - JSON Convert `iwconfig` command output parser + +No `iwconfig` options are supported. + +Usage (cli): + + $ iwconfig | jc --iwconfig + +or + + $ jc iwconfig + +Usage (module): + + import jc + result = jc.parse('iwconfig', iwconfig_command_output) + +Schema: + + [ + ] + + +Examples: + + $ iwconfig | jc --ifconfig -p + +""" +import re +from typing import List, Dict +from jc.jc_types import JSONDictType +import jc.utils + +class info(): + """Provides parser metadata (version, author, etc.)""" + version = '1.0' + description = '`iwconfig` command parser' + author = 'Thomas Vincent' + author_email = 'vrince@gmail.com' + compatible = ['linux'] + magic_commands = ['iwconfig'] + + +__version__ = info.version + + +def _process(proc_data: List[JSONDictType]) -> List[JSONDictType]: + """ + Final processing to conform to the schema. + + Parameters: + + proc_data: (List of Dictionaries) raw structured data to process + + Returns: + + List of Dictionaries. Structured to conform to the schema. + """ + int_list = { + 'flags', 'mtu', 'ipv6_mask', 'rx_packets', 'rx_bytes', 'rx_errors', 'rx_dropped', + 'rx_overruns', 'rx_frame', 'tx_packets', 'tx_bytes', 'tx_errors', 'tx_dropped', + 'tx_overruns', 'tx_carrier', 'tx_collisions', 'metric', 'nd6_options', 'lane' + } + float_list = {'rx_power_mw', 'rx_power_dbm', 'tx_bias_ma'} + return proc_data + +def parse( + data: str, + raw: bool = False, + quiet: bool = False +) -> List[JSONDictType]: + """ + Main text parsing function + + Parameters: + + data: (string) text data to parse + raw: (boolean) unprocessed output if True + quiet: (boolean) suppress warning messages if True + + Returns: + + List of Dictionaries. Raw or processed structured data. + """ + jc.utils.compatibility(__name__, info.compatible, quiet) + jc.utils.input_type_check(data) + + raw_output: List[Dict] = [] + + # for backwards compatibility, preset all fields to None + wireless_extension_obj: Dict = { + "name": None, + "protocol": None, + "essid": None, + "mode": None, + "frequency": None, + "frequency_unit": None, + "access_point": None, + "bit_rate": None, + "tx_power": None, + "tx_power_unit": None, + "retry_short_limit": None, + "rts_threshold": None, + "fragment_threshold": None, + "poser_management": None, + "link_quality": None, + "signal_level": None, + "rx_invalid_nwid": None, + "rx_invalid_crypt": None, + "rx_invalid_frag": None, + "tx_excessive_retries": None, + "invalid_misc": None, + "missed_beacon": None + } + + interface_item: Dict = wireless_extension_obj.copy() + + re_interface = re.compile(r'^(?P[a-zA-Z0-9:._-]+)\s+(?P([a-zA-Z0-9]+\s)*[a-zA-Z0-9.]+)\s+ESSID:\"(?P[a-zA-Z0-9:._\s]+)\"') + re_mode = re.compile(r'Mode:(?P\w+)') + re_frequency = re.compile(r'Frequency:(?P[0-9.]+)\s(?P\w+)') + re_access_point = re.compile(r'Access Point:\s*(?P[0-9A-F:]+)') + re_bit_rate = re.compile(r'Bit Rate=(?P[0-9.]+)\s(?P[\w\/]+)') + re_tx_power= re.compile(r'Tx-Power=(?P[-0-9]+)\s(?P[\w]+)') + re_retry_short_limit = re.compile(r'Retry short limit:(?P[0-9\/]+)') + re_rts_threshold = re.compile(r'RTS thr:(?P(off|on))') + re_fragment_threshold = re.compile(r'Fragment thr:(?P(off|on))') + re_power_management = re.compile(r'Power Management:(?P(off|on))') + re_link_quality = re.compile(r'Link Quality=(?P[0-9\/]+)') + re_signal_level = re.compile(r'Signal level=(?P[-0-9]+)\s(?P[\w]+)') + re_rx_invalid_nwid = re.compile(r'Rx invalid nwid:(?P[-0-9]+)') + re_rx_invalid_crypt = re.compile(r'Rx invalid crypt:(?P[-0-9]+)') + re_rx_invalid_frag = re.compile(r'Rx invalid frag:(?P[-0-9]+)') + re_tx_excessive_retries = re.compile(r'Tx excessive retries:(?P[-0-9]+)') + re_invalid_misc = re.compile(r'Invalid misc:(?P[0-9]+)') + re_missed_beacon = re.compile(r'Missed beacon:(?P[0-9]+)') + + re_all = [ + re_mode, re_frequency, re_access_point, re_bit_rate, re_tx_power, + re_retry_short_limit, re_rts_threshold, re_fragment_threshold, re_power_management, + re_link_quality, re_signal_level, re_rx_invalid_nwid, re_rx_invalid_crypt, + re_rx_invalid_frag, re_tx_excessive_retries, re_invalid_misc, re_missed_beacon + ] + + if jc.utils.has_data(data): + for line in filter(None, data.splitlines()): + + # Find new interface lines + interface_match = re.search(re_interface, line) + if interface_match: + if interface_item['name'] is not None: + raw_output.append(interface_item) + interface_item = wireless_extension_obj.copy() + + interface_item.update(interface_match.groupdict()) + continue + + # we do not have any interface yet continue to search for it --> next line + if interface_item['name'] is None: + continue + + # Filling interface with whatever we can find + for re_entry in re_all: + match = re.search(re_entry, line) + if match: + interface_item.update(match.groupdict()) + + if interface_item['name'] is not None: + raw_output.append(interface_item) + + return raw_output if raw else _process(raw_output) diff --git a/tests/fixtures/ubuntu-20.10/iwconfig.json b/tests/fixtures/ubuntu-20.10/iwconfig.json new file mode 100644 index 00000000..e175a143 --- /dev/null +++ b/tests/fixtures/ubuntu-20.10/iwconfig.json @@ -0,0 +1,29 @@ +[ + { + "name": "wlp5s0", + "protocol": "IEEE 802.11", + "essid": "BLABLABLA", + "mode": "Managed", + "frequency": "5.18", + "frequency_unit": "GHz", + "access_point": "E6:63:DA:16:50:BF", + "bit_rate": "6", + "tx_power": "30", + "tx_power_unit": "dBm", + "retry_short_limit": "7", + "rts_threshold": "off", + "fragment_threshold": "off", + "poser_management": null, + "link_quality": null, + "signal_level": "-52", + "rx_invalid_nwid": "0", + "rx_invalid_crypt": "0", + "rx_invalid_frag": "0", + "tx_excessive_retries": "0", + "invalid_misc": "1766", + "missed_beacon": "0", + "bit_rate_unit": "Mb/s", + "power_management": "58/70", + "signal_level_unit": "dBm" + } +] \ No newline at end of file diff --git a/tests/fixtures/ubuntu-20.10/iwconfig.out b/tests/fixtures/ubuntu-20.10/iwconfig.out new file mode 100644 index 00000000..eaec4add --- /dev/null +++ b/tests/fixtures/ubuntu-20.10/iwconfig.out @@ -0,0 +1,9 @@ +wlp5s0 IEEE 802.11 ESSID:"BLABLABLA" + Mode:Managed Frequency:5.18 GHz Access Point: E6:63:DA:16:50:BF + Bit Rate=6 Mb/s Tx-Power=30 dBm + Retry short limit:7 RTS thr:off Fragment thr:off + Power Management:on + Link Quality=58/70 Signal level=-52 dBm + Rx invalid nwid:0 Rx invalid crypt:0 Rx invalid frag:0 + Tx excessive retries:0 Invalid misc:1766 Missed beacon:0 + diff --git a/tests/test_iwconfig.py b/tests/test_iwconfig.py new file mode 100644 index 00000000..2bdbae62 --- /dev/null +++ b/tests/test_iwconfig.py @@ -0,0 +1,33 @@ +import os +import json +import unittest +import jc.parsers.iwconfig + +THIS_DIR = os.path.dirname(os.path.abspath(__file__)) + + +class iwconfigTests(unittest.TestCase): + + # input + with open(os.path.join(THIS_DIR, 'fixtures/ubuntu-20.10/iwconfig.out'), 'r', encoding='utf-8') as f: + ubuntu_20_10_iwconfig= f.read() + + # output + with open(os.path.join(THIS_DIR, 'fixtures/ubuntu-20.10/iwconfig.json'), 'r', encoding='utf-8') as f: + ubuntu_20_10_iwconfig_json= json.loads(f.read()) + + def test_iwconfig_nodata(self): + """ + Test 'iwconfig' with no data + """ + self.assertEqual(jc.parsers.iwconfig.parse('', quiet=True), []) + + def test_iwconfig_ubuntu_20_04(self): + """ + Test 'iwconfig' on Ubuntu 20.10 + """ + self.assertEqual(jc.parsers.iwconfig.parse(self.ubuntu_20_10_iwconfig, quiet=True), self.ubuntu_20_10_iwconfig_json) + + +if __name__ == '__main__': + unittest.main()