X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Ftraffic_scripts%2Fsend_icmp_wait_for_reply.py;h=a77f15efc2e3e5dc583611b25ba4577a5ff38cee;hp=3c8b71abc8029bc1b91a642ebe25e9c51deb1969;hb=a060d519d0a89574f5e75d4e8d4c142fa5687555;hpb=681bb937b97b0c2bf10595e2042cf5943d61896b diff --git a/resources/traffic_scripts/send_icmp_wait_for_reply.py b/resources/traffic_scripts/send_icmp_wait_for_reply.py index 3c8b71abc8..a77f15efc2 100755 --- a/resources/traffic_scripts/send_icmp_wait_for_reply.py +++ b/resources/traffic_scripts/send_icmp_wait_for_reply.py @@ -12,22 +12,33 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Traffic script that sends an IP ICMPv4.""" +"""Traffic script that sends an IP ICMPv4 or ICMPv6.""" import sys +import ipaddress from scapy.layers.inet import ICMP, IP +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest from scapy.all import Ether from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg -def is_icmp_reply(pkt): +def is_icmp_reply(pkt, ipformat): """Return True if pkt is echo reply, else return False. If exception occurs - return False.""" + return False. + + :param pkt: Packet. + :param ipformat: Dictionary of names to distinguish IPv4 and IPv6. + :type pkt: dict + :type ipformat: dict + :rtype: bool + """ + #pylint: disable=bare-except try: - if pkt['IP']['ICMP'].type == 0: # 0 - echo-reply + if pkt[ipformat['IPType']][ipformat['ICMP_rep']].type == \ + ipformat['Type']: return True else: return False @@ -35,16 +46,59 @@ def is_icmp_reply(pkt): return False -def address_check(request, reply): +def address_check(request, reply, ipformat): """Compare request packet source address with reply destination address - and vice versa. If exception occurs return False.""" + and vice versa. If exception occurs return False. + + :param request: Sent packet containing request. + :param reply: Received packet containing reply. + :param ipformat: Dictionary of names to distinguish IPv4 and IPv6. + :type request: dict + :type reply: dict + :type ipformat: dict + :rtype: bool + """ + #pylint: disable=bare-except try: - return reply['IP'].src == request['IP'].dst \ - and reply['IP'].dst == request['IP'].src + r_src = reply[ipformat['IPType']].src == request[ipformat['IPType']].dst + r_dst = reply[ipformat['IPType']].dst == request[ipformat['IPType']].src + return r_src and r_dst except: return False +def valid_ipv4(ip): + """Check if IP address has the correct IPv4 address format. + + :param ip: IP address. + :type ip: str + :return: True in case of correct IPv4 address format, + otherwise return False. + :rtype: bool + """ + try: + ipaddress.IPv4Address(unicode(ip)) + return True + except (AttributeError, ipaddress.AddressValueError): + return False + + +def valid_ipv6(ip): + """Check if IP address has the correct IPv6 address format. + + :param ip: IP address. + :type ip: str + :return: True in case of correct IPv6 address format, + otherwise return False. + :rtype: bool + """ + try: + ipaddress.IPv6Address(unicode(ip)) + return True + except (AttributeError, ipaddress.AddressValueError): + return False + + def main(): """Send ICMP echo request and wait for ICMP echo reply. It ignores all other packets.""" @@ -65,21 +119,33 @@ def main(): sent_packets = [] # Create empty ip ICMP packet - icmp_request = (Ether(src=src_mac, dst=dst_mac) / - IP(src=src_ip, dst=dst_ip) / - ICMP()) + if valid_ipv4(src_ip) and valid_ipv4(dst_ip): + icmp_request = (Ether(src=src_mac, dst=dst_mac) / + IP(src=src_ip, dst=dst_ip) / + ICMP()) + ip_format = {'IPType': 'IP', 'ICMP_req': 'ICMP', + 'ICMP_rep': 'ICMP', 'Type': 0} + elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): + icmp_request = (Ether(src=src_mac, dst=dst_mac) / + IPv6(src=src_ip, dst=dst_ip) / + ICMPv6EchoRequest()) + ip_format = {'IPType': 'IPv6', 'ICMP_req': 'ICMPv6 Echo Request', + 'ICMP_rep': 'ICMPv6 Echo Reply', 'Type': 129} + else: + raise ValueError("IP not in correct format") + # Send created packet on the interface sent_packets.append(icmp_request) txq.send(icmp_request) for _ in range(1000): - icmp_reply = rxq.recv(wait_step) + icmp_reply = rxq.recv(wait_step, ignore=sent_packets) if icmp_reply is None: timeout -= wait_step if timeout < 0: raise RuntimeError("ICMP echo Rx timeout") - elif is_icmp_reply(icmp_reply): - if address_check(icmp_request, icmp_reply): + elif is_icmp_reply(icmp_reply, ip_format): + if address_check(icmp_request, icmp_reply, ip_format): break else: raise RuntimeError("Max packet count limit reached")