Refactor IP input checks for re-use at MPLS disposition
[vpp.git] / test / test_ip6.py
index dbe8746..0a0d56c 100644 (file)
@@ -16,7 +16,8 @@ from scapy.layers.l2 import Ether, Dot1Q
 from scapy.layers.inet6 import IPv6, UDP, TCP, ICMPv6ND_NS, ICMPv6ND_RS, \
     ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
     ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
-    ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
+    ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
+    ICMPv6TimeExceeded
 
 from util import ppp
 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
@@ -1618,5 +1619,80 @@ class TestIP6Punt(VppTestCase):
                                    is_add=0,
                                    is_ip6=1)
 
+
+class TestIP6Input(VppTestCase):
+    """ IPv6 Input Exceptions """
+
+    def setUp(self):
+        super(TestIP6Input, self).setUp()
+
+        self.create_pg_interfaces(range(2))
+
+        for i in self.pg_interfaces:
+            i.admin_up()
+            i.config_ip6()
+            i.resolve_ndp()
+
+    def tearDown(self):
+        super(TestIP6Input, self).tearDown()
+        for i in self.pg_interfaces:
+            i.unconfig_ip6()
+            i.admin_down()
+
+    def send_and_expect(self, input, pkts, output):
+        self.vapi.cli("clear trace")
+        input.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        rx = output.get_capture(len(pkts))
+        return rx
+
+    def send_and_assert_no_replies(self, intf, pkts, remark):
+        self.vapi.cli("clear trace")
+        intf.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        for i in self.pg_interfaces:
+            i.get_capture(0)
+            i.assert_nothing_captured(remark=remark)
+
+    def test_ip_input(self):
+        """ IP6 Input Exceptions """
+
+        #
+        # bad version - this is dropped
+        #
+        p_version = (Ether(src=self.pg0.remote_mac,
+                           dst=self.pg0.local_mac) /
+                     IPv6(src=self.pg0.remote_ip6,
+                          dst=self.pg1.remote_ip6,
+                          version=3) /
+                     UDP(sport=1234, dport=1234) /
+                     Raw('\xa5' * 100))
+
+        self.send_and_assert_no_replies(self.pg0, p_version * 65,
+                                        "funky version")
+
+        #
+        # hop limit - IMCP replies
+        #
+        p_version = (Ether(src=self.pg0.remote_mac,
+                           dst=self.pg0.local_mac) /
+                     IPv6(src=self.pg0.remote_ip6,
+                          dst=self.pg1.remote_ip6,
+                          hlim=1) /
+                     UDP(sport=1234, dport=1234) /
+                     Raw('\xa5' * 100))
+
+        rx = self.send_and_expect(self.pg0, p_version * 65, self.pg0)
+        rx = rx[0]
+        icmp = rx[ICMPv6TimeExceeded]
+        self.assertEqual(icmp.type, 3)
+        # 0: "hop limit exceeded in transit",
+        self.assertEqual(icmp.code, 0)
+
+        self.logger.error(self.vapi.cli("sh error"))
+
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)