1
0
mirror of https://github.com/kellyjonbrazil/jc.git synced 2025-06-23 00:29:59 +02:00

fix for no data

This commit is contained in:
Kelly Brazil
2020-06-12 12:25:07 -07:00
parent a8837e1244
commit 1fb84fce88
16 changed files with 135 additions and 65 deletions

View File

@ -17,6 +17,11 @@ jc changelog
- Update route parser to fix error on parsing empty data - Update route parser to fix error on parsing empty data
- Update systemctl parser to fix error on parsing empty data - Update systemctl parser to fix error on parsing empty data
- Update systemctl_lj parser to fix error on parsing empty data - Update systemctl_lj parser to fix error on parsing empty data
- Update systemctl_ls parser to fix error on parsing empty data
- Update systemctl_luf parser to fix error on parsing empty data
- Update uptime parser to fix error on parsing empty data
- Update w parser to fix error on parsing empty data
- Update xml parser to fix error on parsing empty data
- Add tests to all parsers for no data condition - Add tests to all parsers for no data condition
- Update ss parser to fix integer fields - Update ss parser to fix integer fields

View File

@ -125,11 +125,12 @@ def parse(data, raw=False, quiet=False):
linedata = data.splitlines() linedata = data.splitlines()
# Clear any blank lines # Clear any blank lines
linedata = list(filter(None, linedata)) linedata = list(filter(None, linedata))
# clean up non-ascii characters, if any
raw_output = [] raw_output = []
if linedata: if linedata:
cleandata = [] cleandata = []
# clean up non-ascii characters, if any
for entry in linedata: for entry in linedata:
cleandata.append(entry.encode('ascii', errors='ignore').decode()) cleandata.append(entry.encode('ascii', errors='ignore').decode())

View File

@ -34,7 +34,7 @@ import jc.utils
class info(): class info():
version = '1.1' version = '1.2'
description = 'systemctl list-sockets command parser' description = 'systemctl list-sockets command parser'
author = 'Kelly Brazil' author = 'Kelly Brazil'
author_email = 'kellyjonbrazil@gmail.com' author_email = 'kellyjonbrazil@gmail.com'
@ -91,24 +91,27 @@ def parse(data, raw=False, quiet=False):
linedata = data.splitlines() linedata = data.splitlines()
# Clear any blank lines # Clear any blank lines
linedata = list(filter(None, linedata)) linedata = list(filter(None, linedata))
# clean up non-ascii characters, if any
cleandata = []
for entry in linedata:
cleandata.append(entry.encode('ascii', errors='ignore').decode())
header_text = cleandata[0].lower()
header_list = header_text.split()
raw_output = [] raw_output = []
for entry in cleandata[1:]: if linedata:
if 'sockets listed.' in entry: cleandata = []
break # clean up non-ascii characters, if any
for entry in linedata:
cleandata.append(entry.encode('ascii', errors='ignore').decode())
else: header_text = cleandata[0].lower()
entry_list = entry.rsplit(maxsplit=2) header_list = header_text.split()
output_line = dict(zip(header_list, entry_list))
raw_output.append(output_line) raw_output = []
for entry in cleandata[1:]:
if 'sockets listed.' in entry:
break
else:
entry_list = entry.rsplit(maxsplit=2)
output_line = dict(zip(header_list, entry_list))
raw_output.append(output_line)
if raw: if raw:
return raw_output return raw_output

View File

