diff --git a/jc/lib.py b/jc/lib.py index ab0a544c..ca02f5e1 100644 --- a/jc/lib.py +++ b/jc/lib.py @@ -65,6 +65,7 @@ parsers: List[str] = [ 'iptables', 'iso-datetime', 'iw-scan', + 'iwconfig', 'jar-manifest', 'jobs', 'jwt', diff --git a/jc/parsers/iwconfig.py b/jc/parsers/iwconfig.py new file mode 100644 index 00000000..138f7d8f --- /dev/null +++ b/jc/parsers/iwconfig.py @@ -0,0 +1,202 @@ +"""jc - JSON Convert `iwconfig` command output parser + +No `iwconfig` options are supported. + +Usage (cli): + + $ iwconfig | jc --iwconfig + +or + + $ jc iwconfig + +Usage (module): + + import jc + result = jc.parse('iwconfig', iwconfig_command_output) + +Schema: + + [ + { + "name": string, + "protocol": string, + "essid": string, + "mode": string, + "frequency": float, + "frequency_unit": string, + "access_point": string, + "bit_rate": float, + "bit_rate_unit": string, + "tx_power": integer, + "tx_power_unit": string, + "retry_short_limit": integer, + "rts_threshold": boolean, + "fragment_threshold": boolean, + "power_management": boolean, + "link_quality": string, + "signal_level": integer, + "signal_level_unit": string, + "rx_invalid_nwid": integer, + "rx_invalid_crypt": integer, + "rx_invalid_frag": integer, + "tx_excessive_retries": integer, + "invalid_misc": integer, + "missed_beacon": integer + } + ] + + +Examples: + + $ iwconfig 2> /dev/null | jc --iwconfig -p + [ + { + "name": "wlp5s0", + "protocol": "IEEE 802.11", + "essid": "BLABLABLA", + "mode": "Managed", + "frequency": 5.18, + "frequency_unit": "GHz", + "access_point": "E6:64:DA:16:51:BF", + "bit_rate": 6.0, + "bit_rate_unit": "Mb/s", + "tx_power": 30, + "tx_power_unit": "dBm", + "retry_short_limit": 7, + "rts_threshold": false, + "fragment_threshold": false, + "power_management": true, + "link_quality": "61/70", + "signal_level": -49, + "signal_level_unit": "dBm", + "rx_invalid_nwid": 0, + "rx_invalid_crypt": 0, + "rx_invalid_frag": 0, + "tx_excessive_retries": 0, + "invalid_misc": 2095, + "missed_beacon": 0 + } + ] + +""" +import re +from typing import List, Dict +from jc.jc_types import JSONDictType +import jc.utils + +class info(): + """Provides parser metadata (version, author, etc.)""" + version = '1.0' + description = '`iwconfig` command parser' + author = 'Thomas Vincent' + author_email = 'vrince@gmail.com' + compatible = ['linux'] + magic_commands = ['iwconfig'] + + +__version__ = info.version + + +def _process(proc_data: List[JSONDictType]) -> List[JSONDictType]: + """ + Final processing to conform to the schema. + + Parameters: + + proc_data: (List of Dictionaries) raw structured data to process + + Returns: + + List of Dictionaries. Structured to conform to the schema. + """ + int_list = ['signal_level', 'rx_invalid_nwid', 'rx_invalid_crypt', 'rx_invalid_frag', + 'tx_excessive_retries', 'invalid_misc', 'missed_beacon', 'tx_power', 'retry_short_limit'] + float_list = ['frequency', 'bit_rate'] + bool_list = ['rts_threshold', 'fragment_threshold', 'power_management'] + + + proc_data = [ { key: int(value) if key in int_list else value for key, value in proc_data_item.items() } for proc_data_item in proc_data ] + proc_data = [ { key: float(value) if key in float_list else value for key, value in proc_data_item.items() } for proc_data_item in proc_data ] + proc_data = [ { key: value == 'on' if key in bool_list else value for key, value in proc_data_item .items() } for proc_data_item in proc_data ] + + return proc_data + +def parse( + data: str, + raw: bool = False, + quiet: bool = False +) -> List[JSONDictType]: + """ + Main text parsing function + + Parameters: + + data: (string) text data to parse + raw: (boolean) unprocessed output if True + quiet: (boolean) suppress warning messages if True + + Returns: + + List of Dictionaries. Raw or processed structured data. + """ + jc.utils.compatibility(__name__, info.compatible, quiet) + jc.utils.input_type_check(data) + + raw_output: List[Dict] = [] + wireless_extension_obj: Dict = {} + + re_interface = re.compile(r'^(?P[a-zA-Z0-9:._-]+)\s+(?P([a-zA-Z0-9]+\s)*[a-zA-Z0-9.]+)\s+ESSID:\"(?P[a-zA-Z0-9:._\s]+)\"') + re_mode = re.compile(r'Mode:(?P\w+)') + re_frequency = re.compile(r'Frequency:(?P[0-9.]+)\s(?P\w+)') + re_access_point = re.compile(r'Access Point:\s*(?P[0-9A-F:]+)') + re_bit_rate = re.compile(r'Bit Rate=(?P[0-9.]+)\s(?P[\w\/]+)') + re_tx_power= re.compile(r'Tx-Power=(?P[-0-9]+)\s(?P[\w]+)') + re_retry_short_limit = re.compile(r'Retry short limit:(?P[0-9\/]+)') + re_rts_threshold = re.compile(r'RTS thr:(?P(off|on))') + re_fragment_threshold = re.compile(r'Fragment thr:(?P(off|on))') + re_power_management = re.compile(r'Power Management:(?P(off|on))') + re_link_quality = re.compile(r'Link Quality=(?P[0-9\/]+)') + re_signal_level = re.compile(r'Signal level=(?P[-0-9]+)\s(?P[\w]+)') + re_rx_invalid_nwid = re.compile(r'Rx invalid nwid:(?P[-0-9]+)') + re_rx_invalid_crypt = re.compile(r'Rx invalid crypt:(?P[-0-9]+)') + re_rx_invalid_frag = re.compile(r'Rx invalid frag:(?P[-0-9]+)') + re_tx_excessive_retries = re.compile(r'Tx excessive retries:(?P[-0-9]+)') + re_invalid_misc = re.compile(r'Invalid misc:(?P[0-9]+)') + re_missed_beacon = re.compile(r'Missed beacon:(?P[0-9]+)') + + re_all = [ + re_mode, re_frequency, re_access_point, re_bit_rate, re_tx_power, + re_retry_short_limit, re_rts_threshold, re_fragment_threshold, re_power_management, + re_link_quality, re_signal_level, re_rx_invalid_nwid, re_rx_invalid_crypt, + re_rx_invalid_frag, re_tx_excessive_retries, re_invalid_misc, re_missed_beacon + ] + + interface_item = None + if jc.utils.has_data(data): + for line in filter(None, data.splitlines()): + + # Find new interface lines + interface_match = re.search(re_interface, line) + if interface_match: + if interface_item is not None: + raw_output.append(interface_item) + + interface_item = dict() + interface_item.update(interface_match.groupdict()) + continue + + # we do not have any interface yet continue to search for it --> next line + if interface_item is None: + continue + + # Filling interface with whatever we can find + for re_entry in re_all: + match = re.search(re_entry, line) + if match: + interface_item.update(match.groupdict()) + + if interface_item is not None: + raw_output.append(interface_item) + + return raw_output if raw else _process(raw_output) diff --git a/tests/fixtures/generic/iwconfig-many.json b/tests/fixtures/generic/iwconfig-many.json new file mode 100644 index 00000000..aae19364 --- /dev/null +++ b/tests/fixtures/generic/iwconfig-many.json @@ -0,0 +1,54 @@ +[ + { + "name": "wlp5s0", + "protocol": "IEEE 802.11", + "essid": "BLABLABLA", + "mode": "Managed", + "frequency": 5.18, + "frequency_unit": "GHz", + "access_point": "E6:63:DA:16:50:BF", + "bit_rate": 6.0, + "bit_rate_unit": "Mb/s", + "tx_power": 30, + "tx_power_unit": "dBm", + "retry_short_limit": 7, + "rts_threshold": false, + "fragment_threshold": false, + "power_management": true, + "link_quality": "58/70", + "signal_level": -52, + "signal_level_unit": "dBm", + "rx_invalid_nwid": 0, + "rx_invalid_crypt": 0, + "rx_invalid_frag": 0, + "tx_excessive_retries": 0, + "invalid_misc": 1766, + "missed_beacon": 0 + }, + { + "name": "wlp5s02", + "protocol": "IEEE 802.11", + "essid": "BLABLABLA2", + "mode": "Managed", + "frequency": 5.18, + "frequency_unit": "GHz", + "access_point": "E6:63:DA:16:50:BF", + "bit_rate": 6.0, + "bit_rate_unit": "Mb/s", + "tx_power": 30, + "tx_power_unit": "dBm", + "retry_short_limit": 7, + "rts_threshold": false, + "fragment_threshold": false, + "power_management": true, + "link_quality": "58/70", + "signal_level": -53, + "signal_level_unit": "dBm", + "rx_invalid_nwid": 41, + "rx_invalid_crypt": 42, + "rx_invalid_frag": 43, + "tx_excessive_retries": 44, + "invalid_misc": 1766, + "missed_beacon": 0 + } +] \ No newline at end of file diff --git a/tests/fixtures/generic/iwconfig-many.out b/tests/fixtures/generic/iwconfig-many.out new file mode 100644 index 00000000..b4bf15cc --- /dev/null +++ b/tests/fixtures/generic/iwconfig-many.out @@ -0,0 +1,17 @@ +wlp5s0 IEEE 802.11 ESSID:"BLABLABLA" + Mode:Managed Frequency:5.18 GHz Access Point: E6:63:DA:16:50:BF + Bit Rate=6 Mb/s Tx-Power=30 dBm + Retry short limit:7 RTS thr:off Fragment thr:off + Power Management:on + Link Quality=58/70 Signal level=-52 dBm + Rx invalid nwid:0 Rx invalid crypt:0 Rx invalid frag:0 + Tx excessive retries:0 Invalid misc:1766 Missed beacon:0 + +wlp5s02 IEEE 802.11 ESSID:"BLABLABLA2" + Mode:Managed Frequency:5.18 GHz Access Point: E6:63:DA:16:50:BF + Bit Rate=6 Mb/s Tx-Power=30 dBm + Retry short limit:7 RTS thr:off Fragment thr:off + Power Management:on + Link Quality=58/70 Signal level=-53 dBm + Rx invalid nwid:41 Rx invalid crypt:42 Rx invalid frag:43 + Tx excessive retries:44 Invalid misc:1766 Missed beacon:0 diff --git a/tests/fixtures/generic/iwconfig-raw.json b/tests/fixtures/generic/iwconfig-raw.json new file mode 100644 index 00000000..6f824ed2 --- /dev/null +++ b/tests/fixtures/generic/iwconfig-raw.json @@ -0,0 +1,28 @@ +[ + { + "name": "wlp5s0", + "protocol": "IEEE 802.11", + "essid": "BLABLABLA", + "mode": "Managed", + "frequency": "5.18", + "frequency_unit": "GHz", + "access_point": "E6:63:DA:16:50:BF", + "bit_rate": "6", + "bit_rate_unit": "Mb/s", + "tx_power": "30", + "tx_power_unit": "dBm", + "retry_short_limit": "7", + "rts_threshold": "off", + "fragment_threshold": "off", + "power_management": "on", + "link_quality": "58/70", + "signal_level": "-52", + "signal_level_unit": "dBm", + "rx_invalid_nwid": "0", + "rx_invalid_crypt": "0", + "rx_invalid_frag": "0", + "tx_excessive_retries": "0", + "invalid_misc": "1766", + "missed_beacon": "0" + } +] \ No newline at end of file diff --git a/tests/fixtures/generic/iwconfig.json b/tests/fixtures/generic/iwconfig.json new file mode 100644 index 00000000..511884e3 --- /dev/null +++ b/tests/fixtures/generic/iwconfig.json @@ -0,0 +1,28 @@ +[ + { + "name": "wlp5s0", + "protocol": "IEEE 802.11", + "essid": "BLABLABLA", + "mode": "Managed", + "frequency": 5.18, + "frequency_unit": "GHz", + "access_point": "E6:63:DA:16:50:BF", + "bit_rate": 6.0, + "bit_rate_unit": "Mb/s", + "tx_power": 30, + "tx_power_unit": "dBm", + "retry_short_limit": 7, + "rts_threshold": false, + "fragment_threshold": false, + "power_management": true, + "link_quality": "58/70", + "signal_level": -52, + "signal_level_unit": "dBm", + "rx_invalid_nwid": 0, + "rx_invalid_crypt": 0, + "rx_invalid_frag": 0, + "tx_excessive_retries": 0, + "invalid_misc": 1766, + "missed_beacon": 0 + } +] \ No newline at end of file diff --git a/tests/fixtures/generic/iwconfig.out b/tests/fixtures/generic/iwconfig.out new file mode 100644 index 00000000..eaec4add --- /dev/null +++ b/tests/fixtures/generic/iwconfig.out @@ -0,0 +1,9 @@ +wlp5s0 IEEE 802.11 ESSID:"BLABLABLA" + Mode:Managed Frequency:5.18 GHz Access Point: E6:63:DA:16:50:BF + Bit Rate=6 Mb/s Tx-Power=30 dBm + Retry short limit:7 RTS thr:off Fragment thr:off + Power Management:on + Link Quality=58/70 Signal level=-52 dBm + Rx invalid nwid:0 Rx invalid crypt:0 Rx invalid frag:0 + Tx excessive retries:0 Invalid misc:1766 Missed beacon:0 + diff --git a/tests/test_iwconfig.py b/tests/test_iwconfig.py new file mode 100644 index 00000000..b3b9fee0 --- /dev/null +++ b/tests/test_iwconfig.py @@ -0,0 +1,54 @@ +import os +import json +import unittest +import jc.parsers.iwconfig + +THIS_DIR = os.path.dirname(os.path.abspath(__file__)) + + +class iwconfigTests(unittest.TestCase): + + # input + with open(os.path.join(THIS_DIR, 'fixtures/generic/iwconfig.out'), 'r', encoding='utf-8') as f: + iwconfig_output = f.read() + + with open(os.path.join(THIS_DIR, 'fixtures/generic/iwconfig-many.out'), 'r', encoding='utf-8') as f: + iwconfig_many_output = f.read() + + # output + with open(os.path.join(THIS_DIR, 'fixtures/generic/iwconfig.json'), 'r', encoding='utf-8') as f: + iwconfig_json = json.loads(f.read()) + + with open(os.path.join(THIS_DIR, 'fixtures/generic/iwconfig-raw.json'), 'r', encoding='utf-8') as f: + iwconfig_raw_json = json.loads(f.read()) + + with open(os.path.join(THIS_DIR, 'fixtures/generic/iwconfig-many.json'), 'r', encoding='utf-8') as f: + iwconfig_many_json = json.loads(f.read()) + + + def test_iwconfig_nodata(self): + """ + Test 'iwconfig' with no data + """ + self.assertEqual(jc.parsers.iwconfig.parse('', quiet=True), []) + + def test_iwconfig_raw(self): + """ + Test 'iwconfig' raw + """ + self.assertEqual(jc.parsers.iwconfig.parse(self.iwconfig_output, quiet=True, raw=True), self.iwconfig_raw_json) + + def test_iwconfig(self): + """ + Test 'iwconfig' + """ + self.assertEqual(jc.parsers.iwconfig.parse(self.iwconfig_output, quiet=True), self.iwconfig_json) + + def test_iwconfig_many(self): + """ + Test 'iwconfig' many interface + """ + self.assertEqual(jc.parsers.iwconfig.parse(self.iwconfig_many_output, quiet=True), self.iwconfig_many_json) + +if __name__ == '__main__': + unittest.main()