1
0
mirror of https://github.com/kellyjonbrazil/jc.git synced 2025-06-19 00:17:51 +02:00

working raw parser

This commit is contained in:
Kelly Brazil
2021-04-20 20:32:59 -07:00
parent 9a0fb2a7c8
commit eec673be90

View File

@ -35,6 +35,7 @@ Examples:
""" """
import jc.utils import jc.utils
import re import re
import ipaddress
class info(): class info():
@ -72,8 +73,92 @@ def _process(proc_data):
return proc_data return proc_data
def _parse_to_from(linedata, direction): def _parse_to_from(linedata, direction, rule_obj=None):
return {}
if rule_obj is None:
rule_obj = {}
# pull out rule index, if they exist: [ 1]
if direction == 'to':
RE_LINE_NUM = re.compile(r'\[[ 0-9]+\]\s')
line_number_match = re.search(RE_LINE_NUM, linedata)
if line_number_match:
rule_obj['index'] = line_number_match.group(0).replace('[', '').replace(']', '').strip()
linedata = re.sub(RE_LINE_NUM, '', linedata)
else:
rule_obj['index'] = None
# pull (v6)
RE_V6 = re.compile(r'\(v6\)')
v6_match = re.search(RE_V6, linedata)
if v6_match:
rule_obj['network_protocol'] = 'ipv6'
linedata = re.sub(RE_V6, '', linedata)
elif not rule_obj.get('network_protocol'):
rule_obj['network_protocol'] = 'ipv4'
# pull 'Anywhere' if exists. Assign to 0.0.0.0/0 or ::/0 depending on if (v6) is found
if 'Anywhere' in linedata:
if rule_obj.get('network_protocol') == 'ipv6':
rule_obj[direction + '_ip'] = '::'
rule_obj[direction + '_subnet'] = '0'
elif rule_obj.get('network_protocol') == 'ipv4':
rule_obj[direction + '_ip'] = '0.0.0.0'
rule_obj[direction + '_subnet'] = '0'
linedata = linedata.replace('Anywhere', '')
# pull out interface (after 'on')
linedata_list = linedata.split(' on ', maxsplit=1)
if len(linedata_list) > 1:
rule_obj[direction + '_interface'] = linedata_list[1]
linedata = linedata_list[0]
else:
rule_obj[direction + '_interface'] = 'any'
# pull out ipv4 or ipv6 addresses
linedata_list = linedata.split()
new_linedata_list = []
valid_ip = None
for item in linedata_list:
try:
valid_ip = ipaddress.IPv4Interface(item)
except Exception:
try:
valid_ip = ipaddress.IPv6Interface(item)
except Exception:
new_linedata_list.append(item)
if valid_ip:
rule_obj[direction + '_ip'] = str(valid_ip.ip)
rule_obj[direction + '_subnet'] = str(valid_ip.with_prefixlen.split('/')[1])
linedata = ' '.join(new_linedata_list)
# pull out anything ending in 'udp', 'tcp'. strip on '/' for ports
linedata_list = linedata.split('/', maxsplit=1)
if len(linedata_list) > 1:
rule_obj[direction + '_transport'] = linedata_list[1].strip()
linedata = linedata_list[0]
else:
rule_obj[direction + '_transport'] = 'any'
# find the numeric port(s)
linedata_list = linedata.split(':', maxsplit=1)
if len(linedata_list) == 2 and linedata_list[1].isnumeric():
rule_obj[direction + '_start_port'] = linedata_list[0]
rule_obj[direction + '_end_port'] = linedata_list[1]
linedata = ''
elif len(linedata_list) == 1 and linedata_list[0].isnumeric():
rule_obj[direction + '_start_port'] = linedata_list[0]
rule_obj[direction + '_end_port'] = linedata_list[0]
linedata = ''
# only thing left should be the service name.
if linedata.strip():
rule_obj[direction + '_service'] = linedata.strip()
return rule_obj
def parse(data, raw=False, quiet=False): def parse(data, raw=False, quiet=False):
@ -145,7 +230,7 @@ def parse(data, raw=False, quiet=False):
rule_obj['action'] = action rule_obj['action'] = action
rule_obj['action_direction'] = action_direction rule_obj['action_direction'] = action_direction
rule_obj.update(_parse_to_from(to_line, 'to')) rule_obj.update(_parse_to_from(to_line, 'to'))
rule_obj.update(_parse_to_from(from_line, 'from')) rule_obj.update(_parse_to_from(from_line, 'from', rule_obj))
rules_list.append(rule_obj) rules_list.append(rule_obj)