@ -31,7 +31,7 @@ import jc.utils
class info(): class info():
version = '1.1' version = '1.2'
description = 'systemctl list-unit-files command parser' description = 'systemctl list-unit-files command parser'
author = 'Kelly Brazil' author = 'Kelly Brazil'
author_email = 'kellyjonbrazil@gmail.com' author_email = 'kellyjonbrazil@gmail.com'
@ -87,25 +87,28 @@ def parse(data, raw=False, quiet=False):
linedata = data.splitlines() linedata = data.splitlines()
# Clear any blank lines # Clear any blank lines
linedata = list(filter(None, linedata)) linedata = list(filter(None, linedata))
# clean up non-ascii characters, if any
cleandata = []
for entry in linedata:
cleandata.append(entry.encode('ascii', errors='ignore').decode())
header_text = cleandata[0]
header_text = header_text.lower().replace('unit file', 'unit_file')
header_list = header_text.split()
raw_output = [] raw_output = []
for entry in cleandata[1:]: if linedata:
if 'unit files listed.' in entry: cleandata = []
break # clean up non-ascii characters, if any
for entry in linedata:
cleandata.append(entry.encode('ascii', errors='ignore').decode())
else: header_text = cleandata[0]
entry_list = entry.split(maxsplit=4) header_text = header_text.lower().replace('unit file', 'unit_file')
output_line = dict(zip(header_list, entry_list)) header_list = header_text.split()
raw_output.append(output_line)
raw_output = []
for entry in cleandata[1:]:
if 'unit files listed.' in entry:
break
else:
entry_list = entry.split(maxsplit=4)
output_line = dict(zip(header_list, entry_list))
raw_output.append(output_line)
if raw: if raw:
return raw_output return raw_output

View File

@ -34,7 +34,7 @@ import jc.utils
class info(): class info():
version = '1.0' version = '1.1'
description = 'uptime command parser' description = 'uptime command parser'
author = 'Kelly Brazil' author = 'Kelly Brazil'
author_email = 'kellyjonbrazil@gmail.com' author_email = 'kellyjonbrazil@gmail.com'
@ -107,10 +107,9 @@ def parse(data, raw=False, quiet=False):
jc.utils.compatibility(__name__, info.compatible) jc.utils.compatibility(__name__, info.compatible)
raw_output = {} raw_output = {}
cleandata = data.splitlines() cleandata = data.splitlines()
if cleandata: if list(filter(None, cleandata)):
parsed_line = cleandata[0].split() parsed_line = cleandata[0].split()
# allow space for odd times # allow space for odd times

View File

@ -83,7 +83,7 @@ import jc.utils
class info(): class info():
version = '1.1' version = '1.2'
description = 'w command parser' description = 'w command parser'
author = 'Kelly Brazil' author = 'Kelly Brazil'
author_email = 'kellyjonbrazil@gmail.com' author_email = 'kellyjonbrazil@gmail.com'
@ -149,36 +149,39 @@ def parse(data, raw=False, quiet=False):
jc.utils.compatibility(__name__, info.compatible) jc.utils.compatibility(__name__, info.compatible)
cleandata = data.splitlines()[1:] cleandata = data.splitlines()[1:]
header_text = cleandata[0].lower()
# fixup for 'from' column that can be blank
from_col = header_text.find('from')
# clean up 'login@' header
# even though @ in a key is valid json, it can make things difficult
header_text = header_text.replace('login@', 'login_at')
headers = [h for h in ' '.join(header_text.strip().split()).split() if h]
# parse lines
raw_output = [] raw_output = []
if cleandata:
for entry in cleandata[1:]:
output_line = {}
# normalize data by inserting Null for missing data if list(filter(None, cleandata)):
temp_line = entry.split(maxsplit=len(headers) - 1) header_text = cleandata[0].lower()
# fixup for 'from' column that can be blank
from_col = header_text.find('from')
# clean up 'login@' header
# even though @ in a key is valid json, it can make things difficult
header_text = header_text.replace('login@', 'login_at')
headers = [h for h in ' '.join(header_text.strip().split()).split() if h]
# fix from column, always at column 2 # parse lines
if 'from' in headers: raw_output = []
if entry[from_col] in string.whitespace: if cleandata:
temp_line.insert(2, '-') for entry in cleandata[1:]:
output_line = {}
output_line = dict(zip(headers, temp_line)) # normalize data by inserting Null for missing data
raw_output.append(output_line) temp_line = entry.split(maxsplit=len(headers) - 1)
# strip whitespace from beginning and end of all string values # fix from column, always at column 2
for row in raw_output: if 'from' in headers:
for item in row: if entry[from_col] in string.whitespace:
if isinstance(row[item], str): temp_line.insert(2, '-')
row[item] = row[item].strip()
output_line = dict(zip(headers, temp_line))
raw_output.append(output_line)
# strip whitespace from beginning and end of all string values
for row in raw_output:
for item in row:
if isinstance(row[item], str):
row[item] = row[item].strip()
if raw: if raw:
return raw_output return raw_output

