punt and drop features:
[vpp.git] / test / test_ip4.py
index ddfd218..5bd50ce 100644 (file)
@@ -6,11 +6,12 @@ import unittest
 from framework import VppTestCase, VppTestRunner
 from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
 from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpMRoute, \
-    VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind
+    VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
+    VppMplsTable
 
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether, Dot1Q, ARP
-from scapy.layers.inet import IP, UDP, ICMP, icmptypes, icmpcodes
+from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes
 from util import ppp
 from scapy.contrib.mpls import MPLS
 
@@ -774,6 +775,8 @@ class TestIPLoadBalance(VppTestCase):
         super(TestIPLoadBalance, self).setUp()
 
         self.create_pg_interfaces(range(5))
+        mpls_tbl = VppMplsTable(self, 0)
+        mpls_tbl.add_vpp_config()
 
         for i in self.pg_interfaces:
             i.admin_up()
@@ -782,11 +785,11 @@ class TestIPLoadBalance(VppTestCase):
             i.enable_mpls()
 
     def tearDown(self):
-        super(TestIPLoadBalance, self).tearDown()
         for i in self.pg_interfaces:
             i.disable_mpls()
             i.unconfig_ip4()
             i.admin_down()
+        super(TestIPLoadBalance, self).tearDown()
 
     def send_and_expect_load_balancing(self, input, pkts, outputs):
         input.add_stream(pkts)
@@ -796,6 +799,12 @@ class TestIPLoadBalance(VppTestCase):
             rx = oo._get_capture(1)
             self.assertNotEqual(0, len(rx))
 
+    def send_and_expect_one_itf(self, input, pkts, itf):
+        input.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        rx = itf.get_capture(len(pkts))
+
     def test_ip_load_balance(self):
         """ IP Load-Balancing """
 
@@ -874,11 +883,7 @@ class TestIPLoadBalance(VppTestCase):
         self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
                                             [self.pg1, self.pg2])
 
-        self.pg0.add_stream(port_ip_pkts)
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
-
-        rx = self.pg2.get_capture(len(port_ip_pkts))
+        self.send_and_expect_one_itf(self.pg0, port_ip_pkts, self.pg2)
 
         #
         # change the flow hash config back to defaults
@@ -928,5 +933,191 @@ class TestIPLoadBalance(VppTestCase):
                                             [self.pg1, self.pg2,
                                              self.pg3, self.pg4])
 
