diff --git a/jc/parsers/ufw_appinfo.py b/jc/parsers/ufw_appinfo.py index 9f3c4a93..52ab6cc5 100644 --- a/jc/parsers/ufw_appinfo.py +++ b/jc/parsers/ufw_appinfo.py @@ -1,5 +1,6 @@ """jc - JSON CLI output utility `ufw app info [application]` command output parser +Because `ufw` application definitions allow overlapping ports and port ranges, this parser preserves that behavior, but also provides `normalized` lists and ranges that remove duplicate ports and merge overlapping ranges. Usage (cli): @@ -17,29 +18,45 @@ Usage (module): Schema: { - "profile": string, - "title": string, - "description": string, - "ports": { - "tcp_list": [ - integer - ], - "tcp_ranges": [ - { - "start": integer, # 'any' is converted to start/end: 0/65535 - "end": integer - } - ], - "udp_list": [ - integer - ], - "upd_ranges": [ - { - "start": integer, # 'any' is converted to start/end: 0/65535 - "end": integer - } - ] - } + "profile": string, + "title": string, + "description": string, + "tcp_list": [ + integer + ], + "tcp_ranges": [ + { + "start": integer, # 'any' is converted to start/end: 0/65535 + "end": integer + } + ], + "udp_list": [ + integer + ], + "udp_ranges": [ + { + "start": integer, # 'any' is converted to start/end: 0/65535 + "end": integer + } + ], + "normalized_tcp_list": [ + integers # duplicates and overlapping are removed + ], + "normalized_tcp_ranges": [ + { + "start": integer, # 'any' is converted to start/end: 0/65535 + "end": integers # overlapping are merged + } + ], + "normalized_udp_list": [ + integer + ], + "normalized_udp_ranges": [ + { + "start": integer, # 'any' is converted to start/end: 0/65535 + "end": integers # overlapping are merged + } + ] } Examples: @@ -78,9 +95,50 @@ def _process(proc_data): Dictionary. Structured to conform to the schema. """ + # convert to ints + int_list = ['start', 'end'] - # rebuild output for added semantic information - # use helper functions in jc.utils for int, float, bool conversions and timestamps + if 'tcp_list' in proc_data: + proc_data['tcp_list'] = [int(p) for p in proc_data['tcp_list']] + + if 'udp_list' in proc_data: + proc_data['udp_list'] = [int(p) for p in proc_data['udp_list']] + + for protocol in ['tcp', 'udp']: + if protocol + '_ranges' in proc_data: + for i, item in enumerate(proc_data[protocol + '_ranges']): + for key in item: + if key in int_list: + proc_data[protocol + '_ranges'][i][key] = int(proc_data[protocol + '_ranges'][i][key]) + + # create normalized port lists and port ranges (remove duplicates and merge ranges) + # dump ranges into a set of 0 - 65535 + # if items in the port list are in the set, then remove them + # iterate through the set to find gaps and create new ranges based on them + for protocol in ['tcp', 'udp']: + port_set = set() + if protocol + '_ranges' in proc_data: + for item in proc_data[protocol + '_ranges']: + port_set.update(range(item['start'], item['end'] + 1)) + + proc_data['normalized_' + protocol + '_list'] = sorted(set([p for p in proc_data[protocol + '_list'] if p not in port_set])) + + new_port_ranges = [] + state = 'findstart' # 'findstart' or 'findend' + for port in range(0, 65535 + 2): + if state == 'findstart': + port_range_obj = {} + if port in port_set: + port_range_obj['start'] = port + state = 'findend' + continue + if state == 'findend': + if port not in port_set: + port_range_obj['end'] = port - 1 + new_port_ranges.append(port_range_obj) + state = 'findstart' + + proc_data['normalized_' + protocol + '_ranges'] = new_port_ranges return proc_data @@ -94,10 +152,10 @@ def _parse_port_list(data, port_list=None): port_list = [] data = data.split(',') - data_list = [int(p) for p in data if ':' not in p and 'any' not in p] + data_list = [p.strip() for p in data if ':' not in p and 'any' not in p] port_list.extend(data_list) - return sorted(list(set(port_list))) + return port_list def _parse_port_range(data, range_list=None): @@ -122,8 +180,8 @@ def _parse_port_range(data, range_list=None): for range_ in ranges: range_obj = { - 'start': int(range_.split(':')[0]), - 'end': int(range_.split(':')[1]) + 'start': range_.split(':')[0], + 'end': range_.split(':')[1] } range_list.append(range_obj) @@ -169,18 +227,17 @@ def parse(data, raw=False, quiet=False): if line.startswith('Ports:'): ports = True - ports_obj = {'ports': {}} continue if ports: line_list = line.rsplit('/', maxsplit=1) if len(line_list) == 2: if line_list[1] == 'tcp': - ports_obj['ports']['tcp_list'] = _parse_port_list(line_list[0]) - ports_obj['ports']['tcp_ranges'] = _parse_port_range(line_list[0]) + raw_output['tcp_list'] = _parse_port_list(line_list[0]) + raw_output['tcp_ranges'] = _parse_port_range(line_list[0]) elif line_list[1] == 'udp': - ports_obj['ports']['udp_list'] = _parse_port_list(line_list[0]) - ports_obj['ports']['udp_ranges'] = _parse_port_range(line_list[0]) + raw_output['udp_list'] = _parse_port_list(line_list[0]) + raw_output['udp_ranges'] = _parse_port_range(line_list[0]) # 'any' case else: @@ -189,24 +246,24 @@ def parse(data, raw=False, quiet=False): u_list = [] u_range = [] - if 'tcp_list' in ports_obj['ports']: - t_list = ports_obj['ports']['tcp_list'] + if 'tcp_list' in raw_output: + t_list = raw_output['tcp_list'] - if 'tcp_ranges' in ports_obj['ports']: - t_range = ports_obj['ports']['tcp_ranges'] + if 'tcp_ranges' in raw_output: + t_range = raw_output['tcp_ranges'] - if 'udp_list' in ports_obj['ports']: - u_list = ports_obj['ports']['udp_list'] + if 'udp_list' in raw_output: + u_list = raw_output['udp_list'] - if 'udp_ranges' in ports_obj['ports']: - u_range = ports_obj['ports']['udp_ranges'] + if 'udp_ranges' in raw_output: + u_range = raw_output['udp_ranges'] - ports_obj['ports']['tcp_list'] = _parse_port_list(line, t_list) - ports_obj['ports']['tcp_ranges'] = _parse_port_range(line, t_range) - ports_obj['ports']['udp_list'] = _parse_port_list(line, u_list) - ports_obj['ports']['udp_ranges'] = _parse_port_range(line, u_range) + raw_output['tcp_list'] = _parse_port_list(line, t_list) + raw_output['tcp_ranges'] = _parse_port_range(line, t_range) + raw_output['udp_list'] = _parse_port_list(line, u_list) + raw_output['udp_ranges'] = _parse_port_range(line, u_range) - raw_output.update(ports_obj) + raw_output.update(raw_output) if raw: return raw_output