CGN: Session dump, test naming for ports fixed
[vpp.git] / test / test_ip6.py
index b95809b..3ba0923 100644 (file)
@@ -6,8 +6,9 @@ from socket import AF_INET6
 from framework import VppTestCase, VppTestRunner
 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
 from vpp_pg_interface import is_ipv6_misc
-from vpp_neighbor import find_nbr
-from vpp_ip_route import VppIpRoute, VppRoutePath
+from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
+    VppMRoutePath, MRouteItfFlags, MRouteEntryFlags
+from vpp_neighbor import find_nbr, VppNeighbor
 
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether, Dot1Q
@@ -76,7 +77,6 @@ class TestIPv6ND(VppTestCase):
     def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
                            filter_out_fn=is_ipv6_misc):
         intf.add_stream(pkts)
-        self.pg0.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
@@ -85,11 +85,25 @@ class TestIPv6ND(VppTestCase):
         rx = rx[0]
         self.validate_ra(intf, rx, dst_ip)
 
+    def send_and_expect_na(self, intf, pkts, remark, dst_ip=None,
+                           tgt_ip=None,
+                           filter_out_fn=is_ipv6_misc):
+        intf.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
+
+        self.assertEqual(len(rx), 1)
+        rx = rx[0]
+        self.validate_na(intf, rx, dst_ip, tgt_ip)
+
     def send_and_assert_no_replies(self, intf, pkts, remark):
         intf.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        intf.assert_nothing_captured(remark=remark)
+        for i in self.pg_interfaces:
+            i.get_capture(0)
+            i.assert_nothing_captured(remark=remark)
 
 
 class TestIPv6(TestIPv6ND):
@@ -351,6 +365,83 @@ class TestIPv6(TestIPv6ND):
         self.send_and_assert_no_replies(self.pg0, pkts,
                                         "No response to NS for unknown target")
 
+        #
+        # A neighbor entry that has no associated FIB-entry
+        #
+        self.pg0.generate_remote_hosts(4)
+        nd_entry = VppNeighbor(self,
+                               self.pg0.sw_if_index,
+                               self.pg0.remote_hosts[2].mac,
+                               self.pg0.remote_hosts[2].ip6,
+                               af=AF_INET6,
+                               is_no_fib_entry=1)
+        nd_entry.add_vpp_config()
+
+        #
+        # check we have the neighbor, but no route
+        #
+        self.assertTrue(find_nbr(self,
+                                 self.pg0.sw_if_index,
+                                 self.pg0._remote_hosts[2].ip6,
+                                 inet=AF_INET6))
+        self.assertFalse(find_route(self,
+                                    self.pg0._remote_hosts[2].ip6,
+                                    128,
+                                    inet=AF_INET6))
+
+        #
+        # send an NS from a link local address to the interface's global
+        # address
+        #
+        p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
+             IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6_ll) /
+             ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
+             ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
+
+        self.send_and_expect_na(self.pg0, p,
+                                "NS from link-local",
+                                dst_ip=self.pg0._remote_hosts[2].ip6_ll,
+                                tgt_ip=self.pg0.local_ip6)
+
+        #
+        # we should have learned an ND entry for the peer's link-local
+        # but not inserted a route to it in the FIB
+        #
+        self.assertTrue(find_nbr(self,
+                                 self.pg0.sw_if_index,
+                                 self.pg0._remote_hosts[2].ip6_ll,
+                                 inet=AF_INET6))
+        self.assertFalse(find_route(self,
+                                    self.pg0._remote_hosts[2].ip6_ll,
+                                    128,
+                                    inet=AF_INET6))
+
+        #
+        # An NS to the router's own Link-local
+        #
+        p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
+             IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6_ll) /
+             ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll) /
+             ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
+
+        self.send_and_expect_na(self.pg0, p,
+                                "NS to/from link-local",
+                                dst_ip=self.pg0._remote_hosts[3].ip6_ll,
+                                tgt_ip=self.pg0.local_ip6_ll)
+
+        #
+        # we should have learned an ND entry for the peer's link-local
+        # but not inserted a route to it in the FIB
+        #
+        self.assertTrue(find_nbr(self,
+                                 self.pg0.sw_if_index,
+                                 self.pg0._remote_hosts[3].ip6_ll,
+                                 inet=AF_INET6))
+        self.assertFalse(find_route(self,
+                                    self.pg0._remote_hosts[3].ip6_ll,
+                                    128,
+                                    inet=AF_INET6))
+
     def validate_ra(self, intf, rx, dst_ip=None, mtu=9000, pi_opt=None):
         if not dst_ip:
             dst_ip = intf.remote_ip6
@@ -745,14 +836,10 @@ class IPv6NDProxyTest(TestIPv6ND):
         #
         # try that NS again. this time we expect an NA back
         #
