1
0
mirror of https://github.com/kellyjonbrazil/jc.git synced 2025-06-23 00:29:59 +02:00

feat: get correct probe IP when mutliple probe on hop (#562)

Co-authored-by: Pierre-Olivier Côté <pierre-olivier.cote@polymtl.ca>
This commit is contained in:
Kelly Brazil
2024-04-29 13:58:11 -07:00
committed by GitHub
parent dfc942776b
commit 5bec53b3f8
6 changed files with 60 additions and 54 deletions

View File

@ -118,6 +118,7 @@ Examples:
import re
from decimal import Decimal
import jc.utils
from copy import deepcopy
class info():
@ -237,6 +238,57 @@ class _Probe(object):
text += "\n"
return text
def _get_probes(hop_string: str):
probes = []
probe_asn_match = [ (match, "ASN") for match in RE_PROBE_ASN.finditer(hop_string)]
probe_name_ip_match = [(match, "NAME_IP") for match in RE_PROBE_NAME_IP.finditer(hop_string)]
probe_ip_only_match = [(match, "IP_ONLY") for match in RE_PROBE_IP_ONLY.finditer(hop_string)]
probe_bsd_ipv6_match = [(match, "IP_IPV6") for match in RE_PROBE_BSD_IPV6.finditer(hop_string)]
probe_ipv6_only_match = [(match, "IP_IPV6_ONLY") for match in RE_PROBE_IPV6_ONLY.finditer(hop_string)]
probe_rtt_annotations = [(match, "RTT") for match in RE_PROBE_RTT_ANNOTATION.finditer(hop_string)]
matches = sorted(probe_asn_match + probe_name_ip_match + probe_ip_only_match + probe_bsd_ipv6_match + probe_ipv6_only_match + probe_rtt_annotations, key=lambda x: x[0].start(0))
probe, is_last_match_rtt = _Probe(), False
for match, match_type in matches:
if match_type == "ASN":
probe.asn = int(match.group(1))
elif match_type == "NAME_IP":
probe.name = match.group(1)
probe.ip = match.group(2)
elif match_type == "IP_ONLY":
probe.ip = match.group(1)
elif match_type == "IP_IPV6":
probe.ip = match.group(0)
elif match_type == "IP_IPV6_ONLY":
probe.ip = match.group(1)
elif match_type == "RTT":
if match.groups()[0]:
probe_rtt = Decimal(match.groups()[0])
elif match.groups()[1]:
probe_rtt = None
else:
message = f"Expected probe RTT or *. Got: '{match.group(0)}'"
raise ParseError(message)
# If the last match is a RTT, then copy all probe values and replace RTT field
if is_last_match_rtt:
probe = deepcopy(last_probe)
# Set RTT values
probe.rtt = probe_rtt
probe.annotation = match.groups()[2] or None
# RTT is the last value shown for a hop
if any([probe.ip, probe.asn, probe.annotation, probe.rtt, probe.name]):
probes.append(probe)
last_probe = probe
probe = _Probe()
if match_type == "RTT":
is_last_match_rtt = True
else:
is_last_match_rtt = False
return probes
def _loads(data):
lines = data.splitlines()
@ -270,56 +322,10 @@ def _loads(data):
hop_string = hop_match.group(2)
probe_asn_match = RE_PROBE_ASN.search(hop_string)
if probe_asn_match:
probe_asn = int(probe_asn_match.group(1))
else:
probe_asn = None
probe_name_ip_match = RE_PROBE_NAME_IP.search(hop_string)
probe_ip_only_match = RE_PROBE_IP_ONLY.search(hop_string)
probe_bsd_ipv6_match = RE_PROBE_BSD_IPV6.search(hop_string)
probe_ipv6_only_match = RE_PROBE_IPV6_ONLY.search(hop_string)
if probe_ip_only_match:
probe_name = None
probe_ip = probe_ip_only_match.group(1)
elif probe_name_ip_match:
probe_name = probe_name_ip_match.group(1)
probe_ip = probe_name_ip_match.group(2)
elif probe_bsd_ipv6_match:
probe_name = None
probe_ip = probe_bsd_ipv6_match.group(0)
elif probe_ipv6_only_match:
probe_name = None
probe_ip = probe_ipv6_only_match.group(1)
else:
probe_name = None
probe_ip = None
probe_rtt_annotations = RE_PROBE_RTT_ANNOTATION.findall(hop_string)
for probe_rtt_annotation in probe_rtt_annotations:
if probe_rtt_annotation[0]:
probe_rtt = Decimal(probe_rtt_annotation[0])
elif probe_rtt_annotation[1]:
probe_rtt = None
else:
message = f"Expected probe RTT or *. Got: '{probe_rtt_annotation[0]}'"
raise ParseError(message)
probe_annotation = probe_rtt_annotation[2] or None
probe = _Probe(
name=probe_name,
ip=probe_ip,
asn=probe_asn,
rtt=probe_rtt,
annotation=probe_annotation
)
# only add probe if there is data
if any([probe_name, probe_ip, probe_asn, probe_rtt, probe_annotation]):
hop.add_probe(probe)
probes = _get_probes(hop_string)
for probe in probes:
hop.add_probe(probe)
return traceroute