View File

@ -59,7 +59,7 @@ import xmltodict
class info(): class info():
version = '1.0' version = '1.1'
description = 'XML file parser' description = 'XML file parser'
author = 'Kelly Brazil' author = 'Kelly Brazil'
author_email = 'kellyjonbrazil@gmail.com' author_email = 'kellyjonbrazil@gmail.com'
@ -111,7 +111,9 @@ def parse(data, raw=False, quiet=False):
if not quiet: if not quiet:
jc.utils.compatibility(__name__, info.compatible) jc.utils.compatibility(__name__, info.compatible)
if data: raw_output = []
if list(filter(None, data.splitlines())):
raw_output = xmltodict.parse(data) raw_output = xmltodict.parse(data)
if raw: if raw:

View File

@ -23,6 +23,12 @@ class MyTests(unittest.TestCase):
with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/ubuntu-18.04/systemctl-ls.json'), 'r', encoding='utf-8') as f: with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/ubuntu-18.04/systemctl-ls.json'), 'r', encoding='utf-8') as f:
self.ubuntu_18_4_systemctl_ls_json = json.loads(f.read()) self.ubuntu_18_4_systemctl_ls_json = json.loads(f.read())
def test_systemctl_ls_nodata(self):
"""
Test 'systemctl -a list-sockets' with no data
"""
self.assertEqual(jc.parsers.systemctl_ls.parse('', quiet=True), [])
def test_systemctl_ls_centos_7_7(self): def test_systemctl_ls_centos_7_7(self):
""" """
Test 'systemctl -a list-sockets' on Centos 7.7 Test 'systemctl -a list-sockets' on Centos 7.7

View File

@ -23,6 +23,12 @@ class MyTests(unittest.TestCase):
with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/ubuntu-18.04/systemctl-luf.json'), 'r', encoding='utf-8') as f: with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/ubuntu-18.04/systemctl-luf.json'), 'r', encoding='utf-8') as f:
self.ubuntu_18_4_systemctl_luf_json = json.loads(f.read()) self.ubuntu_18_4_systemctl_luf_json = json.loads(f.read())
def test_systemctl_luf_nodata(self):
"""
Test 'systemctl -a list-sockets' with no data
"""
self.assertEqual(jc.parsers.systemctl_luf.parse('', quiet=True), [])
def test_systemctl_luf_centos_7_7(self): def test_systemctl_luf_centos_7_7(self):
""" """
Test 'systemctl -a list-sockets' on Centos 7.7 Test 'systemctl -a list-sockets' on Centos 7.7

View File

@ -23,6 +23,12 @@ class MyTests(unittest.TestCase):
with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/ubuntu-18.04/timedatectl.json'), 'r', encoding='utf-8') as f: with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/ubuntu-18.04/timedatectl.json'), 'r', encoding='utf-8') as f:
self.ubuntu_18_4_timedatectl_json = json.loads(f.read()) self.ubuntu_18_4_timedatectl_json = json.loads(f.read())
def test_timedatectl_nodata(self):
"""
Test 'timedatectl' with no data
"""
self.assertEqual(jc.parsers.timedatectl.parse('', quiet=True), {})
def test_timedatectl_centos_7_7(self): def test_timedatectl_centos_7_7(self):
""" """
Test 'timedatectl' on Centos 7.7 Test 'timedatectl' on Centos 7.7

View File

