tests: fix handling failed test case
[vpp.git] / test / test_mpls.py
index f7709d1..a568f84 100644 (file)
@@ -17,8 +17,9 @@ from vpp_papi import VppEnum
 import scapy.compat
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether, ARP
-from scapy.layers.inet import IP, UDP, ICMP
-from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
+from scapy.layers.inet import IP, UDP, ICMP, icmptypes, icmpcodes
+from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, ICMPv6EchoRequest, \
+    ICMPv6PacketTooBig
 from scapy.contrib.mpls import MPLS
 
 NUM_PKTS = 67
@@ -196,7 +197,8 @@ class TestMPLS(VppTestCase):
         return pkts
 
     def create_stream_labelled_ip6(self, src_if, mpls_labels,
-                                   hlim=64, dst_ip=None):
+                                   hlim=64, dst_ip=None,
+                                   ping=0, ip_itf=None):
         if dst_ip is None:
             dst_ip = src_if.remote_ip6
         self.reset_packet_infos()
@@ -208,9 +210,14 @@ class TestMPLS(VppTestCase):
             for l in mpls_labels:
                 p = p / MPLS(label=l.value, ttl=l.ttl, cos=l.exp)
 
-            p = p / (IPv6(src=src_if.remote_ip6, dst=dst_ip, hlim=hlim) /
-                     UDP(sport=1234, dport=1234) /
-                     Raw(payload))
+            if ping:
+                p = p / (IPv6(src=ip_itf.remote_ip6,
+                              dst=ip_itf.local_ip6) /
+                         ICMPv6EchoRequest())
+            else:
+                p = p / (IPv6(src=src_if.remote_ip6, dst=dst_ip, hlim=hlim) /
+                         UDP(sport=1234, dport=1234) /
+                         Raw(payload))
             info.data = p.copy()
             pkts.append(p)
         return pkts
@@ -337,7 +344,8 @@ class TestMPLS(VppTestCase):
             raise
 
     def verify_capture_ip6(self, src_if, capture, sent,
-                           ip_hlim=None, ip_dscp=0):
+                           ip_hlim=None, ip_dscp=0,
+                           ping_resp=0):
         try:
             self.assertEqual(len(capture), len(sent))
 
@@ -352,15 +360,18 @@ class TestMPLS(VppTestCase):
                 tx_ip = tx[IPv6]
                 rx_ip = rx[IPv6]
 
-                self.assertEqual(rx_ip.src, tx_ip.src)
-                self.assertEqual(rx_ip.dst, tx_ip.dst)
-                self.assertEqual(rx_ip.tc,  ip_dscp)
-                # IP processing post pop has decremented the TTL
-                if not ip_hlim:
-                    self.assertEqual(rx_ip.hlim + 1, tx_ip.hlim)
+                if not ping_resp:
+                    self.assertEqual(rx_ip.src, tx_ip.src)
+                    self.assertEqual(rx_ip.dst, tx_ip.dst)
+                    self.assertEqual(rx_ip.tc,  ip_dscp)
+                    # IP processing post pop has decremented the TTL
+                    if not ip_hlim:
+                        self.assertEqual(rx_ip.hlim + 1, tx_ip.hlim)
+                    else:
+                        self.assertEqual(rx_ip.hlim, ip_hlim)
                 else:
-                    self.assertEqual(rx_ip.hlim, ip_hlim)
-
+                    self.assertEqual(rx_ip.src, tx_ip.dst)
+                    self.assertEqual(rx_ip.dst, tx_ip.src)
         except:
             raise
 
@@ -415,6 +426,31 @@ class TestMPLS(VppTestCase):
         except:
             raise
 
