From bbed9e274b8252ba0518140fe7fc97029310771f Mon Sep 17 00:00:00 2001 From: Kelly Brazil Date: Fri, 24 Sep 2021 09:16:44 -0700 Subject: [PATCH] linting --- jc/parsers/ping_s.py | 299 +++++++++++++++++++++---------------------- 1 file changed, 147 insertions(+), 152 deletions(-) diff --git a/jc/parsers/ping_s.py b/jc/parsers/ping_s.py index fda7b75b..4ffc70be 100644 --- a/jc/parsers/ping_s.py +++ b/jc/parsers/ping_s.py @@ -174,11 +174,12 @@ def _error_type(line): if err_type in line: return code + return None + def _bsd_parse(line, s): output_line = {} - if line.startswith('PING '): s.destination_ip = line.split()[2].lstrip('(').rstrip(':').rstrip(')') s.sent_bytes = line.split()[3] @@ -203,96 +204,76 @@ def _bsd_parse(line, s): s.packet_loss_percent = line.split()[8].rstrip('%') s.duplicates = line.split()[6].lstrip('+') return None - - else: - s.packets_transmitted = line.split()[0] - s.packets_received = line.split()[3] - s.packet_loss_percent = line.split()[6].rstrip('%') - s.duplicates = '0' - return None - else: - split_line = line.split(' = ')[1] - split_line = split_line.split('/') + s.packets_transmitted = line.split()[0] + s.packets_received = line.split()[3] + s.packet_loss_percent = line.split()[6].rstrip('%') + s.duplicates = '0' + return None + split_line = line.split(' = ')[1] + split_line = split_line.split('/') + + output_line = { + 'type': 'summary', + 'destination_ip': s.destination_ip or None, + 'sent_bytes': s.sent_bytes or None, + 'pattern': s.pattern or None, + 'packets_transmitted': s.packets_transmitted or None, + 'packets_received': s.packets_received or None, + 'packet_loss_percent': s.packet_loss_percent or None, + 'duplicates': s.duplicates or None, + 'round_trip_ms_min': split_line[0], + 'round_trip_ms_avg': split_line[1], + 'round_trip_ms_max': split_line[2], + 'round_trip_ms_stddev': split_line[3].replace(' ms', '') + } + + return output_line + + # ping response lines + + # ipv4 lines + if not _ipv6_in(line): + + # request timeout + if line.startswith('Request timeout for '): output_line = { - 'type': 'summary', + 'type': 'timeout', 'destination_ip': s.destination_ip or None, 'sent_bytes': s.sent_bytes or None, 'pattern': s.pattern or None, - 'packets_transmitted': s.packets_transmitted or None, - 'packets_received': s.packets_received or None, - 'packet_loss_percent': s.packet_loss_percent or None, - 'duplicates': s.duplicates or None, - 'round_trip_ms_min': split_line[0], - 'round_trip_ms_avg': split_line[1], - 'round_trip_ms_max': split_line[2], - 'round_trip_ms_stddev': split_line[3].replace(' ms', '') + 'icmp_seq': line.split()[4] } return output_line - # ping response lines - else: - # ipv4 lines - if not _ipv6_in(line): + # catch error responses + err = _error_type(line) + if err: + output_line = { + 'type': err + } - # request timeout - if line.startswith('Request timeout for '): - output_line = { - 'type': 'timeout', - 'destination_ip': s.destination_ip or None, - 'sent_bytes': s.sent_bytes or None, - 'pattern': s.pattern or None, - 'icmp_seq': line.split()[4] - } - - return output_line + try: + output_line['bytes'] = line.split()[0] + output_line['destination_ip'] = s.destination_ip + output_line['response_ip'] = line.split()[4].strip(':').strip('(').strip(')') + except Exception: + pass - # catch error responses - err = _error_type(line) - if err: - output_line = { - 'type': err - } + return output_line - try: - output_line['bytes'] = line.split()[0] - output_line['destination_ip'] = s.destination_ip - output_line['response_ip'] = line.split()[4].strip(':').strip('(').strip(')') - except Exception: - pass + # normal response + if ' bytes from ' in line: + line = line.replace(':', ' ').replace('=', ' ') - return output_line - - # normal response - elif ' bytes from ' in line: - line = line.replace(':', ' ').replace('=', ' ') - - output_line = { - 'type': 'reply', - 'destination_ip': s.destination_ip or None, - 'sent_bytes': s.sent_bytes or None, - 'pattern': s.pattern or None, - 'response_bytes': line.split()[0], - 'response_ip': line.split()[3], - 'icmp_seq': line.split()[5], - 'ttl': line.split()[7], - 'time_ms': line.split()[9] - } - - return output_line - - # ipv6 lines - elif ' bytes from ' in line: - line = line.replace(',', ' ').replace('=', ' ') - output_line = { 'type': 'reply', 'destination_ip': s.destination_ip or None, 'sent_bytes': s.sent_bytes or None, 'pattern': s.pattern or None, - 'bytes': line.split()[0], + 'response_bytes': line.split()[0], 'response_ip': line.split()[3], 'icmp_seq': line.split()[5], 'ttl': line.split()[7], @@ -301,6 +282,24 @@ def _bsd_parse(line, s): return output_line + # ipv6 lines + elif ' bytes from ' in line: + line = line.replace(',', ' ').replace('=', ' ') + + output_line = { + 'type': 'reply', + 'destination_ip': s.destination_ip or None, + 'sent_bytes': s.sent_bytes or None, + 'pattern': s.pattern or None, + 'bytes': line.split()[0], + 'response_ip': line.split()[3], + 'icmp_seq': line.split()[5], + 'ttl': line.split()[7], + 'time_ms': line.split()[9] + } + + return output_line + def _linux_parse(line, s): """ @@ -318,7 +317,7 @@ def _linux_parse(line, s): output_line = {} if line.startswith('PING '): - s.ipv4 = True if 'bytes of data' in line else False + s.ipv4 = 'bytes of data' in line if s.ipv4 and line[5] not in string.digits: s.hostname = True @@ -356,98 +355,94 @@ def _linux_parse(line, s): s.packet_loss_percent = line.split()[7].rstrip('%') s.duplicates = line.split()[5].lstrip('+') s.time_ms = line.split()[11].replace('ms', '') - return None - else: - s.packets_transmitted = line.split()[0] - s.packets_received = line.split()[3] - s.packet_loss_percent = line.split()[5].rstrip('%') - s.duplicates = '0' - s.time_ms = line.split()[9].replace('ms', '') - - return None + s.packets_transmitted = line.split()[0] + s.packets_received = line.split()[3] + s.packet_loss_percent = line.split()[5].rstrip('%') + s.duplicates = '0' + s.time_ms = line.split()[9].replace('ms', '') + return None - else: - split_line = line.split(' = ')[1] - split_line = split_line.split('/') - output_line = { - 'type': 'summary', - 'destination_ip': s.destination_ip or None, - 'sent_bytes': s.sent_bytes or None, - 'pattern': s.pattern or None, - 'packets_transmitted': s.packets_transmitted or None, - 'packets_received': s.packets_received or None, - 'packet_loss_percent': s.packet_loss_percent or None, - 'duplicates': s.duplicates or None, - 'time_ms': s.time_ms or None, - 'round_trip_ms_min': split_line[0], - 'round_trip_ms_avg': split_line[1], - 'round_trip_ms_max': split_line[2], - 'round_trip_ms_stddev': split_line[3].split()[0] - } + split_line = line.split(' = ')[1] + split_line = split_line.split('/') + output_line = { + 'type': 'summary', + 'destination_ip': s.destination_ip or None, + 'sent_bytes': s.sent_bytes or None, + 'pattern': s.pattern or None, + 'packets_transmitted': s.packets_transmitted or None, + 'packets_received': s.packets_received or None, + 'packet_loss_percent': s.packet_loss_percent or None, + 'duplicates': s.duplicates or None, + 'time_ms': s.time_ms or None, + 'round_trip_ms_min': split_line[0], + 'round_trip_ms_avg': split_line[1], + 'round_trip_ms_max': split_line[2], + 'round_trip_ms_stddev': split_line[3].split()[0] + } - return output_line + return output_line # ping response lines - else: - # request timeout - if 'no answer yet for icmp_seq=' in line: - timestamp = False - isequence = 5 - # if timestamp option is specified, then shift icmp sequence field right by one - if line[0] == '[': - timestamp = True - isequence = 6 + # request timeout + if 'no answer yet for icmp_seq=' in line: + timestamp = False + isequence = 5 - output_line = { - 'type': 'timeout', - 'destination_ip': s.destination_ip or None, - 'sent_bytes': s.sent_bytes or None, - 'pattern': s.pattern or None, - 'timestamp': line.split()[0].lstrip('[').rstrip(']') if timestamp else None, - 'icmp_seq': line.replace('=', ' ').split()[isequence] - } - - return output_line + # if timestamp option is specified, then shift icmp sequence field right by one + if line[0] == '[': + timestamp = True + isequence = 6 - # normal responses - elif ' bytes from ' in line: - - line = line.replace('(', ' ').replace(')', ' ').replace('=', ' ') + output_line = { + 'type': 'timeout', + 'destination_ip': s.destination_ip or None, + 'sent_bytes': s.sent_bytes or None, + 'pattern': s.pattern or None, + 'timestamp': line.split()[0].lstrip('[').rstrip(']') if timestamp else None, + 'icmp_seq': line.replace('=', ' ').split()[isequence] + } - # positions of items depend on whether ipv4/ipv6 and/or ip/hostname is used - if s.ipv4 and not s.hostname: - bts, rip, iseq, t2l, tms = (0, 3, 5, 7, 9) - elif s.ipv4 and s.hostname: - bts, rip, iseq, t2l, tms = (0, 4, 7, 9, 11) - elif not s.ipv4 and not s.hostname: - bts, rip, iseq, t2l, tms = (0, 3, 5, 7, 9) - elif not s.ipv4 and s.hostname: - bts, rip, iseq, t2l, tms = (0, 4, 7, 9, 11) + return output_line - # if timestamp option is specified, then shift everything right by one - timestamp = False - if line[0] == '[': - timestamp = True - bts, rip, iseq, t2l, tms = (bts + 1, rip + 1, iseq + 1, t2l + 1, tms + 1) + # normal responses + if ' bytes from ' in line: - output_line = { - 'type': 'reply', - 'destination_ip': s.destination_ip or None, - 'sent_bytes': s.sent_bytes or None, - 'pattern': s.pattern or None, - 'timestamp': line.split()[0].lstrip('[').rstrip(']') if timestamp else None, - 'response_bytes': line.split()[bts], - 'response_ip': line.split()[rip].rstrip(':'), - 'icmp_seq': line.split()[iseq], - 'ttl': line.split()[t2l], - 'time_ms': line.split()[tms], - 'duplicate': True if 'DUP!' in line else False - } + line = line.replace('(', ' ').replace(')', ' ').replace('=', ' ') - return output_line + # positions of items depend on whether ipv4/ipv6 and/or ip/hostname is used + if s.ipv4 and not s.hostname: + bts, rip, iseq, t2l, tms = (0, 3, 5, 7, 9) + elif s.ipv4 and s.hostname: + bts, rip, iseq, t2l, tms = (0, 4, 7, 9, 11) + elif not s.ipv4 and not s.hostname: + bts, rip, iseq, t2l, tms = (0, 3, 5, 7, 9) + elif not s.ipv4 and s.hostname: + bts, rip, iseq, t2l, tms = (0, 4, 7, 9, 11) + + # if timestamp option is specified, then shift everything right by one + timestamp = False + if line[0] == '[': + timestamp = True + bts, rip, iseq, t2l, tms = (bts + 1, rip + 1, iseq + 1, t2l + 1, tms + 1) + + output_line = { + 'type': 'reply', + 'destination_ip': s.destination_ip or None, + 'sent_bytes': s.sent_bytes or None, + 'pattern': s.pattern or None, + 'timestamp': line.split()[0].lstrip('[').rstrip(']') if timestamp else None, + 'response_bytes': line.split()[bts], + 'response_ip': line.split()[rip].rstrip(':'), + 'icmp_seq': line.split()[iseq], + 'ttl': line.split()[t2l], + 'time_ms': line.split()[tms], + 'duplicate': 'DUP!' in line + } + + return output_line def parse(data, raw=False, quiet=False, ignore_exceptions=False):