@ -35,6 +35,12 @@ class MyTests(unittest.TestCase):
with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/osx-10.14.6/uname-a.json'), 'r', encoding='utf-8') as f: with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/osx-10.14.6/uname-a.json'), 'r', encoding='utf-8') as f:
self.osx_10_14_6_uname_a_json = json.loads(f.read()) self.osx_10_14_6_uname_a_json = json.loads(f.read())
def test_uname_nodata(self):
"""
Test 'uname -a' with no data
"""
self.assertEqual(jc.parsers.uname.parse('', quiet=True), {})
def test_uname_centos_7_7(self): def test_uname_centos_7_7(self):
""" """
Test 'uname -a' on Centos 7.7 Test 'uname -a' on Centos 7.7

View File

@ -35,6 +35,12 @@ class MyTests(unittest.TestCase):
with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/osx-10.14.6/uptime.json'), 'r', encoding='utf-8') as f: with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/osx-10.14.6/uptime.json'), 'r', encoding='utf-8') as f:
self.osx_10_14_6_uptime_json = json.loads(f.read()) self.osx_10_14_6_uptime_json = json.loads(f.read())
def test_uptime_nodata(self):
"""
Test 'uptime' with no data
"""
self.assertEqual(jc.parsers.uptime.parse('', quiet=True), {})
def test_uptime_centos_7_7(self): def test_uptime_centos_7_7(self):
""" """
Test 'uptime' on Centos 7.7 Test 'uptime' on Centos 7.7

View File

@ -41,6 +41,12 @@ class MyTests(unittest.TestCase):
with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/nixos/w.json'), 'r', encoding='utf-8') as f: with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/nixos/w.json'), 'r', encoding='utf-8') as f:
self.nixos_w_json = json.loads(f.read()) self.nixos_w_json = json.loads(f.read())
def test_w_nodata(self):
"""
Test 'w' with no data
"""
self.assertEqual(jc.parsers.w.parse('', quiet=True), [])
def test_w_centos_7_7(self): def test_w_centos_7_7(self):
""" """
Test 'w' on Centos 7.7 Test 'w' on Centos 7.7

View File

@ -47,6 +47,12 @@ class MyTests(unittest.TestCase):
with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/osx-10.14.6/who-a.json'), 'r', encoding='utf-8') as f: with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/osx-10.14.6/who-a.json'), 'r', encoding='utf-8') as f:
self.osx_10_14_6_who_a_json = json.loads(f.read()) self.osx_10_14_6_who_a_json = json.loads(f.read())
def test_who_nodata(self):
"""
Test 'who' with no data
"""
self.assertEqual(jc.parsers.who.parse('', quiet=True), [])
def test_who_centos_7_7(self): def test_who_centos_7_7(self):
""" """
Test 'who' on Centos 7.7 Test 'who' on Centos 7.7

View File

@ -23,6 +23,12 @@ class MyTests(unittest.TestCase):
with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/generic/xml-foodmenu.json'), 'r', encoding='utf-8') as f: with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/generic/xml-foodmenu.json'), 'r', encoding='utf-8') as f:
self.generic_xml_foodmenu_json = json.loads(f.read()) self.generic_xml_foodmenu_json = json.loads(f.read())
def test_xml_nodata(self):
"""
Test xml parser with no data
"""
self.assertEqual(jc.parsers.xml.parse('', quiet=True), [])
def test_xml_cd_catalog(self): def test_xml_cd_catalog(self):
""" """
Test the cd catalog xml file Test the cd catalog xml file

View File

@ -23,6 +23,12 @@ class MyTests(unittest.TestCase):
with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/generic/yaml-istio-sidecar.json'), 'r', encoding='utf-8') as f: with open(os.path.join(THIS_DIR, os.pardir, 'tests/fixtures/generic/yaml-istio-sidecar.json'), 'r', encoding='utf-8') as f:
self.generic_yaml_istio_sidecar_json = json.loads(f.read()) self.generic_yaml_istio_sidecar_json = json.loads(f.read())
def test_yaml_nodata(self):
"""
Test the YAML parser with no data
"""
self.assertEqual(jc.parsers.yaml.parse('', quiet=True), [])
def test_yaml_istio_sc(self): def test_yaml_istio_sc(self):
""" """
Test the Istio SC yaml file Test the Istio SC yaml file