1
0
mirror of https://github.com/kellyjonbrazil/jc.git synced 2025-06-19 00:17:51 +02:00

Add nsd-control (#454)

* Create nsd_control.py

Init nsd_control.py

* cleanup nsd-control + add test data

- Cleanup nsd-control parser
- Add test data

* add test script

add test script + fix test data

* Update test_nsd_control.py

fix a default test

* Update test_nsd_control.py

nit
This commit is contained in:
pettai
2023-10-01 00:36:52 +02:00
committed by GitHub
parent eae1d4b89a
commit a1f10928e1
9 changed files with 326 additions and 0 deletions

View File

@ -92,6 +92,7 @@ parsers: List[str] = [
'mpstat-s',
'netstat',
'nmcli',
'nsd-control',
'ntpq',
'openvpn',
'os-prober',

249
jc/parsers/nsd_control.py Normal file
View File

@ -0,0 +1,249 @@
"""jc - JSON Convert `nsd-control` command output parser
<<Short nsd-control description and caveats>>
Usage (cli):
$ nsd-control | jc --nsd-control
or
$ jc nsd-control
Usage (module):
import jc
result = jc.parse('nsd-control', nsd-control_command_output)
Schema:
[
{
"version": string,
"verbosity": integer,
"ratelimit": integer
}
]
[
{
"zone": string
"status": {
"state": string,
"served-serial": string,
"commit-serial": string,
"wait": string
}
}
]
Examples:
$ nsd-control | jc --nsd-control status
[
{
"version": "4.6.2",
"verbosity": "2",
"ratelimit": "0"
}
]
$ nsd-control | jc --nsd-control zonestatus sunet.se
[
{
"zone": "sunet.se",
"status": {
"state": "ok",
"served-serial": "2023090704 since 2023-09-07T16:34:27",
"commit-serial": "2023090704 since 2023-09-07T16:34:27",
"wait": "28684 sec between attempts"
}
}
]
"""
from typing import List, Dict
import jc.utils
class info():
"""Provides parser metadata (version, author, etc.)"""
version = '1.0'
description = '`nsd-control` command parser'
author = 'Pettai'
author_email = 'pettai@sunet.se'
# details = 'enter any other details here'
# compatible options: linux, darwin, cygwin, win32, aix, freebsd
compatible = ['linux', 'darwin', 'cygwin', 'win32', 'aix', 'freebsd']
# tags options: generic, standard, file, string, binary, command
tags = ['command']
magic_commands = ['nsd-control']
__version__ = info.version
def _process(proc_data):
"""
Final processing to conform to the schema.
Parameters:
proc_data: (List of Dictionaries) raw structured data to process
Returns:
List of Dictionaries. Structured to conform to the schema.
"""
int_list = {'verbosity', 'ratelimit', 'wait'}
for entry in proc_data:
for key in entry:
if key in int_list:
entry[key] = jc.utils.convert_to_int(entry[key])
return proc_data
def parse(data: str, raw: bool = False, quiet: bool = False):
"""
Main text parsing function
Parameters:
data: (string) text data to parse
raw: (boolean) unprocessed output if True
quiet: (boolean) suppress warning messages if True
Returns:
List of Dictionaries. Raw or processed structured data.
"""
jc.utils.compatibility(__name__, info.compatible, quiet)
jc.utils.input_type_check(data)
raw_output: List[Dict] = []
warned = False
if jc.utils.has_data(data):
itrparse = False
itr = {}
for line in filter(None, data.splitlines()):
line = line.strip()
# default 'ok'
if line.startswith('ok'):
raw_output.append({'command': 'ok'})
continue
# status
if line.startswith('version:'):
status = {}
linedata = line.split(':', maxsplit=1)
version = linedata[1].strip()
status.update({'version': version})
continue
if line.startswith('verbosity:'):
linedata = line.split(':', maxsplit=1)
verbosity = linedata[1]
status.update({'verbosity': verbosity})
continue
if line.startswith('ratelimit:'):
linedata = line.split(':', maxsplit=1)
ratelimit = linedata[1]
status.update({'ratelimit': ratelimit})
raw_output.append(status)
continue
# print_cookie_secrets
if line.startswith('active'):
itrparse = True
itr = {}
linedata = line.split(':', maxsplit=1)
active = linedata[1].strip()
cookies.update({'active': active})
continue
if line.startswith('staging'):
linedata = line.split(':', maxsplit=1)
staging = linedata[1].strip()
cookies.update({'staging': staging})
continue
# print_tsig
if line.startswith('key:'):
tsigs = {}
tsigdata = dict()
linedata = line.split(' ', maxsplit=6)
name = linedata[2].strip('"').rstrip('"')
tsigdata.update({'name': name})
secret = linedata[4].strip('"').rstrip('"')
tsigdata.update({'secret': secret})
algorithm = linedata[6].strip('"').rstrip('"')
tsigdata.update({'algorithm': algorithm})
tsigs.update({'key': tsigdata})
raw_output.append(tsigs)
continue
# zonestatus
if line.startswith('zone:'):
zonename = dict()
zstatus = dict()
linedata = line.split(':\t', maxsplit=1)
zone = linedata[1]
zonename.update({'zone': zone})
continue
if line.startswith('state:'):
linedata = line.split(': ', maxsplit=1)
state = linedata[1]
zstatus.update({'state': state})
continue
if line.startswith('served-serial:'):
linedata = line.split(': ', maxsplit=1)
served = linedata[1].strip('"').rstrip('"')
zstatus.update({'served-serial': served})
continue
if line.startswith('commit-serial:'):
linedata = line.split(': ', maxsplit=1)
commit = linedata[1].strip('"').rstrip('"')
zstatus.update({'commit-serial': commit})
continue
if line.startswith('wait:'):
linedata = line.split(': ', maxsplit=1)
wait = linedata[1].strip('"').rstrip('"')
zstatus.update({'wait': wait})
zonename.update({'status': zstatus})
raw_output.append(zonename)
continue
# stats
if line.startswith('server') or line.startswith('num.') or line.startswith('size.') or line.startswith('time.') or line.startswith('zone.'):
itrparse = True
linedata = line.split('=', maxsplit=1)
key = linedata[0]
if key.startswith('time.'):
value = float(linedata[1])
else:
value = int(linedata[1])
itr.update({key: value})
continue
if itrparse:
raw_output.append(itr)
return raw_output if raw else _process(raw_output)

View File

@ -0,0 +1 @@
[{"version":"4.6.2","verbosity":2,"ratelimit":0}]

View File

@ -0,0 +1,3 @@
version: 4.6.2
verbosity: 2
ratelimit: 0

View File

@ -0,0 +1 @@
[{"zone":"sunet.se","status":{"state":"ok","served-serial":"2023091302 since 2023-09-14T00:50:11","commit-serial":"2023091302 since 2023-09-14T07:04:05","wait":"27023 sec between attempts"}}]

View File

@ -0,0 +1,5 @@
zone: sunet.se
state: ok
served-serial: "2023091302 since 2023-09-14T00:50:11"
commit-serial: "2023091302 since 2023-09-14T07:04:05"
wait: "27023 sec between attempts"

View File

@ -0,0 +1 @@
[{"command":"ok"}]

View File

@ -0,0 +1 @@
ok

64
tests/test_nsd_control.py Normal file
View File

@ -0,0 +1,64 @@
import os
import unittest
import json
from typing import Dict
from jc.parsers.nsd_control import parse
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
class MyTests(unittest.TestCase):
f_in: Dict = {}
f_json: Dict = {}
@classmethod
def setUpClass(cls):
fixtures = {
'default': (
'fixtures/generic/nsd_control.out',
'fixtures/generic/nsd_control.json'),
'status': (
'fixtures/generic/nsd_control-status.out',
'fixtures/generic/nsd_control-status.json'),
'zonestatus': (
'fixtures/generic/nsd_control-zonestatus.out',
'fixtures/generic/nsd_control-zonestatus.json')
}
for file, filepaths in fixtures.items():
with open(os.path.join(THIS_DIR, filepaths[0]), 'r', encoding='utf-8') as a, \
open(os.path.join(THIS_DIR, filepaths[1]), 'r', encoding='utf-8') as b:
cls.f_in[file] = a.read()
cls.f_json[file] = json.loads(b.read())
def test_nsd_control_default(self):
"""
Test 'nsd-control <command>' with default output
"""
self.assertEqual(
parse(self.f_in['default'], quiet=True),
self.f_json['default']
)
def test_nsd_control_status(self):
"""
Test 'nsd-control status'
"""
self.assertEqual(
parse(self.f_in['status'], quiet=True),
self.f_json['status']
)
def test_nsd_control_zonestatus(self):
"""
Test 'nsd-control zonestatus'
"""
self.assertEqual(
parse(self.f_in['zonestatus'], quiet=True),
self.f_json['zonestatus']
)
if __name__ == '__main__':
unittest.main()