From e46ac0ff7ed678114c94aa26325a22a2c72fe002 Mon Sep 17 00:00:00 2001 From: Kelly Brazil Date: Sat, 5 Nov 2022 12:13:01 -0700 Subject: [PATCH] initial commit of new ifconfig parser --- jc/lib.py | 1 + jc/parsers/ifconfig.py | 618 ++++++++++++++-------------------------- jc/parsers/ifconfig2.py | 506 ++++++++++++++++++++++++++++++++ tests/test_ifconfig.py | 6 + 4 files changed, 726 insertions(+), 405 deletions(-) create mode 100644 jc/parsers/ifconfig2.py diff --git a/jc/lib.py b/jc/lib.py index cee17ce1..c33e1245 100644 --- a/jc/lib.py +++ b/jc/lib.py @@ -54,6 +54,7 @@ parsers: List[str] = [ 'hosts', 'id', 'ifconfig', + 'ifconfig2', 'ini', 'iostat', 'iostat-s', diff --git a/jc/parsers/ifconfig.py b/jc/parsers/ifconfig.py index 94fa0096..f19d2210 100644 --- a/jc/parsers/ifconfig.py +++ b/jc/parsers/ifconfig.py @@ -1,201 +1,47 @@ -"""jc - JSON Convert `ifconfig` command output parser +"""jc - JSON Convert `foo` command output parser -No `ifconfig` options are supported. - -Consider using the `ip` command instead of `ifconfig` as it supports native -JSON output and provides more detailed output than the `ifconfig` parser. -(e.g. support for multiple IPv4 and IPv6 addresses.) - -> Note: This parser will only output the last IPv4 and IPv6 address for -> each interface in the command output. +<> Usage (cli): - $ ifconfig | jc --ifconfig + $ foo | jc --foo or - $ jc ifconfig + $ jc foo Usage (module): import jc - result = jc.parse('ifconfig', ifconfig_command_output) + result = jc.parse('foo', foo_command_output) Schema: [ { - "name": string, - "flags": integer, - "state": [ - string - ], - "mtu": integer, - "ipv4_addr": string, - "ipv4_mask": string, - "ipv4_bcast": string, - "ipv6_addr": string, - "ipv6_mask": integer, - "ipv6_scope": string, - "mac_addr": string, - "type": string, - "rx_packets": integer, - "rx_bytes": integer, - "rx_errors": integer, - "rx_dropped": integer, - "rx_overruns": integer, - "rx_frame": integer, - "tx_packets": integer, - "tx_bytes": integer, - "tx_errors": integer, - "tx_dropped": integer, - "tx_overruns": integer, - "tx_carrier": integer, - "tx_collisions": integer, - "metric": integer + "foo": string, + "bar": boolean, + "baz": integer } ] Examples: - $ ifconfig | jc --ifconfig -p - [ - { - "name": "ens33", - "flags": 4163, - "state": [ - "UP", - "BROADCAST", - "RUNNING", - "MULTICAST" - ], - "mtu": 1500, - "ipv4_addr": "192.168.71.137", - "ipv4_mask": "255.255.255.0", - "ipv4_bcast": "192.168.71.255", - "ipv6_addr": "fe80::c1cb:715d:bc3e:b8a0", - "ipv6_mask": 64, - "ipv6_scope": "0x20", - "mac_addr": "00:0c:29:3b:58:0e", - "type": "Ethernet", - "rx_packets": 8061, - "rx_bytes": 1514413, - "rx_errors": 0, - "rx_dropped": 0, - "rx_overruns": 0, - "rx_frame": 0, - "tx_packets": 4502, - "tx_bytes": 866622, - "tx_errors": 0, - "tx_dropped": 0, - "tx_overruns": 0, - "tx_carrier": 0, - "tx_collisions": 0, - "metric": null - }, - { - "name": "lo", - "flags": 73, - "state": [ - "UP", - "LOOPBACK", - "RUNNING" - ], - "mtu": 65536, - "ipv4_addr": "127.0.0.1", - "ipv4_mask": "255.0.0.0", - "ipv4_bcast": null, - "ipv6_addr": "::1", - "ipv6_mask": 128, - "ipv6_scope": "0x10", - "mac_addr": null, - "type": "Local Loopback", - "rx_packets": 73, - "rx_bytes": 6009, - "rx_errors": 0, - "rx_dropped": 0, - "rx_overruns": 0, - "rx_frame": 0, - "tx_packets": 73, - "tx_bytes": 6009, - "tx_errors": 0, - "tx_dropped": 0, - "tx_overruns": 0, - "tx_carrier": 0, - "tx_collisions": 0, - "metric": null - } - ] + $ foo | jc --foo -p + [] - $ ifconfig | jc --ifconfig -p -r - [ - { - "name": "ens33", - "flags": "4163", - "state": "UP,BROADCAST,RUNNING,MULTICAST", - "mtu": "1500", - "ipv4_addr": "192.168.71.137", - "ipv4_mask": "255.255.255.0", - "ipv4_bcast": "192.168.71.255", - "ipv6_addr": "fe80::c1cb:715d:bc3e:b8a0", - "ipv6_mask": "64", - "ipv6_scope": "0x20", - "mac_addr": "00:0c:29:3b:58:0e", - "type": "Ethernet", - "rx_packets": "8061", - "rx_bytes": "1514413", - "rx_errors": "0", - "rx_dropped": "0", - "rx_overruns": "0", - "rx_frame": "0", - "tx_packets": "4502", - "tx_bytes": "866622", - "tx_errors": "0", - "tx_dropped": "0", - "tx_overruns": "0", - "tx_carrier": "0", - "tx_collisions": "0", - "metric": null - }, - { - "name": "lo", - "flags": "73", - "state": "UP,LOOPBACK,RUNNING", - "mtu": "65536", - "ipv4_addr": "127.0.0.1", - "ipv4_mask": "255.0.0.0", - "ipv4_bcast": null, - "ipv6_addr": "::1", - "ipv6_mask": "128", - "ipv6_scope": "0x10", - "mac_addr": null, - "type": "Local Loopback", - "rx_packets": "73", - "rx_bytes": "6009", - "rx_errors": "0", - "rx_dropped": "0", - "rx_overruns": "0", - "rx_frame": "0", - "tx_packets": "73", - "tx_bytes": "6009", - "tx_errors": "0", - "tx_dropped": "0", - "tx_overruns": "0", - "tx_carrier": "0", - "tx_collisions": "0", - "metric": null - } - ] + $ foo | jc --foo -p -r + [] """ import re -from collections import namedtuple +from typing import List, Dict +from jc.jc_types import JSONDictType import jc.utils class info(): """Provides parser metadata (version, author, etc.)""" - version = '1.12' + version = '2.0' description = '`ifconfig` command parser' author = 'Kelly Brazil' author_email = 'kellyjonbrazil@gmail.com' @@ -207,225 +53,7 @@ class info(): __version__ = info.version -class _IfconfigParser(object): - """ifconfig parser module written by threeheadedknight@protonmail.com""" - # Author: threeheadedknight@protonmail.com - # Date created: 30.06.2018 17:03 - # Python Version: 3.7 - - # MIT License - - # Copyright (c) 2018 threeheadedknight@protonmail.com - - # Permission is hereby granted, free of charge, to any person obtaining a copy - # of this software and associated documentation files (the "Software"), to deal - # in the Software without restriction, including without limitation the rights - # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - # copies of the Software, and to permit persons to whom the Software is - # furnished to do so, subject to the following conditions: - - # The above copyright notice and this permission notice shall be included in all - # copies or substantial portions of the Software. - - # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - # SOFTWARE. - - attributes = [ - 'name', 'type', 'mac_addr', 'ipv4_addr', 'ipv4_bcast', 'ipv4_mask', 'ipv6_addr', - 'ipv6_mask', 'ipv6_scope', 'state', 'mtu', 'metric', 'rx_packets', 'rx_errors', - 'rx_dropped', 'rx_overruns', 'rx_frame', 'tx_packets', 'tx_errors', 'tx_dropped', - 'tx_overruns', 'tx_carrier', 'tx_collisions', 'rx_bytes', 'tx_bytes' - ] - - def __init__(self, console_output): - """ - :param console_output: - """ - - if isinstance(console_output, list): - source_data = " ".join(console_output) - else: - source_data = console_output.replace("\n", " ") - self.interfaces = self.parser(source_data=source_data) - - def list_interfaces(self): - """ - :return: - """ - return sorted(self.interfaces.keys()) - - def count_interfaces(self): - """ - :return: - """ - return len(self.interfaces.keys()) - - def filter_interfaces(self, **kwargs): - """ - :param kwargs: - :return: - """ - for attr in kwargs.keys(): - if attr not in _IfconfigParser.attributes: - raise ValueError("Attribute [{}] not supported.".format(attr)) - - filtered_interfaces = [] - for name, details in self.interfaces.items(): - - if all(getattr(details, attr) == kwargs[attr] for attr in kwargs.keys()): - filtered_interfaces.append(name) - - return sorted(filtered_interfaces) - - def get_interface(self, name): - """ - :param name: - :return: - """ - if name in self.list_interfaces(): - return self.interfaces[name] - else: - raise InterfaceNotFound("Interface [{}] not found.".format(name)) - - def get_interfaces(self): - """ - :return: - """ - return self.interfaces - - def is_available(self, name): - """ - :param name: - :return: - """ - return name in self.interfaces - - def parser(self, source_data): - """ - :param source_data: - :return: - """ - - # Linux syntax - re_linux_interface = re.compile( - r"(?P[a-zA-Z0-9:._-]+)\s+Link encap:(?P\S+\s?\S+)(\s+HWaddr\s+\b" - r"(?P[0-9A-Fa-f:?]+))?", - re.I) - re_linux_ipv4 = re.compile( - r"inet addr:(?P(?:[0-9]{1,3}\.){3}[0-9]{1,3})(\s+Bcast:" - r"(?P(?:[0-9]{1,3}\.){3}[0-9]{1,3}))?\s+Mask:(?P(?:[0-9]{1,3}\.){3}[0-9]{1,3})", - re.I) - re_linux_ipv6 = re.compile( - r"inet6 addr:\s+(?P\S+)/(?P[0-9]+)\s+Scope:(?PLink|Host)", - re.I) - re_linux_state = re.compile( - r"\W+(?P(?:\w+\s)+)(?:\s+)?MTU:(?P[0-9]+)\s+Metric:(?P[0-9]+)", re.I) - re_linux_rx = re.compile( - r"RX packets:(?P[0-9]+)\s+errors:(?P[0-9]+)\s+dropped:" - r"(?P[0-9]+)\s+overruns:(?P[0-9]+)\s+frame:(?P[0-9]+)", - re.I) - re_linux_tx = re.compile( - r"TX packets:(?P[0-9]+)\s+errors:(?P[0-9]+)\s+dropped:" - r"(?P[0-9]+)\s+overruns:(?P[0-9]+)\s+carrier:(?P[0-9]+)", - re.I) - re_linux_bytes = re.compile(r"\W+RX bytes:(?P\d+)\s+\(.*\)\s+TX bytes:(?P\d+)\s+\(.*\)", re.I) - re_linux_tx_stats = re.compile(r"collisions:(?P[0-9]+)\s+txqueuelen:[0-9]+", re.I) - re_linux = [re_linux_interface, re_linux_ipv4, re_linux_ipv6, re_linux_state, re_linux_rx, re_linux_tx, - re_linux_bytes, re_linux_tx_stats] - - # OpenBSD syntax - re_openbsd_interface = re.compile( - r"(?P[a-zA-Z0-9:._-]+):\s+flags=(?P[0-9]+)<(?P\S+)?>\s+mtu\s+(?P[0-9]+)", - re.I) - re_openbsd_ipv4 = re.compile( - r"inet (?P(?:[0-9]{1,3}\.){3}[0-9]{1,3})\s+netmask\s+" - r"(?P(?:[0-9]{1,3}\.){3}[0-9]{1,3})(\s+broadcast\s+" - r"(?P(?:[0-9]{1,3}\.){3}[0-9]{1,3}))?", - re.I) - re_openbsd_ipv6 = re.compile( - r"inet6\s+(?P\S+)\s+prefixlen\s+(?P[0-9]+)\s+scopeid\s+(?P\w+x\w+)<" - r"(?:link|host)>", - re.I) - re_openbsd_details = re.compile( - r"\S+\s+(?:(?P[0-9A-Fa-f:?]+)\s+)?txqueuelen\s+[0-9]+\s+\((?P\S+\s?\S+)\)", re.I) - re_openbsd_rx = re.compile(r"RX packets (?P[0-9]+)\s+bytes\s+(?P\d+)\s+.*", re.I) - re_openbsd_rx_stats = re.compile( - r"RX errors (?P[0-9]+)\s+dropped\s+(?P[0-9]+)\s+overruns\s+" - r"(?P[0-9]+)\s+frame\s+(?P[0-9]+)", - re.I) - re_openbsd_tx = re.compile(r"TX packets (?P[0-9]+)\s+bytes\s+(?P\d+)\s+.*", re.I) - re_openbsd_tx_stats = re.compile( - r"TX errors (?P[0-9]+)\s+dropped\s+(?P[0-9]+)\s+overruns\s+" - r"(?P[0-9]+)\s+carrier\s+(?P[0-9]+)\s+collisions\s+(?P[0-9]+)", - re.I) - re_openbsd = [re_openbsd_interface, re_openbsd_ipv4, re_openbsd_ipv6, re_openbsd_details, re_openbsd_rx, - re_openbsd_rx_stats, re_openbsd_tx, re_openbsd_tx_stats] - - # FreeBSD syntax - re_freebsd_interface = re.compile( - r"(?P[a-zA-Z0-9:._-]+):\s+flags=(?P[0-9]+)<(?P\S+)>\s+metric\s+" - r"(?P[0-9]+)\s+mtu\s+(?P[0-9]+)", - re.I) - re_freebsd_ipv4 = re.compile( - r"inet (?P(?:[0-9]{1,3}\.){3}[0-9]{1,3})\s+netmask\s+(?P0x\S+)(\s+broadcast\s+" - r"(?P(?:[0-9]{1,3}\.){3}[0-9]{1,3}))?", - re.I) - re_freebsd_ipv6 = re.compile(r"\s?inet6\s(?P.*)(?:\%\w+\d+)\sprefixlen\s(?P\d+)(?:\s\w+)?\sscopeid\s(?P\w+x\w+)", re.I) - re_freebsd_details = re.compile(r"ether\s+(?P[0-9A-Fa-f:?]+)", re.I) - re_freebsd = [re_freebsd_interface, re_freebsd_ipv4, re_freebsd_ipv6, re_freebsd_details] - - available_interfaces = dict() - - for pattern in [re_linux_interface, re_openbsd_interface, re_freebsd_interface]: - network_interfaces = re.finditer(pattern, source_data) - positions = [] - while True: - try: - pos = next(network_interfaces) - positions.append(max(pos.start() - 1, 0)) - except StopIteration: - break - if positions: - positions.append(len(source_data)) - break - - if not positions: - return available_interfaces - - for l, r in zip(positions, positions[1:]): - chunk = source_data[l:r] - _interface = dict() - for pattern in re_linux + re_openbsd + re_freebsd: - match = re.search(pattern, chunk.replace('\t', '\n')) - if match: - details = match.groupdict() - for k, v in details.items(): - if isinstance(v, str): - details[k] = v.strip() - _interface.update(details) - if _interface is not None: - available_interfaces[_interface['name']] = self.update_interface_details(_interface) - - return available_interfaces - - @staticmethod - def update_interface_details(interface): - for attr in _IfconfigParser.attributes: - if attr not in interface: - interface[attr] = None - return namedtuple('Interface', interface.keys())(**interface) - - -class InterfaceNotFound(Exception): - pass - - -def _process(proc_data): +def _process(proc_data: List[JSONDictType]) -> List[JSONDictType]: """ Final processing to conform to the schema. @@ -435,7 +63,7 @@ def _process(proc_data): Returns: - List of Dictionaries. Structured data to conform to the schema. + List of Dictionaries. Structured to conform to the schema. """ int_list = { 'flags', 'mtu', 'ipv6_mask', 'rx_packets', 'rx_bytes', 'rx_errors', 'rx_dropped', @@ -451,9 +79,9 @@ def _process(proc_data): # convert OSX-style subnet mask to dotted quad if 'ipv4_mask' in entry: try: - if entry['ipv4_mask'].startswith('0x'): + if entry['ipv4_mask'].startswith('0x'): # type: ignore new_mask = entry['ipv4_mask'] - new_mask = new_mask.lstrip('0x') + new_mask = new_mask.lstrip('0x') # type: ignore new_mask = '.'.join(str(int(i, 16)) for i in [new_mask[i:i + 2] for i in range(0, len(new_mask), 2)]) entry['ipv4_mask'] = new_mask except (ValueError, TypeError, AttributeError): @@ -462,15 +90,48 @@ def _process(proc_data): # convert state value to an array if 'state' in entry: try: - new_state = entry['state'].split(',') + new_state = entry['state'].split(',') # type: ignore entry['state'] = new_state except (ValueError, TypeError, AttributeError): pass + # conversions for list of ipv4 addresses + if 'ipv4' in entry: + for ip_address in entry['ipv4']: # type: ignore + if 'mask' in ip_address: + try: + if ip_address['mask'].startswith('0x'): # type: ignore + new_mask = ip_address['mask'] # type: ignore + new_mask = new_mask.lstrip('0x') + new_mask = '.'.join(str(int(i, 16)) for i in [new_mask[i:i + 2] for i in range(0, len(new_mask), 2)]) + ip_address['mask'] = new_mask # type: ignore + except (ValueError, TypeError, AttributeError): + pass + + # conversions for list of ipv6 addresses + if 'ipv6' in entry: + for ip_address in entry['ipv6']: # type: ignore + if 'mask' in ip_address: + ip_address['mask'] = jc.utils.convert_to_int(ip_address['mask']) # type: ignore + return proc_data -def parse(data, raw=False, quiet=False): +def _bundle_match(pattern_list, string): + """Returns a match object if a string matches one of a list of patterns. + If no match is found, returns None""" + for pattern in pattern_list: + match = re.search(pattern, string) + if match: + return match + return None + + +def parse( + data: str, + raw: bool = False, + quiet: bool = False +) -> List[JSONDictType]: """ Main text parsing function @@ -487,20 +148,167 @@ def parse(data, raw=False, quiet=False): jc.utils.compatibility(__name__, info.compatible, quiet) jc.utils.input_type_check(data) - raw_output = [] + raw_output: List[Dict] = [] + interface_item: Dict = {} + ipv4_info: List = [] + ipv6_info: List = [] + + # Linux syntax + re_linux_interface = re.compile( + r"(?P[a-zA-Z0-9:._-]+)\s+Link encap:(?P\S+\s?\S+)(\s+HWaddr\s+\b" + r"(?P[0-9A-Fa-f:?]+))?", + re.I) + re_linux_ipv4 = re.compile( + r"inet addr:(?P
(?:[0-9]{1,3}\.){3}[0-9]{1,3})(\s+Bcast:" + r"(?P(?:[0-9]{1,3}\.){3}[0-9]{1,3}))?\s+Mask:(?P(?:[0-9]{1,3}\.){3}[0-9]{1,3})", + re.I) + re_linux_ipv6 = re.compile( + r"inet6 addr:\s+(?P
\S+)/(?P[0-9]+)\s+Scope:(?PLink|Host)", + re.I) + re_linux_state = re.compile( + r"\W+(?P(?:\w+\s)+)(?:\s+)?MTU:(?P[0-9]+)\s+Metric:(?P[0-9]+)", re.I) + re_linux_rx = re.compile( + r"RX packets:(?P[0-9]+)\s+errors:(?P[0-9]+)\s+dropped:" + r"(?P[0-9]+)\s+overruns:(?P[0-9]+)\s+frame:(?P[0-9]+)", + re.I) + re_linux_tx = re.compile( + r"TX packets:(?P[0-9]+)\s+errors:(?P[0-9]+)\s+dropped:" + r"(?P[0-9]+)\s+overruns:(?P[0-9]+)\s+carrier:(?P[0-9]+)", + re.I) + re_linux_bytes = re.compile(r"\W+RX bytes:(?P\d+)\s+\(.*\)\s+TX bytes:(?P\d+)\s+\(.*\)", re.I) + re_linux_tx_stats = re.compile(r"collisions:(?P[0-9]+)\s+txqueuelen:[0-9]+", re.I) + + # OpenBSD syntax + re_openbsd_interface = re.compile( + r"(?P[a-zA-Z0-9:._-]+):\s+flags=(?P[0-9]+)<(?P\S+)?>\s+mtu\s+(?P[0-9]+)", + re.I) + re_openbsd_ipv4 = re.compile( + r"inet (?P
(?:[0-9]{1,3}\.){3}[0-9]{1,3})\s+netmask\s+" + r"(?P(?:[0-9]{1,3}\.){3}[0-9]{1,3})(\s+broadcast\s+" + r"(?P(?:[0-9]{1,3}\.){3}[0-9]{1,3}))?", + re.I) + re_openbsd_ipv6 = re.compile( + r'inet6\s+(?P
\S+)\s+prefixlen\s+(?P[0-9]+)\s+scopeid\s+(?P\w+x\w+)<(?Plink|host|global)>', + re.I) + re_openbsd_details = re.compile( + r"\S+\s+(?:(?P[0-9A-Fa-f:?]+)\s+)?txqueuelen\s+[0-9]+\s+\((?P\S+\s?\S+)\)", re.I) + re_openbsd_rx = re.compile(r"RX packets (?P[0-9]+)\s+bytes\s+(?P\d+)\s+.*", re.I) + re_openbsd_rx_stats = re.compile( + r"RX errors (?P[0-9]+)\s+dropped\s+(?P[0-9]+)\s+overruns\s+" + r"(?P[0-9]+)\s+frame\s+(?P[0-9]+)", + re.I) + re_openbsd_tx = re.compile(r"TX packets (?P[0-9]+)\s+bytes\s+(?P\d+)\s+.*", re.I) + re_openbsd_tx_stats = re.compile( + r"TX errors (?P[0-9]+)\s+dropped\s+(?P[0-9]+)\s+overruns\s+" + r"(?P[0-9]+)\s+carrier\s+(?P[0-9]+)\s+collisions\s+(?P[0-9]+)", + re.I) + re_openbsd = [re_openbsd_interface, re_openbsd_ipv4, re_openbsd_ipv6, re_openbsd_details, re_openbsd_rx, + re_openbsd_rx_stats, re_openbsd_tx, re_openbsd_tx_stats] + + # FreeBSD syntax + re_freebsd_interface = re.compile( + r"(?P[a-zA-Z0-9:._-]+):\s+flags=(?P[0-9]+)<(?P\S+)>\s+metric\s+" + r"(?P[0-9]+)\s+mtu\s+(?P[0-9]+)", + re.I) + re_freebsd_ipv4 = re.compile( + r"inet (?P
(?:[0-9]{1,3}\.){3}[0-9]{1,3})\s+netmask\s+(?P0x\S+)(\s+broadcast\s+" + r"(?P(?:[0-9]{1,3}\.){3}[0-9]{1,3}))?", + re.I) + re_freebsd_ipv6 = re.compile(r"\s?inet6\s(?P
.*)(?:\%\w+\d+)\sprefixlen\s(?P\d+)(?:\s\w+)?\sscopeid\s(?P\w+x\w+)", re.I) + re_freebsd_details = re.compile(r"ether\s+(?P[0-9A-Fa-f:?]+)", re.I) + + + re_linux = [ + re_linux_interface, re_linux_ipv4, re_linux_ipv6, re_linux_state, re_linux_rx, re_linux_tx, + re_linux_bytes, re_linux_tx_stats + ] + re_openbsd = [ + re_openbsd_interface, re_openbsd_ipv4, re_openbsd_ipv6, re_openbsd_details, re_openbsd_rx, + re_openbsd_rx_stats, re_openbsd_tx, re_openbsd_tx_stats + ] + re_freebsd = [re_freebsd_interface, re_freebsd_ipv4, re_freebsd_ipv6, re_freebsd_details] + + interface_patterns = [re_linux_interface, re_openbsd_interface, re_freebsd_interface] + ipv4_patterns = [re_linux_ipv4, re_openbsd_ipv4, re_freebsd_ipv4] + ipv6_patterns = [re_linux_ipv6, re_openbsd_ipv6, re_freebsd_ipv6] if jc.utils.has_data(data): + for line in filter(None, data.splitlines()): - parsed = _IfconfigParser(console_output=data) - interfaces = parsed.get_interfaces() + # Find new interface lines + interface_match = _bundle_match(interface_patterns, line) + if interface_match: + if interface_item: + if ipv4_info: + interface_item['ipv4'] = ipv4_info + if ipv6_info: + interface_item['ipv6'] = ipv6_info + raw_output.append(interface_item) + interface_item = {} + ipv4_info = [] + ipv6_info = [] - # convert ifconfigparser output to a dictionary - for iface in interfaces: - d = interfaces[iface]._asdict() - dct = dict(d) - raw_output.append(dct) + interface_item.update(interface_match.groupdict()) + continue - if raw: - return raw_output - else: - return _process(raw_output) + ### for backwards compatibility! + # add in old ipv4/ipv6 address fields in root of object. + # this will only keep the last ip address in the interface output. + # old fieldnames: ipv4_addr, ipv4_mask, ipv4_bcast, ipv6_addr, ipv6_mask, ipv6_scope + ipv4_match_legacy = _bundle_match(ipv4_patterns, line) + if ipv4_match_legacy: + ipv4_dict = ipv4_match_legacy.groupdict() + # rename to legacy names + name_map = { + 'address': 'ipv4_addr', + 'mask': 'ipv4_mask', + 'broadcast': 'ipv4_bcast' + } + for k, v in ipv4_dict.copy().items(): + ipv4_dict[name_map[k]] = v + del ipv4_dict[k] + interface_item.update(ipv4_dict) + + ipv6_match_legacy = _bundle_match(ipv6_patterns, line) + if ipv6_match_legacy: + ipv6_dict = ipv6_match_legacy.groupdict() + # rename to legacy names + name_map = { + 'address': 'ipv6_addr', + 'mask': 'ipv6_mask', + 'broadcast': 'ipv6_bcast', + 'scope': 'ipv6_scope', + 'type': 'ipv6_type' + } + for k, v in ipv6_dict.copy().items(): + ipv6_dict[name_map[k]] = v + del ipv6_dict[k] + interface_item.update(ipv6_dict) + ### Backwards compatibility end + + # ipv4 information lines + ipv4_match = _bundle_match(ipv4_patterns, line) + if ipv4_match: + ipv4_info.append(ipv4_match.groupdict()) + continue + + # ipv6 information lines + ipv6_match = _bundle_match(ipv6_patterns, line) + if ipv6_match: + ipv6_info.append(ipv6_match.groupdict()) + continue + + # All other lines + other_match = _bundle_match(re_linux + re_openbsd + re_freebsd, line) + if other_match: + interface_item.update(other_match.groupdict()) + continue + + if interface_item: + if ipv4_info: + interface_item['ipv4'] = ipv4_info + if ipv6_info: + interface_item['ipv6'] = ipv6_info + raw_output.append(interface_item) + + return raw_output if raw else _process(raw_output) diff --git a/jc/parsers/ifconfig2.py b/jc/parsers/ifconfig2.py new file mode 100644 index 00000000..94fa0096 --- /dev/null +++ b/jc/parsers/ifconfig2.py @@ -0,0 +1,506 @@ +"""jc - JSON Convert `ifconfig` command output parser + +No `ifconfig` options are supported. + +Consider using the `ip` command instead of `ifconfig` as it supports native +JSON output and provides more detailed output than the `ifconfig` parser. +(e.g. support for multiple IPv4 and IPv6 addresses.) + +> Note: This parser will only output the last IPv4 and IPv6 address for +> each interface in the command output. + +Usage (cli): + + $ ifconfig | jc --ifconfig + +or + + $ jc ifconfig + +Usage (module): + + import jc + result = jc.parse('ifconfig', ifconfig_command_output) + +Schema: + + [ + { + "name": string, + "flags": integer, + "state": [ + string + ], + "mtu": integer, + "ipv4_addr": string, + "ipv4_mask": string, + "ipv4_bcast": string, + "ipv6_addr": string, + "ipv6_mask": integer, + "ipv6_scope": string, + "mac_addr": string, + "type": string, + "rx_packets": integer, + "rx_bytes": integer, + "rx_errors": integer, + "rx_dropped": integer, + "rx_overruns": integer, + "rx_frame": integer, + "tx_packets": integer, + "tx_bytes": integer, + "tx_errors": integer, + "tx_dropped": integer, + "tx_overruns": integer, + "tx_carrier": integer, + "tx_collisions": integer, + "metric": integer + } + ] + +Examples: + + $ ifconfig | jc --ifconfig -p + [ + { + "name": "ens33", + "flags": 4163, + "state": [ + "UP", + "BROADCAST", + "RUNNING", + "MULTICAST" + ], + "mtu": 1500, + "ipv4_addr": "192.168.71.137", + "ipv4_mask": "255.255.255.0", + "ipv4_bcast": "192.168.71.255", + "ipv6_addr": "fe80::c1cb:715d:bc3e:b8a0", + "ipv6_mask": 64, + "ipv6_scope": "0x20", + "mac_addr": "00:0c:29:3b:58:0e", + "type": "Ethernet", + "rx_packets": 8061, + "rx_bytes": 1514413, + "rx_errors": 0, + "rx_dropped": 0, + "rx_overruns": 0, + "rx_frame": 0, + "tx_packets": 4502, + "tx_bytes": 866622, + "tx_errors": 0, + "tx_dropped": 0, + "tx_overruns": 0, + "tx_carrier": 0, + "tx_collisions": 0, + "metric": null + }, + { + "name": "lo", + "flags": 73, + "state": [ + "UP", + "LOOPBACK", + "RUNNING" + ], + "mtu": 65536, + "ipv4_addr": "127.0.0.1", + "ipv4_mask": "255.0.0.0", + "ipv4_bcast": null, + "ipv6_addr": "::1", + "ipv6_mask": 128, + "ipv6_scope": "0x10", + "mac_addr": null, + "type": "Local Loopback", + "rx_packets": 73, + "rx_bytes": 6009, + "rx_errors": 0, + "rx_dropped": 0, + "rx_overruns": 0, + "rx_frame": 0, + "tx_packets": 73, + "tx_bytes": 6009, + "tx_errors": 0, + "tx_dropped": 0, + "tx_overruns": 0, + "tx_carrier": 0, + "tx_collisions": 0, + "metric": null + } + ] + + $ ifconfig | jc --ifconfig -p -r + [ + { + "name": "ens33", + "flags": "4163", + "state": "UP,BROADCAST,RUNNING,MULTICAST", + "mtu": "1500", + "ipv4_addr": "192.168.71.137", + "ipv4_mask": "255.255.255.0", + "ipv4_bcast": "192.168.71.255", + "ipv6_addr": "fe80::c1cb:715d:bc3e:b8a0", + "ipv6_mask": "64", + "ipv6_scope": "0x20", + "mac_addr": "00:0c:29:3b:58:0e", + "type": "Ethernet", + "rx_packets": "8061", + "rx_bytes": "1514413", + "rx_errors": "0", + "rx_dropped": "0", + "rx_overruns": "0", + "rx_frame": "0", + "tx_packets": "4502", + "tx_bytes": "866622", + "tx_errors": "0", + "tx_dropped": "0", + "tx_overruns": "0", + "tx_carrier": "0", + "tx_collisions": "0", + "metric": null + }, + { + "name": "lo", + "flags": "73", + "state": "UP,LOOPBACK,RUNNING", + "mtu": "65536", + "ipv4_addr": "127.0.0.1", + "ipv4_mask": "255.0.0.0", + "ipv4_bcast": null, + "ipv6_addr": "::1", + "ipv6_mask": "128", + "ipv6_scope": "0x10", + "mac_addr": null, + "type": "Local Loopback", + "rx_packets": "73", + "rx_bytes": "6009", + "rx_errors": "0", + "rx_dropped": "0", + "rx_overruns": "0", + "rx_frame": "0", + "tx_packets": "73", + "tx_bytes": "6009", + "tx_errors": "0", + "tx_dropped": "0", + "tx_overruns": "0", + "tx_carrier": "0", + "tx_collisions": "0", + "metric": null + } + ] +""" +import re +from collections import namedtuple +import jc.utils + + +class info(): + """Provides parser metadata (version, author, etc.)""" + version = '1.12' + description = '`ifconfig` command parser' + author = 'Kelly Brazil' + author_email = 'kellyjonbrazil@gmail.com' + details = 'Using ifconfig-parser from https://github.com/KnightWhoSayNi/ifconfig-parser' + compatible = ['linux', 'aix', 'freebsd', 'darwin'] + magic_commands = ['ifconfig'] + + +__version__ = info.version + + +class _IfconfigParser(object): + """ifconfig parser module written by threeheadedknight@protonmail.com""" + # Author: threeheadedknight@protonmail.com + # Date created: 30.06.2018 17:03 + # Python Version: 3.7 + + # MIT License + + # Copyright (c) 2018 threeheadedknight@protonmail.com + + # Permission is hereby granted, free of charge, to any person obtaining a copy + # of this software and associated documentation files (the "Software"), to deal + # in the Software without restriction, including without limitation the rights + # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + # copies of the Software, and to permit persons to whom the Software is + # furnished to do so, subject to the following conditions: + + # The above copyright notice and this permission notice shall be included in all + # copies or substantial portions of the Software. + + # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + # SOFTWARE. + + attributes = [ + 'name', 'type', 'mac_addr', 'ipv4_addr', 'ipv4_bcast', 'ipv4_mask', 'ipv6_addr', + 'ipv6_mask', 'ipv6_scope', 'state', 'mtu', 'metric', 'rx_packets', 'rx_errors', + 'rx_dropped', 'rx_overruns', 'rx_frame', 'tx_packets', 'tx_errors', 'tx_dropped', + 'tx_overruns', 'tx_carrier', 'tx_collisions', 'rx_bytes', 'tx_bytes' + ] + + def __init__(self, console_output): + """ + :param console_output: + """ + + if isinstance(console_output, list): + source_data = " ".join(console_output) + else: + source_data = console_output.replace("\n", " ") + self.interfaces = self.parser(source_data=source_data) + + def list_interfaces(self): + """ + :return: + """ + return sorted(self.interfaces.keys()) + + def count_interfaces(self): + """ + :return: + """ + return len(self.interfaces.keys()) + + def filter_interfaces(self, **kwargs): + """ + :param kwargs: + :return: + """ + for attr in kwargs.keys(): + if attr not in _IfconfigParser.attributes: + raise ValueError("Attribute [{}] not supported.".format(attr)) + + filtered_interfaces = [] + for name, details in self.interfaces.items(): + + if all(getattr(details, attr) == kwargs[attr] for attr in kwargs.keys()): + filtered_interfaces.append(name) + + return sorted(filtered_interfaces) + + def get_interface(self, name): + """ + :param name: + :return: + """ + if name in self.list_interfaces(): + return self.interfaces[name] + else: + raise InterfaceNotFound("Interface [{}] not found.".format(name)) + + def get_interfaces(self): + """ + :return: + """ + return self.interfaces + + def is_available(self, name): + """ + :param name: + :return: + """ + return name in self.interfaces + + def parser(self, source_data): + """ + :param source_data: + :return: + """ + + # Linux syntax + re_linux_interface = re.compile( + r"(?P[a-zA-Z0-9:._-]+)\s+Link encap:(?P\S+\s?\S+)(\s+HWaddr\s+\b" + r"(?P[0-9A-Fa-f:?]+))?", + re.I) + re_linux_ipv4 = re.compile( + r"inet addr:(?P(?:[0-9]{1,3}\.){3}[0-9]{1,3})(\s+Bcast:" + r"(?P(?:[0-9]{1,3}\.){3}[0-9]{1,3}))?\s+Mask:(?P(?:[0-9]{1,3}\.){3}[0-9]{1,3})", + re.I) + re_linux_ipv6 = re.compile( + r"inet6 addr:\s+(?P\S+)/(?P[0-9]+)\s+Scope:(?PLink|Host)", + re.I) + re_linux_state = re.compile( + r"\W+(?P(?:\w+\s)+)(?:\s+)?MTU:(?P[0-9]+)\s+Metric:(?P[0-9]+)", re.I) + re_linux_rx = re.compile( + r"RX packets:(?P[0-9]+)\s+errors:(?P[0-9]+)\s+dropped:" + r"(?P[0-9]+)\s+overruns:(?P[0-9]+)\s+frame:(?P[0-9]+)", + re.I) + re_linux_tx = re.compile( + r"TX packets:(?P[0-9]+)\s+errors:(?P[0-9]+)\s+dropped:" + r"(?P[0-9]+)\s+overruns:(?P[0-9]+)\s+carrier:(?P[0-9]+)", + re.I) + re_linux_bytes = re.compile(r"\W+RX bytes:(?P\d+)\s+\(.*\)\s+TX bytes:(?P\d+)\s+\(.*\)", re.I) + re_linux_tx_stats = re.compile(r"collisions:(?P[0-9]+)\s+txqueuelen:[0-9]+", re.I) + re_linux = [re_linux_interface, re_linux_ipv4, re_linux_ipv6, re_linux_state, re_linux_rx, re_linux_tx, + re_linux_bytes, re_linux_tx_stats] + + # OpenBSD syntax + re_openbsd_interface = re.compile( + r"(?P[a-zA-Z0-9:._-]+):\s+flags=(?P[0-9]+)<(?P\S+)?>\s+mtu\s+(?P[0-9]+)", + re.I) + re_openbsd_ipv4 = re.compile( + r"inet (?P(?:[0-9]{1,3}\.){3}[0-9]{1,3})\s+netmask\s+" + r"(?P(?:[0-9]{1,3}\.){3}[0-9]{1,3})(\s+broadcast\s+" + r"(?P(?:[0-9]{1,3}\.){3}[0-9]{1,3}))?", + re.I) + re_openbsd_ipv6 = re.compile( + r"inet6\s+(?P\S+)\s+prefixlen\s+(?P[0-9]+)\s+scopeid\s+(?P\w+x\w+)<" + r"(?:link|host)>", + re.I) + re_openbsd_details = re.compile( + r"\S+\s+(?:(?P[0-9A-Fa-f:?]+)\s+)?txqueuelen\s+[0-9]+\s+\((?P\S+\s?\S+)\)", re.I) + re_openbsd_rx = re.compile(r"RX packets (?P[0-9]+)\s+bytes\s+(?P\d+)\s+.*", re.I) + re_openbsd_rx_stats = re.compile( + r"RX errors (?P[0-9]+)\s+dropped\s+(?P[0-9]+)\s+overruns\s+" + r"(?P[0-9]+)\s+frame\s+(?P[0-9]+)", + re.I) + re_openbsd_tx = re.compile(r"TX packets (?P[0-9]+)\s+bytes\s+(?P\d+)\s+.*", re.I) + re_openbsd_tx_stats = re.compile( + r"TX errors (?P[0-9]+)\s+dropped\s+(?P[0-9]+)\s+overruns\s+" + r"(?P[0-9]+)\s+carrier\s+(?P[0-9]+)\s+collisions\s+(?P[0-9]+)", + re.I) + re_openbsd = [re_openbsd_interface, re_openbsd_ipv4, re_openbsd_ipv6, re_openbsd_details, re_openbsd_rx, + re_openbsd_rx_stats, re_openbsd_tx, re_openbsd_tx_stats] + + # FreeBSD syntax + re_freebsd_interface = re.compile( + r"(?P[a-zA-Z0-9:._-]+):\s+flags=(?P[0-9]+)<(?P\S+)>\s+metric\s+" + r"(?P[0-9]+)\s+mtu\s+(?P[0-9]+)", + re.I) + re_freebsd_ipv4 = re.compile( + r"inet (?P(?:[0-9]{1,3}\.){3}[0-9]{1,3})\s+netmask\s+(?P0x\S+)(\s+broadcast\s+" + r"(?P(?:[0-9]{1,3}\.){3}[0-9]{1,3}))?", + re.I) + re_freebsd_ipv6 = re.compile(r"\s?inet6\s(?P.*)(?:\%\w+\d+)\sprefixlen\s(?P\d+)(?:\s\w+)?\sscopeid\s(?P\w+x\w+)", re.I) + re_freebsd_details = re.compile(r"ether\s+(?P[0-9A-Fa-f:?]+)", re.I) + re_freebsd = [re_freebsd_interface, re_freebsd_ipv4, re_freebsd_ipv6, re_freebsd_details] + + available_interfaces = dict() + + for pattern in [re_linux_interface, re_openbsd_interface, re_freebsd_interface]: + network_interfaces = re.finditer(pattern, source_data) + positions = [] + while True: + try: + pos = next(network_interfaces) + positions.append(max(pos.start() - 1, 0)) + except StopIteration: + break + if positions: + positions.append(len(source_data)) + break + + if not positions: + return available_interfaces + + for l, r in zip(positions, positions[1:]): + chunk = source_data[l:r] + _interface = dict() + for pattern in re_linux + re_openbsd + re_freebsd: + match = re.search(pattern, chunk.replace('\t', '\n')) + if match: + details = match.groupdict() + for k, v in details.items(): + if isinstance(v, str): + details[k] = v.strip() + _interface.update(details) + if _interface is not None: + available_interfaces[_interface['name']] = self.update_interface_details(_interface) + + return available_interfaces + + @staticmethod + def update_interface_details(interface): + for attr in _IfconfigParser.attributes: + if attr not in interface: + interface[attr] = None + return namedtuple('Interface', interface.keys())(**interface) + + +class InterfaceNotFound(Exception): + pass + + +def _process(proc_data): + """ + Final processing to conform to the schema. + + Parameters: + + proc_data: (List of Dictionaries) raw structured data to process + + Returns: + + List of Dictionaries. Structured data to conform to the schema. + """ + int_list = { + 'flags', 'mtu', 'ipv6_mask', 'rx_packets', 'rx_bytes', 'rx_errors', 'rx_dropped', + 'rx_overruns', 'rx_frame', 'tx_packets', 'tx_bytes', 'tx_errors', 'tx_dropped', + 'tx_overruns', 'tx_carrier', 'tx_collisions', 'metric' + } + + for entry in proc_data: + for key in entry: + if key in int_list: + entry[key] = jc.utils.convert_to_int(entry[key]) + + # convert OSX-style subnet mask to dotted quad + if 'ipv4_mask' in entry: + try: + if entry['ipv4_mask'].startswith('0x'): + new_mask = entry['ipv4_mask'] + new_mask = new_mask.lstrip('0x') + new_mask = '.'.join(str(int(i, 16)) for i in [new_mask[i:i + 2] for i in range(0, len(new_mask), 2)]) + entry['ipv4_mask'] = new_mask + except (ValueError, TypeError, AttributeError): + pass + + # convert state value to an array + if 'state' in entry: + try: + new_state = entry['state'].split(',') + entry['state'] = new_state + except (ValueError, TypeError, AttributeError): + pass + + return proc_data + + +def parse(data, raw=False, quiet=False): + """ + Main text parsing function + + Parameters: + + data: (string) text data to parse + raw: (boolean) unprocessed output if True + quiet: (boolean) suppress warning messages if True + + Returns: + + List of Dictionaries. Raw or processed structured data. + """ + jc.utils.compatibility(__name__, info.compatible, quiet) + jc.utils.input_type_check(data) + + raw_output = [] + + if jc.utils.has_data(data): + + parsed = _IfconfigParser(console_output=data) + interfaces = parsed.get_interfaces() + + # convert ifconfigparser output to a dictionary + for iface in interfaces: + d = interfaces[iface]._asdict() + dct = dict(d) + raw_output.append(dct) + + if raw: + return raw_output + else: + return _process(raw_output) diff --git a/tests/test_ifconfig.py b/tests/test_ifconfig.py index 19863d02..75d38d22 100644 --- a/tests/test_ifconfig.py +++ b/tests/test_ifconfig.py @@ -56,36 +56,42 @@ class MyTests(unittest.TestCase): """ Test 'ifconfig' on Centos 7.7 """ + self.maxDiff = None self.assertEqual(jc.parsers.ifconfig.parse(self.centos_7_7_ifconfig, quiet=True), self.centos_7_7_ifconfig_json) def test_ifconfig_ubuntu_18_4(self): """ Test 'ifconfig' on Ubuntu 18.4 """ + self.maxDiff = None self.assertEqual(jc.parsers.ifconfig.parse(self.ubuntu_18_4_ifconfig, quiet=True), self.ubuntu_18_4_ifconfig_json) def test_ifconfig_osx_10_11_6(self): """ Test 'ifconfig' on OSX 10.11.6 """ + self.maxDiff = None self.assertEqual(jc.parsers.ifconfig.parse(self.osx_10_11_6_ifconfig, quiet=True), self.osx_10_11_6_ifconfig_json) def test_ifconfig_osx_10_11_6_2(self): """ Test 'ifconfig' on OSX 10.11.6 """ + self.maxDiff = None self.assertEqual(jc.parsers.ifconfig.parse(self.osx_10_11_6_ifconfig2, quiet=True), self.osx_10_11_6_ifconfig2_json) def test_ifconfig_osx_10_14_6(self): """ Test 'ifconfig' on OSX 10.14.6 """ + self.maxDiff = None self.assertEqual(jc.parsers.ifconfig.parse(self.osx_10_14_6_ifconfig, quiet=True), self.osx_10_14_6_ifconfig_json) def test_ifconfig_osx_10_14_6_2(self): """ Test 'ifconfig' on OSX 10.14.6 """ + self.maxDiff = None self.assertEqual(jc.parsers.ifconfig.parse(self.osx_10_14_6_ifconfig2, quiet=True), self.osx_10_14_6_ifconfig2_json)