1
0
mirror of https://github.com/kellyjonbrazil/jc.git synced 2025-07-15 01:24:29 +02:00

update schema and add normalized fields

This commit is contained in:
Kelly Brazil
2021-04-23 08:13:53 -07:00
parent 0152e0665f
commit 125e54213e

View File

@ -1,5 +1,6 @@
"""jc - JSON CLI output utility `ufw app info [application]` command output parser
Because `ufw` application definitions allow overlapping ports and port ranges, this parser preserves that behavior, but also provides `normalized` lists and ranges that remove duplicate ports and merge overlapping ranges.
Usage (cli):
@ -17,29 +18,45 @@ Usage (module):
Schema:
{
"profile": string,
"title": string,
"description": string,
"ports": {
"tcp_list": [
integer
],
"tcp_ranges": [
{
"start": integer, # 'any' is converted to start/end: 0/65535
"end": integer
}
],
"udp_list": [
integer
],
"upd_ranges": [
{
"start": integer, # 'any' is converted to start/end: 0/65535
"end": integer
}
]
}
"profile": string,
"title": string,
"description": string,
"tcp_list": [
integer
],
"tcp_ranges": [
{
"start": integer, # 'any' is converted to start/end: 0/65535
"end": integer
}
],
"udp_list": [
integer
],
"udp_ranges": [
{
"start": integer, # 'any' is converted to start/end: 0/65535
"end": integer
}
],
"normalized_tcp_list": [
integers # duplicates and overlapping are removed
],
"normalized_tcp_ranges": [
{
"start": integer, # 'any' is converted to start/end: 0/65535
"end": integers # overlapping are merged
}
],
"normalized_udp_list": [
integer
],
"normalized_udp_ranges": [
{
"start": integer, # 'any' is converted to start/end: 0/65535
"end": integers # overlapping are merged
}
]
}
Examples:
@ -78,9 +95,50 @@ def _process(proc_data):
Dictionary. Structured to conform to the schema.
"""
# convert to ints
int_list = ['start', 'end']
# rebuild output for added semantic information
# use helper functions in jc.utils for int, float, bool conversions and timestamps
if 'tcp_list' in proc_data:
proc_data['tcp_list'] = [int(p) for p in proc_data['tcp_list']]
if 'udp_list' in proc_data:
proc_data['udp_list'] = [int(p) for p in proc_data['udp_list']]
for protocol in ['tcp', 'udp']:
if protocol + '_ranges' in proc_data:
for i, item in enumerate(proc_data[protocol + '_ranges']):
for key in item:
if key in int_list:
proc_data[protocol + '_ranges'][i][key] = int(proc_data[protocol + '_ranges'][i][key])
# create normalized port lists and port ranges (remove duplicates and merge ranges)
# dump ranges into a set of 0 - 65535
# if items in the port list are in the set, then remove them
# iterate through the set to find gaps and create new ranges based on them
for protocol in ['tcp', 'udp']:
port_set = set()
if protocol + '_ranges' in proc_data:
for item in proc_data[protocol + '_ranges']:
port_set.update(range(item['start'], item['end'] + 1))
proc_data['normalized_' + protocol + '_list'] = sorted(set([p for p in proc_data[protocol + '_list'] if p not in port_set]))
new_port_ranges = []
state = 'findstart' # 'findstart' or 'findend'
for port in range(0, 65535 + 2):
if state == 'findstart':
port_range_obj = {}
if port in port_set:
port_range_obj['start'] = port
state = 'findend'
continue
if state == 'findend':
if port not in port_set:
port_range_obj['end'] = port - 1
new_port_ranges.append(port_range_obj)
state = 'findstart'
proc_data['normalized_' + protocol + '_ranges'] = new_port_ranges
return proc_data
@ -94,10 +152,10 @@ def _parse_port_list(data, port_list=None):
port_list = []
data = data.split(',')
data_list = [int(p) for p in data if ':' not in p and 'any' not in p]
data_list = [p.strip() for p in data if ':' not in p and 'any' not in p]
port_list.extend(data_list)
return sorted(list(set(port_list)))
return port_list
def _parse_port_range(data, range_list=None):
@ -122,8 +180,8 @@ def _parse_port_range(data, range_list=None):
for range_ in ranges:
range_obj = {
'start': int(range_.split(':')[0]),
'end': int(range_.split(':')[1])
'start': range_.split(':')[0],
'end': range_.split(':')[1]
}
range_list.append(range_obj)
@ -169,18 +227,17 @@ def parse(data, raw=False, quiet=False):
if line.startswith('Ports:'):
ports = True
ports_obj = {'ports': {}}
continue
if ports:
line_list = line.rsplit('/', maxsplit=1)
if len(line_list) == 2:
if line_list[1] == 'tcp':
ports_obj['ports']['tcp_list'] = _parse_port_list(line_list[0])
ports_obj['ports']['tcp_ranges'] = _parse_port_range(line_list[0])
raw_output['tcp_list'] = _parse_port_list(line_list[0])
raw_output['tcp_ranges'] = _parse_port_range(line_list[0])
elif line_list[1] == 'udp':
ports_obj['ports']['udp_list'] = _parse_port_list(line_list[0])
ports_obj['ports']['udp_ranges'] = _parse_port_range(line_list[0])
raw_output['udp_list'] = _parse_port_list(line_list[0])
raw_output['udp_ranges'] = _parse_port_range(line_list[0])
# 'any' case
else:
@ -189,24 +246,24 @@ def parse(data, raw=False, quiet=False):
u_list = []
u_range = []
if 'tcp_list' in ports_obj['ports']:
t_list = ports_obj['ports']['tcp_list']
if 'tcp_list' in raw_output:
t_list = raw_output['tcp_list']
if 'tcp_ranges' in ports_obj['ports']:
t_range = ports_obj['ports']['tcp_ranges']
if 'tcp_ranges' in raw_output:
t_range = raw_output['tcp_ranges']
if 'udp_list' in ports_obj['ports']:
u_list = ports_obj['ports']['udp_list']
if 'udp_list' in raw_output:
u_list = raw_output['udp_list']
if 'udp_ranges' in ports_obj['ports']:
u_range = ports_obj['ports']['udp_ranges']
if 'udp_ranges' in raw_output:
u_range = raw_output['udp_ranges']
ports_obj['ports']['tcp_list'] = _parse_port_list(line, t_list)
ports_obj['ports']['tcp_ranges'] = _parse_port_range(line, t_range)
ports_obj['ports']['udp_list'] = _parse_port_list(line, u_list)
ports_obj['ports']['udp_ranges'] = _parse_port_range(line, u_range)
raw_output['tcp_list'] = _parse_port_list(line, t_list)
raw_output['tcp_ranges'] = _parse_port_range(line, t_range)
raw_output['udp_list'] = _parse_port_list(line, u_list)
raw_output['udp_ranges'] = _parse_port_range(line, u_range)
raw_output.update(ports_obj)
raw_output.update(raw_output)
if raw:
return raw_output