+        #
+        # Recursive prefixes
+        #  - testing that 2 stages of load-balancing, no choices
+        #
+        port_pkts = []
+
+        for ii in range(257):
+            port_pkts.append((Ether(src=self.pg0.remote_mac,
+                                    dst=self.pg0.local_mac) /
+                              IP(dst="1.1.1.2", src="20.0.0.2") /
+                              UDP(sport=1234, dport=1234 + ii) /
+                              Raw('\xa5' * 100)))
+
+        route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
+                                    [VppRoutePath(self.pg3.remote_ip4,
+                                                  self.pg3.sw_if_index)])
+        route_10_0_0_3.add_vpp_config()
+
+        route_1_1_1_2 = VppIpRoute(self, "1.1.1.2", 32,
+                                   [VppRoutePath("10.0.0.3", 0xffffffff)])
+        route_1_1_1_2.add_vpp_config()
+
+        #
+        # inject the packet on pg0 - expect load-balancing across all 4 paths
+        #
+        self.vapi.cli("clear trace")
+        self.send_and_expect_one_itf(self.pg0, port_pkts, self.pg3)
+
+
+class TestIPVlan0(VppTestCase):
+    """ IPv4 VLAN-0 """
+
+    def setUp(self):
+        super(TestIPVlan0, self).setUp()
+
+        self.create_pg_interfaces(range(2))
+        mpls_tbl = VppMplsTable(self, 0)
+        mpls_tbl.add_vpp_config()
+
+        for i in self.pg_interfaces:
+            i.admin_up()
+            i.config_ip4()
+            i.resolve_arp()
+            i.enable_mpls()
+
+    def tearDown(self):
+        for i in self.pg_interfaces:
+            i.disable_mpls()
+            i.unconfig_ip4()
+            i.admin_down()
+        super(TestIPVlan0, self).tearDown()
+
+    def send_and_expect(self, input, pkts, output):
+        input.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        rx = output.get_capture(len(pkts))
+
+    def test_ip_vlan_0(self):
+        """ IP VLAN-0 """
+
+        pkts = (Ether(src=self.pg0.remote_mac,
+                      dst=self.pg0.local_mac) /
+                Dot1Q(vlan=0) /
+                IP(dst=self.pg1.remote_ip4,
+                   src=self.pg0.remote_ip4) /
+                UDP(sport=1234, dport=1234) /
+                Raw('\xa5' * 100)) * 65
+
+        #
+        # Expect that packets sent on VLAN-0 are forwarded on the
+        # main interface.
+        #
+        self.send_and_expect(self.pg0, pkts, self.pg1)
+
+
+class TestIPPunt(VppTestCase):
+    """ IPv4 Punt Police/Redirect """
+
+    def setUp(self):
+        super(TestIPPunt, self).setUp()
+
+        self.create_pg_interfaces(range(2))
+
+        for i in self.pg_interfaces:
+            i.admin_up()
+            i.config_ip4()
+            i.resolve_arp()
+
+    def tearDown(self):
+        super(TestIPPunt, self).tearDown()
+        for i in self.pg_interfaces:
+            i.unconfig_ip4()
+            i.admin_down()
+
+    def send_and_expect(self, input, pkts, output):
+        self.vapi.cli("clear trace")
+        input.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        rx = output.get_capture(len(pkts))
+        return rx
+
+    def send_and_assert_no_replies(self, intf, pkts, remark):
+        self.vapi.cli("clear trace")
+        intf.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        for i in self.pg_interfaces:
+            i.get_capture(0)
+            i.assert_nothing_captured(remark=remark)
+
+    def test_ip_punt(self):
+        """ IP punt police and redirect """
+
+        p = (Ether(src=self.pg0.remote_mac,
+                   dst=self.pg0.local_mac) /
+             IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
+             TCP(sport=1234, dport=1234) /
+             Raw('\xa5' * 100))
+
+        pkts = p * 1025
+
+        #
+        # Configure a punt redirect via pg1.
+        #
+        nh_addr = socket.inet_pton(socket.AF_INET,
+                                   self.pg1.remote_ip4)
+        self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
+                                   self.pg1.sw_if_index,
+                                   nh_addr)
+
+        self.send_and_expect(self.pg0, pkts, self.pg1)
+
+        #
+        # add a policer
+        #
+        policer = self.vapi.policer_add_del("ip4-punt", 400, 0, 10, 0,
+                                            rate_type=1)
+        self.vapi.ip_punt_police(policer.policer_index)
+
+        self.vapi.cli("clear trace")
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        #
+        # the number of packet recieved should be greater than 0,
+        # but not equal to the number sent, since some were policed
+        #
+        rx = self.pg1._get_capture(1)
+        self.assertTrue(len(rx) > 0)
+        self.assertTrue(len(rx) < len(pkts))
+
+        #
+        # remove the poilcer. back to full rx
+        #
+        self.vapi.ip_punt_police(policer.policer_index, is_add=0)
+        self.vapi.policer_add_del("ip4-punt", 400, 0, 10, 0,
+                                  rate_type=1, is_add=0)
+        self.send_and_expect(self.pg0, pkts, self.pg1)
+
+        #
+        # remove the redirect. expect full drop.
+        #
+        self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
+                                   self.pg1.sw_if_index,
+                                   nh_addr,
+                                   is_add=0)
+        self.send_and_assert_no_replies(self.pg0, pkts,
+                                        "IP no punt config")
+
+        #
+        # Add a redirect that is not input port selective
+        #
+        self.vapi.ip_punt_redirect(0xffffffff,
+                                   self.pg1.sw_if_index,
+                                   nh_addr)
+        self.send_and_expect(self.pg0, pkts, self.pg1)
+
+        self.vapi.ip_punt_redirect(0xffffffff,
+                                   self.pg1.sw_if_index,
+                                   nh_addr,
+                                   is_add=0)
+
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)