mirror of
https://github.com/kellyjonbrazil/jc.git
synced 2025-06-17 00:07:37 +02:00
add new iwconfig parser
This commit is contained in:
170
jc/parsers/iwconfig.py
Normal file
170
jc/parsers/iwconfig.py
Normal file
@ -0,0 +1,170 @@
|
||||
"""jc - JSON Convert `iwconfig` command output parser
|
||||
|
||||
No `iwconfig` options are supported.
|
||||
|
||||
Usage (cli):
|
||||
|
||||
$ iwconfig | jc --iwconfig
|
||||
|
||||
or
|
||||
|
||||
$ jc iwconfig
|
||||
|
||||
Usage (module):
|
||||
|
||||
import jc
|
||||
result = jc.parse('iwconfig', iwconfig_command_output)
|
||||
|
||||
Schema:
|
||||
|
||||
[
|
||||
]
|
||||
|
||||
|
||||
Examples:
|
||||
|
||||
$ iwconfig | jc --ifconfig -p
|
||||
|
||||
"""
|
||||
import re
|
||||
from typing import List, Dict
|
||||
from jc.jc_types import JSONDictType
|
||||
import jc.utils
|
||||
|
||||
class info():
|
||||
"""Provides parser metadata (version, author, etc.)"""
|
||||
version = '1.0'
|
||||
description = '`iwconfig` command parser'
|
||||
author = 'Thomas Vincent'
|
||||
author_email = 'vrince@gmail.com'
|
||||
compatible = ['linux']
|
||||
magic_commands = ['iwconfig']
|
||||
|
||||
|
||||
__version__ = info.version
|
||||
|
||||
|
||||
def _process(proc_data: List[JSONDictType]) -> List[JSONDictType]:
|
||||
"""
|
||||
Final processing to conform to the schema.
|
||||
|
||||
Parameters:
|
||||
|
||||
proc_data: (List of Dictionaries) raw structured data to process
|
||||
|
||||
Returns:
|
||||
|
||||
List of Dictionaries. Structured to conform to the schema.
|
||||
"""
|
||||
int_list = {
|
||||
'flags', 'mtu', 'ipv6_mask', 'rx_packets', 'rx_bytes', 'rx_errors', 'rx_dropped',
|
||||
'rx_overruns', 'rx_frame', 'tx_packets', 'tx_bytes', 'tx_errors', 'tx_dropped',
|
||||
'tx_overruns', 'tx_carrier', 'tx_collisions', 'metric', 'nd6_options', 'lane'
|
||||
}
|
||||
float_list = {'rx_power_mw', 'rx_power_dbm', 'tx_bias_ma'}
|
||||
return proc_data
|
||||
|
||||
def parse(
|
||||
data: str,
|
||||
raw: bool = False,
|
||||
quiet: bool = False
|
||||
) -> List[JSONDictType]:
|
||||
"""
|
||||
Main text parsing function
|
||||
|
||||
Parameters:
|
||||
|
||||
data: (string) text data to parse
|
||||
raw: (boolean) unprocessed output if True
|
||||
quiet: (boolean) suppress warning messages if True
|
||||
|
||||
Returns:
|
||||
|
||||
List of Dictionaries. Raw or processed structured data.
|
||||
"""
|
||||
jc.utils.compatibility(__name__, info.compatible, quiet)
|
||||
jc.utils.input_type_check(data)
|
||||
|
||||
raw_output: List[Dict] = []
|
||||
|
||||
# for backwards compatibility, preset all fields to None
|
||||
wireless_extension_obj: Dict = {
|
||||
"name": None,
|
||||
"protocol": None,
|
||||
"essid": None,
|
||||
"mode": None,
|
||||
"frequency": None,
|
||||
"frequency_unit": None,
|
||||
"access_point": None,
|
||||
"bit_rate": None,
|
||||
"tx_power": None,
|
||||
"tx_power_unit": None,
|
||||
"retry_short_limit": None,
|
||||
"rts_threshold": None,
|
||||
"fragment_threshold": None,
|
||||
"poser_management": None,
|
||||
"link_quality": None,
|
||||
"signal_level": None,
|
||||
"rx_invalid_nwid": None,
|
||||
"rx_invalid_crypt": None,
|
||||
"rx_invalid_frag": None,
|
||||
"tx_excessive_retries": None,
|
||||
"invalid_misc": None,
|
||||
"missed_beacon": None
|
||||
}
|
||||
|
||||
interface_item: Dict = wireless_extension_obj.copy()
|
||||
|
||||
re_interface = re.compile(r'^(?P<name>[a-zA-Z0-9:._-]+)\s+(?P<protocol>([a-zA-Z0-9]+\s)*[a-zA-Z0-9.]+)\s+ESSID:\"(?P<essid>[a-zA-Z0-9:._\s]+)\"')
|
||||
re_mode = re.compile(r'Mode:(?P<mode>\w+)')
|
||||
re_frequency = re.compile(r'Frequency:(?P<frequency>[0-9.]+)\s(?P<frequency_unit>\w+)')
|
||||
re_access_point = re.compile(r'Access Point:\s*(?P<access_point>[0-9A-F:]+)')
|
||||
re_bit_rate = re.compile(r'Bit Rate=(?P<bit_rate>[0-9.]+)\s(?P<bit_rate_unit>[\w\/]+)')
|
||||
re_tx_power= re.compile(r'Tx-Power=(?P<tx_power>[-0-9]+)\s(?P<tx_power_unit>[\w]+)')
|
||||
re_retry_short_limit = re.compile(r'Retry short limit:(?P<retry_short_limit>[0-9\/]+)')
|
||||
re_rts_threshold = re.compile(r'RTS thr:(?P<rts_threshold>(off|on))')
|
||||
re_fragment_threshold = re.compile(r'Fragment thr:(?P<fragment_threshold>(off|on))')
|
||||
re_power_management = re.compile(r'Power Management:(?P<power_management>(off|on))')
|
||||
re_link_quality = re.compile(r'Link Quality=(?P<power_management>[0-9\/]+)')
|
||||
re_signal_level = re.compile(r'Signal level=(?P<signal_level>[-0-9]+)\s(?P<signal_level_unit>[\w]+)')
|
||||
re_rx_invalid_nwid = re.compile(r'Rx invalid nwid:(?P<rx_invalid_nwid>[-0-9]+)')
|
||||
re_rx_invalid_crypt = re.compile(r'Rx invalid crypt:(?P<rx_invalid_crypt>[-0-9]+)')
|
||||
re_rx_invalid_frag = re.compile(r'Rx invalid frag:(?P<rx_invalid_frag>[-0-9]+)')
|
||||
re_tx_excessive_retries = re.compile(r'Tx excessive retries:(?P<tx_excessive_retries>[-0-9]+)')
|
||||
re_invalid_misc = re.compile(r'Invalid misc:(?P<invalid_misc>[0-9]+)')
|
||||
re_missed_beacon = re.compile(r'Missed beacon:(?P<missed_beacon>[0-9]+)')
|
||||
|
||||
re_all = [
|
||||
re_mode, re_frequency, re_access_point, re_bit_rate, re_tx_power,
|
||||
re_retry_short_limit, re_rts_threshold, re_fragment_threshold, re_power_management,
|
||||
re_link_quality, re_signal_level, re_rx_invalid_nwid, re_rx_invalid_crypt,
|
||||
re_rx_invalid_frag, re_tx_excessive_retries, re_invalid_misc, re_missed_beacon
|
||||
]
|
||||
|
||||
if jc.utils.has_data(data):
|
||||
for line in filter(None, data.splitlines()):
|
||||
|
||||
# Find new interface lines
|
||||
interface_match = re.search(re_interface, line)
|
||||
if interface_match:
|
||||
if interface_item['name'] is not None:
|
||||
raw_output.append(interface_item)
|
||||
interface_item = wireless_extension_obj.copy()
|
||||
|
||||
interface_item.update(interface_match.groupdict())
|
||||
continue
|
||||
|
||||
# we do not have any interface yet continue to search for it --> next line
|
||||
if interface_item['name'] is None:
|
||||
continue
|
||||
|
||||
# Filling interface with whatever we can find
|
||||
for re_entry in re_all:
|
||||
match = re.search(re_entry, line)
|
||||
if match:
|
||||
interface_item.update(match.groupdict())
|
||||
|
||||
if interface_item['name'] is not None:
|
||||
raw_output.append(interface_item)
|
||||
|
||||
return raw_output if raw else _process(raw_output)
|
29
tests/fixtures/ubuntu-20.10/iwconfig.json
vendored
Normal file
29
tests/fixtures/ubuntu-20.10/iwconfig.json
vendored
Normal file
@ -0,0 +1,29 @@
|
||||
[
|
||||
{
|
||||
"name": "wlp5s0",
|
||||
"protocol": "IEEE 802.11",
|
||||
"essid": "BLABLABLA",
|
||||
"mode": "Managed",
|
||||
"frequency": "5.18",
|
||||
"frequency_unit": "GHz",
|
||||
"access_point": "E6:63:DA:16:50:BF",
|
||||
"bit_rate": "6",
|
||||
"tx_power": "30",
|
||||
"tx_power_unit": "dBm",
|
||||
"retry_short_limit": "7",
|
||||
"rts_threshold": "off",
|
||||
"fragment_threshold": "off",
|
||||
"poser_management": null,
|
||||
"link_quality": null,
|
||||
"signal_level": "-52",
|
||||
"rx_invalid_nwid": "0",
|
||||
"rx_invalid_crypt": "0",
|
||||
"rx_invalid_frag": "0",
|
||||
"tx_excessive_retries": "0",
|
||||
"invalid_misc": "1766",
|
||||
"missed_beacon": "0",
|
||||
"bit_rate_unit": "Mb/s",
|
||||
"power_management": "58/70",
|
||||
"signal_level_unit": "dBm"
|
||||
}
|
||||
]
|
9
tests/fixtures/ubuntu-20.10/iwconfig.out
vendored
Normal file
9
tests/fixtures/ubuntu-20.10/iwconfig.out
vendored
Normal file
@ -0,0 +1,9 @@
|
||||
wlp5s0 IEEE 802.11 ESSID:"BLABLABLA"
|
||||
Mode:Managed Frequency:5.18 GHz Access Point: E6:63:DA:16:50:BF
|
||||
Bit Rate=6 Mb/s Tx-Power=30 dBm
|
||||
Retry short limit:7 RTS thr:off Fragment thr:off
|
||||
Power Management:on
|
||||
Link Quality=58/70 Signal level=-52 dBm
|
||||
Rx invalid nwid:0 Rx invalid crypt:0 Rx invalid frag:0
|
||||
Tx excessive retries:0 Invalid misc:1766 Missed beacon:0
|
||||
|
33
tests/test_iwconfig.py
Normal file
33
tests/test_iwconfig.py
Normal file
@ -0,0 +1,33 @@
|
||||
import os
|
||||
import json
|
||||
import unittest
|
||||
import jc.parsers.iwconfig
|
||||
|
||||
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
|
||||
class iwconfigTests(unittest.TestCase):
|
||||
|
||||
# input
|
||||
with open(os.path.join(THIS_DIR, 'fixtures/ubuntu-20.10/iwconfig.out'), 'r', encoding='utf-8') as f:
|
||||
ubuntu_20_10_iwconfig= f.read()
|
||||
|
||||
# output
|
||||
with open(os.path.join(THIS_DIR, 'fixtures/ubuntu-20.10/iwconfig.json'), 'r', encoding='utf-8') as f:
|
||||
ubuntu_20_10_iwconfig_json= json.loads(f.read())
|
||||
|
||||
def test_iwconfig_nodata(self):
|
||||
"""
|
||||
Test 'iwconfig' with no data
|
||||
"""
|
||||
self.assertEqual(jc.parsers.iwconfig.parse('', quiet=True), [])
|
||||
|
||||
def test_iwconfig_ubuntu_20_04(self):
|
||||
"""
|
||||
Test 'iwconfig' on Ubuntu 20.10
|
||||
"""
|
||||
self.assertEqual(jc.parsers.iwconfig.parse(self.ubuntu_20_10_iwconfig, quiet=True), self.ubuntu_20_10_iwconfig_json)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Reference in New Issue
Block a user