+    def verify_capture_fragmented_labelled_ip6(self, src_if, capture, sent,
+                                               mpls_labels, ip_ttl=None):
+        try:
+            capture = verify_filter(capture, sent)
+
+            for i in range(len(capture)):
+                tx = sent[0]
+                rx = capture[i]
+                tx_ip = tx[IPv6]
+                rx.show()
+                rx_ip = IPv6(rx[MPLS].payload)
+                rx_ip.show()
+
+                verify_mpls_stack(self, rx, mpls_labels)
+
+                self.assertEqual(rx_ip.src, tx_ip.src)
+                self.assertEqual(rx_ip.dst, tx_ip.dst)
+                if not ip_ttl:
+                    # IP processing post pop has decremented the hop-limit
+                    self.assertEqual(rx_ip.hlim + 1, tx_ip.hlim)
+                else:
+                    self.assertEqual(rx_ip.hlim, ip_ttl)
+        except:
+            raise
+
     def test_swap(self):
         """ MPLS label swap tests """
 
@@ -898,6 +934,11 @@ class TestMPLS(VppTestCase):
                                                   self.pg0.sw_if_index,
                                                   labels=[VppMplsLabel(32)])])
         route_10_0_0_1.add_vpp_config()
+        route_1000_1 = VppIpRoute(self, "1000::1", 128,
+                                  [VppRoutePath(self.pg0.remote_ip6,
+                                                self.pg0.sw_if_index,
+                                                labels=[VppMplsLabel(32)])])
+        route_1000_1.add_vpp_config()
 
         #
         # a stream that matches the route for 10.0.0.1
@@ -914,6 +955,33 @@ class TestMPLS(VppTestCase):
         self.verify_capture_fragmented_labelled_ip4(self.pg0, rx, tx,
                                                     [VppMplsLabel(32)])
 
+        # packets with DF bit set generate ICMP
+        for t in tx:
+            t[IP].flags = 'DF'
+        rxs = self.send_and_expect_some(self.pg0, tx, self.pg0)
+
+        for rx in rxs:
+            self.assertEqual(icmptypes[rx[ICMP].type], "dest-unreach")
+            self.assertEqual(icmpcodes[rx[ICMP].type][rx[ICMP].code],
+                             "fragmentation-needed")
+            # the link MTU is 9000, the MPLS over head is 4 bytes
+            self.assertEqual(rx[ICMP].nexthopmtu, 9000 - 4)
+
+        self.assertEqual(self.statistics.get_err_counter(
+            "/err/mpls-frag/can't fragment this packet"),
+                         len(tx))
+        #
+        # a stream that matches the route for 1000::1/128
+        # PG0 is in the default table
+        #
+        tx = self.create_stream_ip6(self.pg0, "1000::1")
+        for i in range(0, 257):
+            self.extend_packet(tx[i], 10000)
+
+        rxs = self.send_and_expect_some(self.pg0, tx, self.pg0)
+        for rx in rxs:
+            self.assertEqual(rx[ICMPv6PacketTooBig].mtu, 9000 - 4)
+
         #
         # cleanup
         #
@@ -1170,6 +1238,13 @@ class TestMPLS(VppTestCase):
                                                   0xffffffff,
                                                   nh_table_id=1)])
         route_35_eos.add_vpp_config()
+        route_356_eos = VppMplsRoute(
+            self, 356, 1,
+            [VppRoutePath("0::0",
+                          0xffffffff,
+                          nh_table_id=1)],
+            eos_proto=FibPathProto.FIB_PATH_NH_PROTO_IP6)
+        route_356_eos.add_vpp_config()
 
         #
         # ping an interface in the non-default table
@@ -1180,6 +1255,10 @@ class TestMPLS(VppTestCase):
             self.pg0, [VppMplsLabel(35)], ping=1, ip_itf=self.pg1)
         rx = self.send_and_expect(self.pg0, tx, self.pg1)
         self.verify_capture_ip4(self.pg1, rx, tx, ping_resp=1)
+        tx = self.create_stream_labelled_ip6(
+            self.pg0, [VppMplsLabel(356)], ping=1, ip_itf=self.pg1)
+        rx = self.send_and_expect(self.pg0, tx, self.pg1)
+        self.verify_capture_ip6(self.pg1, rx, tx, ping_resp=1)
 
         #
         # Double pop