Tests to target holes in adjacency and DPO test coverage
[vpp.git] / test / test_ip4.py
index df93533..7f6e92f 100644 (file)
@@ -5,10 +5,11 @@ import unittest
 
 from framework import VppTestCase, VppTestRunner
 from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
+from vpp_ip_route import VppIpRoute, VppRoutePath
 
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether, Dot1Q
-from scapy.layers.inet import IP, UDP
+from scapy.layers.inet import IP, UDP, ICMP, icmptypes, icmpcodes
 from util import ppp
 
 
@@ -148,8 +149,9 @@ class TestIPv4(VppTestCase):
                 payload_info = self.payload_to_info(str(packet[Raw]))
                 packet_index = payload_info.index
                 self.assertEqual(payload_info.dst, dst_sw_if_index)
-                self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
-                                  (dst_if.name, payload_info.src, packet_index))
+                self.logger.debug(
+                    "Got packet on port %s: src=%u (id=%u)" %
+                    (dst_if.name, payload_info.src, packet_index))
                 next_info = self.get_next_packet_info_for_interface2(
                     payload_info.src, dst_sw_if_index,
                     last_info[payload_info.src])
@@ -209,7 +211,7 @@ class TestIPv4FibCrud(VppTestCase):
         - add new 1k,
         - del 1.5k
 
-    ..note:: Python API is to slow to add many routes, needs C code replacement.
+    ..note:: Python API is too slow to add many routes, needs replacement.
     """
 
     def config_fib_many_to_one(self, start_dest_addr, next_hop_addr, count):
@@ -221,8 +223,9 @@ class TestIPv4FibCrud(VppTestCase):
         :return list: added ips with 32 prefix
         """
         added_ips = []
-        dest_addr = int(
-            socket.inet_pton(socket.AF_INET, start_dest_addr).encode('hex'), 16)
+        dest_addr = int(socket.inet_pton(socket.AF_INET,
+                                         start_dest_addr).encode('hex'),
+                        16)
         dest_addr_len = 32
         n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
         for _ in range(count):
@@ -236,8 +239,9 @@ class TestIPv4FibCrud(VppTestCase):
     def unconfig_fib_many_to_one(self, start_dest_addr, next_hop_addr, count):
 
         removed_ips = []
-        dest_addr = int(
-            socket.inet_pton(socket.AF_INET, start_dest_addr).encode('hex'), 16)
+        dest_addr = int(socket.inet_pton(socket.AF_INET,
+                                         start_dest_addr).encode('hex'),
+                        16)
         dest_addr_len = 32
         n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
         for _ in range(count):
@@ -462,5 +466,85 @@ class TestIPv4FibCrud(VppTestCase):
         self.verify_not_in_route_dump(fib_dump, self.deleted_routes)
 
 
+class TestIPNull(VppTestCase):
+    """ IPv4 routes via NULL """
+
+    def setUp(self):
+        super(TestIPNull, self).setUp()
+
+        # create 2 pg interfaces
+        self.create_pg_interfaces(range(1))
+
+        for i in self.pg_interfaces:
+            i.admin_up()
+            i.config_ip4()
+            i.resolve_arp()
+
+    def tearDown(self):
+        super(TestIPNull, self).tearDown()
+        for i in self.pg_interfaces:
+            i.unconfig_ip4()
+            i.admin_down()
+
+    def test_ip_null(self):
+        """ IP NULL route """
+
+        #
+        # A route via IP NULL that will reply with ICMP unreachables
+        #
+        ip_unreach = VppIpRoute(self, "10.0.0.1", 32, [], is_unreach=1)
+        ip_unreach.add_vpp_config()
+
+        p_unreach = (Ether(src=self.pg0.remote_mac,
+                           dst=self.pg0.local_mac) /
+                     IP(src=self.pg0.remote_ip4, dst="10.0.0.1") /
+                     UDP(sport=1234, dport=1234) /
+                     Raw('\xa5' * 100))
+
+        self.pg0.add_stream(p_unreach)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        rx = self.pg0.get_capture(1)
+        rx = rx[0]
+        icmp = rx[ICMP]
+
+        self.assertEqual(icmptypes[icmp.type], "dest-unreach")
+        self.assertEqual(icmpcodes[icmp.type][icmp.code], "host-unreachable")
+        self.assertEqual(icmp.src, self.pg0.remote_ip4)
+        self.assertEqual(icmp.dst, "10.0.0.1")
+
+        #
+        # ICMP replies are rate limited. so sit and spin.
+        #
+        self.sleep(1)
+
+        #
+        # A route via IP NULL that will reply with ICMP prohibited
+        #
+        ip_prohibit = VppIpRoute(self, "10.0.0.2", 32, [], is_prohibit=1)
+        ip_prohibit.add_vpp_config()
+
+        p_prohibit = (Ether(src=self.pg0.remote_mac,
+                            dst=self.pg0.local_mac) /
+                      IP(src=self.pg0.remote_ip4, dst="10.0.0.2") /
+                      UDP(sport=1234, dport=1234) /
+                      Raw('\xa5' * 100))
+
+        self.pg0.add_stream(p_prohibit)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        rx = self.pg0.get_capture(1)
+
+        rx = rx[0]
+        icmp = rx[ICMP]
+
+        self.assertEqual(icmptypes[icmp.type], "dest-unreach")
+        self.assertEqual(icmpcodes[icmp.type][icmp.code], "host-prohibited")
+        self.assertEqual(icmp.src, self.pg0.remote_ip4)
+        self.assertEqual(icmp.dst, "10.0.0.2")
+
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)