-        self.pg1.add_stream(ns_pg1)
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
-        rx = self.pg1.get_capture(1)
-
-        self.validate_na(self.pg1, rx[0],
-                         dst_ip=self.pg0._remote_hosts[2].ip6,
-                         tgt_ip=self.pg0.local_ip6)
+        self.send_and_expect_na(self.pg1, ns_pg1,
+                                "NS to proxy entry",
+                                dst_ip=self.pg0._remote_hosts[2].ip6,
+                                tgt_ip=self.pg0.local_ip6)
 
         #
         # ... and that we have an entry in the ND cache
@@ -791,14 +878,10 @@ class IPv6NDProxyTest(TestIPv6ND):
                   ICMPv6ND_NS(tgt=self.pg0._remote_hosts[2].ip6) /
                   ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
 
-        self.pg0.add_stream(ns_pg0)
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
-        rx = self.pg0.get_capture(1)
-
-        self.validate_na(self.pg0, rx[0],
-                         tgt_ip=self.pg0._remote_hosts[2].ip6,
-                         dst_ip=self.pg0.remote_ip6)
+        self.send_and_expect_na(self.pg0, ns_pg0,
+                                "NS to proxy entry on main",
+                                tgt_ip=self.pg0._remote_hosts[2].ip6,
+                                dst_ip=self.pg0.remote_ip6)
 
         #
         # Setup and resolve proxy for another host on another interface
@@ -812,14 +895,10 @@ class IPv6NDProxyTest(TestIPv6ND):
             inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
             self.pg2.sw_if_index)
 
-        self.pg2.add_stream(ns_pg2)
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
-        rx = self.pg2.get_capture(1)
-
-        self.validate_na(self.pg2, rx[0],
-                         dst_ip=self.pg0._remote_hosts[3].ip6,
-                         tgt_ip=self.pg0.local_ip6)
+        self.send_and_expect_na(self.pg2, ns_pg2,
+                                "NS to proxy entry other interface",
+                                dst_ip=self.pg0._remote_hosts[3].ip6,
+                                tgt_ip=self.pg0.local_ip6)
 
         self.assertTrue(find_nbr(self,
                                  self.pg2.sw_if_index,
@@ -957,5 +1036,102 @@ class TestIPNull(VppTestCase):
         self.assertEqual(icmp.code, 1)
 
 
+class TestIPDisabled(VppTestCase):
+    """ IPv6 disabled """
+
+    def setUp(self):
+        super(TestIPDisabled, self).setUp()
+
+        # create 2 pg interfaces
+        self.create_pg_interfaces(range(2))
+
+        # PG0 is IP enalbed
+        self.pg0.admin_up()
+        self.pg0.config_ip6()
+        self.pg0.resolve_ndp()
+
+        # PG 1 is not IP enabled
+        self.pg1.admin_up()
+
+    def tearDown(self):
+        super(TestIPDisabled, self).tearDown()
+        for i in self.pg_interfaces:
+            i.unconfig_ip4()
+            i.admin_down()
+
+    def send_and_assert_no_replies(self, intf, pkts, remark):
+        intf.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        for i in self.pg_interfaces:
+            i.get_capture(0)
+            i.assert_nothing_captured(remark=remark)
+
+    def test_ip_disabled(self):
+        """ IP Disabled """
+
+        #
+        # An (S,G).
+        # one accepting interface, pg0, 2 forwarding interfaces
+        #
+        route_ff_01 = VppIpMRoute(
+            self,
+            "::",
+            "ffef::1", 128,
+            MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
+            [VppMRoutePath(self.pg1.sw_if_index,
+                           MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT),
+             VppMRoutePath(self.pg0.sw_if_index,
+                           MRouteItfFlags.MFIB_ITF_FLAG_FORWARD)],
+            is_ip6=1)
+        route_ff_01.add_vpp_config()
+
+        pu = (Ether(src=self.pg1.remote_mac,
+                    dst=self.pg1.local_mac) /
+              IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
+              UDP(sport=1234, dport=1234) /
+              Raw('\xa5' * 100))
+        pm = (Ether(src=self.pg1.remote_mac,
+                    dst=self.pg1.local_mac) /
+              IPv6(src="2001::1", dst="ffef::1") /
+              UDP(sport=1234, dport=1234) /
+              Raw('\xa5' * 100))
+
+        #
+        # PG1 does not forward IP traffic
+        #
+        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
+        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
+
+        #
+        # IP enable PG1
+        #
+        self.pg1.config_ip6()
+
+        #
+        # Now we get packets through
+        #
+        self.pg1.add_stream(pu)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        rx = self.pg0.get_capture(1)
+
+        self.pg1.add_stream(pm)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        rx = self.pg0.get_capture(1)
+
+        #
+        # Disable PG1
+        #
+        self.pg1.unconfig_ip6()
+
+        #
+        # PG1 does not forward IP traffic
+        #
+        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
+        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
+
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)