X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Ftraffic_scripts%2Fsend_icmp_wait_for_reply.py;h=7677152801ca051e25c733b6a0c2b2fce6af7389;hp=3c8b71abc8029bc1b91a642ebe25e9c51deb1969;hb=82094363f6077e1b28845719db3a6191c0c93a99;hpb=681bb937b97b0c2bf10595e2042cf5943d61896b diff --git a/resources/traffic_scripts/send_icmp_wait_for_reply.py b/resources/traffic_scripts/send_icmp_wait_for_reply.py index 3c8b71abc8..7677152801 100755 --- a/resources/traffic_scripts/send_icmp_wait_for_reply.py +++ b/resources/traffic_scripts/send_icmp_wait_for_reply.py @@ -12,36 +12,90 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Traffic script that sends an IP ICMPv4.""" +"""Traffic script that sends an IP ICMPv4 or ICMPv6.""" import sys +import ipaddress -from scapy.layers.inet import ICMP, IP from scapy.all import Ether +from scapy.layers.inet import ICMP, IP +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg -def is_icmp_reply(pkt): +def is_icmp_reply(pkt, ipformat): """Return True if pkt is echo reply, else return False. If exception occurs - return False.""" + return False. + + :param pkt: Packet. + :param ipformat: Dictionary of names to distinguish IPv4 and IPv6. + :type pkt: dict + :type ipformat: dict + :rtype: bool + """ + # pylint: disable=bare-except try: - if pkt['IP']['ICMP'].type == 0: # 0 - echo-reply + if pkt[ipformat['IPType']][ipformat['ICMP_rep']].type == \ + ipformat['Type']: return True else: return False - except: + except: # pylint: disable=bare-except return False -def address_check(request, reply): +def address_check(request, reply, ipformat): """Compare request packet source address with reply destination address - and vice versa. If exception occurs return False.""" + and vice versa. If exception occurs return False. + + :param request: Sent packet containing request. + :param reply: Received packet containing reply. + :param ipformat: Dictionary of names to distinguish IPv4 and IPv6. + :type request: dict + :type reply: dict + :type ipformat: dict + :rtype: bool + """ + # pylint: disable=bare-except try: - return reply['IP'].src == request['IP'].dst \ - and reply['IP'].dst == request['IP'].src - except: + r_src = reply[ipformat['IPType']].src == request[ipformat['IPType']].dst + r_dst = reply[ipformat['IPType']].dst == request[ipformat['IPType']].src + return r_src and r_dst + except: # pylint: disable=bare-except + return False + + +def valid_ipv4(ip): + """Check if IP address has the correct IPv4 address format. + + :param ip: IP address. + :type ip: str + :return: True in case of correct IPv4 address format, + otherwise return False. + :rtype: bool + """ + try: + ipaddress.IPv4Address(unicode(ip)) + return True + except (AttributeError, ipaddress.AddressValueError): + return False + + +def valid_ipv6(ip): + """Check if IP address has the correct IPv6 address format. + + :param ip: IP address. + :type ip: str + :return: True in case of correct IPv6 address format, + otherwise return False. + :rtype: bool + """ + try: + ipaddress.IPv6Address(unicode(ip)) + return True + except (AttributeError, ipaddress.AddressValueError): return False @@ -65,21 +119,42 @@ def main(): sent_packets = [] # Create empty ip ICMP packet - icmp_request = (Ether(src=src_mac, dst=dst_mac) / - IP(src=src_ip, dst=dst_ip) / - ICMP()) + if valid_ipv4(src_ip) and valid_ipv4(dst_ip): + icmp_request = (Ether(src=src_mac, dst=dst_mac) / + IP(src=src_ip, dst=dst_ip) / + ICMP()) + ip_format = {'IPType': 'IP', 'ICMP_req': 'ICMP', + 'ICMP_rep': 'ICMP', 'Type': 0} + elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): + icmp_request = (Ether(src=src_mac, dst=dst_mac) / + IPv6(src=src_ip, dst=dst_ip) / + ICMPv6EchoRequest()) + ip_format = {'IPType': 'IPv6', 'ICMP_req': 'ICMPv6 Echo Request', + 'ICMP_rep': 'ICMPv6 Echo Reply', 'Type': 129} + else: + raise ValueError("IP not in correct format") + # Send created packet on the interface sent_packets.append(icmp_request) txq.send(icmp_request) for _ in range(1000): - icmp_reply = rxq.recv(wait_step) - if icmp_reply is None: - timeout -= wait_step - if timeout < 0: - raise RuntimeError("ICMP echo Rx timeout") - elif is_icmp_reply(icmp_reply): - if address_check(icmp_request, icmp_reply): + while True: + icmp_reply = rxq.recv(wait_step, ignore=sent_packets) + if icmp_reply is None: + timeout -= wait_step + if timeout < 0: + raise RuntimeError("ICMP echo Rx timeout") + + elif icmp_reply.haslayer(ICMPv6ND_NS): + # read another packet in the queue in case of ICMPv6ND_NS packet + continue + else: + # otherwise process the current packet + break + + if is_icmp_reply(icmp_reply, ip_format): + if address_check(icmp_request, icmp_reply, ip_format): break else: raise RuntimeError("Max packet count limit reached") @@ -88,5 +163,6 @@ def main(): sys.exit(0) + if __name__ == "__main__": main()