Ignore unexpected ICMPv6 Neighbor Discovery - Neighbor Solicitation packets
[csit.git] / resources / traffic_scripts / send_vxlangpe_nsh_for_proxy_test.py
index d998d7c..3ea1f0b 100755 (executable)
 from TG to DUT.
 """
 import sys
-import time
 
 from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
 from scapy.all import Ether, Packet, Raw
 
 from resources.libraries.python.SFC.VerifyPacket import *
-from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon
+from resources.libraries.python.SFC.SFCConstants import SFCConstants as SfcCon
 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
 
-from robot.api import logger
 
 def main():
     """Send VxLAN-GPE+NSH packet from TG to DUT.
 
     :raises: If the IP address is invalid.
     """
-    args = TrafficScriptArg(
-        ['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
-        'timeout', 'framesize', 'testtype'])
+    args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
+                             'timeout', 'framesize', 'testtype'])
 
     src_mac = args.get_arg('src_mac')
     dst_mac = args.get_arg('dst_mac')
@@ -44,7 +41,7 @@ def main():
     dst_ip = args.get_arg('dst_ip')
     tx_if = args.get_arg('tx_if')
     rx_if = args.get_arg('rx_if')
-    timeout = int(args.get_arg('timeout'))
+    timeout = max(2, int(args.get_arg('timeout')))
     frame_size = int(args.get_arg('framesize'))
     test_type = args.get_arg('testtype')
 
@@ -53,10 +50,9 @@ def main():
     sent_packets = []
 
     protocol = TCP
-    source_port = sfccon.DEF_SRC_PORT
-    destination_port = sfccon.DEF_DST_PORT
+    source_port = SfcCon.DEF_SRC_PORT
+    destination_port = SfcCon.DEF_DST_PORT
 
-    ip_version = None
     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
         ip_version = IP
     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
@@ -65,8 +61,8 @@ def main():
         raise ValueError("Invalid IP version!")
 
     innerpkt = (Ether(src=src_mac, dst=dst_mac) /
-               ip_version(src=src_ip, dst=dst_ip) /
-               protocol(sport=int(source_port), dport=int(destination_port)))
+                ip_version(src=src_ip, dst=dst_ip) /
+                protocol(sport=int(source_port), dport=int(destination_port)))
 
     vxlangpe_nsh = '\x0c\x00\x00\x04\x00\x00\x09\x00\x00\x06' \
                    '\x01\x03\x00\x00\xb9\xff\xC0\xA8\x32\x4B' \
@@ -75,9 +71,9 @@ def main():
     raw_data = vxlangpe_nsh + str(innerpkt)
 
     pkt_header = (Ether(src=src_mac, dst=dst_mac) /
-              ip_version(src=src_ip, dst=dst_ip) /
-              UDP(sport=int(source_port), dport=4790) /
-              Raw(load=raw_data))
+                  ip_version(src=src_ip, dst=dst_ip) /
+                  UDP(sport=int(source_port), dport=4790) /
+                  Raw(load=raw_data))
 
     fsize_no_fcs = frame_size - 4
     pad_len = max(0, fsize_no_fcs - len(pkt_header))
@@ -89,10 +85,17 @@ def main():
     sent_packets.append(pkt_raw)
     txq.send(pkt_raw)
 
-    ether = rxq.recv(2)
-
-    if ether is None:
-        raise RuntimeError("No packet is received!")
+    while True:
+        ether = rxq.recv(timeout)
+        if ether is None:
+            raise RuntimeError('No packet is received!')
+
+        if ether.haslayer(ICMPv6ND_NS):
+            # read another packet in the queue if the current one is ICMPv6ND_NS
+            continue
+        else:
+            # otherwise process the current packet
+            break
 
     # let us begin to check the proxy inbound packet
     VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type)