api: string type to convert to vector
[vpp.git] / test / test_ip6.py
index 6c959a2..4f267b8 100644 (file)
@@ -10,7 +10,7 @@ from scapy.contrib.mpls import MPLS
 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
     ICMPv6ND_RA, ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
     ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
-    ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply
+    ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply, IPv6ExtHdrHopByHop
 from scapy.layers.l2 import Ether, Dot1Q
 from scapy.packet import Raw
 from scapy.utils import inet_pton, inet_ntop
@@ -36,6 +36,8 @@ try:
 except NameError:
     text_type = str
 
+NUM_PKTS = 67
+
 
 class TestIPv6ND(VppTestCase):
     def validate_ra(self, intf, rx, dst_ip=None):
@@ -357,6 +359,24 @@ class TestIPv6(TestIPv6ND):
                             "Interface %s: Packet expected from interface %s "
                             "didn't arrive" % (dst_if.name, i.name))
 
+    def test_next_header_anomaly(self):
+        """ IPv6 next header anomaly test
+
+        Test scenario:
+            - ipv6 next header field = Fragment Header (44)
+            - next header is ICMPv6 Echo Request
+            - wait for reassembly
+        """
+        pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
+               IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44) /
+               ICMPv6EchoRequest())
+
+        self.pg0.add_stream(pkt)
+        self.pg_start()
+
+        # wait for reassembly
+        self.sleep(10)
+
     def test_fib(self):
         """ IPv6 FIB test
 
@@ -1308,6 +1328,7 @@ class TestIPv6RDControlPlane(TestIPv6ND):
         # check FIB still contains the SLAAC address
         addresses = set(self.get_interface_addresses(fib, self.pg0))
         new_addresses = addresses.difference(initial_addresses)
+
         self.assertEqual(len(new_addresses), 1)
         prefix = list(new_addresses)[0][:8] + '\0\0\0\0\0\0\0\0'
         self.assertEqual(inet_ntop(AF_INET6, prefix), '1::')
@@ -1760,7 +1781,7 @@ class TestIP6LoadBalance(VppTestCase):
         src_ip_pkts = []
         src_mpls_pkts = []
 
-        for ii in range(65):
+        for ii in range(NUM_PKTS):
             port_ip_hdr = (
                 IPv6(dst="3000::1", src="3000:1::1") /
                 inet6.UDP(sport=1234, dport=1234 + ii) /
@@ -2267,7 +2288,7 @@ class TestIP6Input(VppTestCase):
                      inet6.UDP(sport=1234, dport=1234) /
                      Raw('\xa5' * 100))
 
-        rx = self.send_and_expect(self.pg0, p_version * 65, self.pg0)
+        rx = self.send_and_expect(self.pg0, p_version * NUM_PKTS, self.pg0)
         rx = rx[0]
         icmp = rx[ICMPv6TimeExceeded]
 
@@ -2306,10 +2327,23 @@ class TestIP6Input(VppTestCase):
                      l4 /
                      Raw('\xa5' * 100))
 
-        self.send_and_assert_no_replies(self.pg0, p_version * 65,
+        self.send_and_assert_no_replies(self.pg0, p_version * NUM_PKTS,
                                         remark=msg or "",
                                         timeout=timeout)
 
+    def test_hop_by_hop(self):
+        """ Hop-by-hop header test """
+
+        p = (Ether(src=self.pg0.remote_mac,
+                   dst=self.pg0.local_mac) /
+             IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
+             IPv6ExtHdrHopByHop() /
+             inet6.UDP(sport=1234, dport=1234) /
+             Raw('\xa5' * 100))
+
+        self.pg0.add_stream(p)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
 
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)