API: Change ip4_address and ip6_address to use type alias.
[vpp.git] / test / test_ip4.py
index 2d98ed5..c130241 100644 (file)
@@ -1,19 +1,21 @@
 #!/usr/bin/env python
+import binascii
 import random
 import socket
 import unittest
 
+from scapy.contrib.mpls import MPLS
+from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes
+from scapy.layers.l2 import Ether, Dot1Q, ARP
+from scapy.packet import Raw
+from six import moves
+
 from framework import VppTestCase, VppTestRunner
-from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
+from util import ppp
 from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpMRoute, \
     VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
-    VppMplsTable, VppIpTable
-
-from scapy.packet import Raw
-from scapy.layers.l2 import Ether, Dot1Q, ARP
-from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes
-from util import ppp
-from scapy.contrib.mpls import MPLS
+    VppMplsTable, VppIpTable, VppIpAddress
+from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
 
 
 class TestIPv4(VppTestCase):
@@ -133,11 +135,12 @@ class TestIPv4(VppTestCase):
                     UDP(sport=1234, dport=1234))
 
         pkts = [self.modify_packet(src_if, i, pkt_tmpl)
-                for i in xrange(self.pg_if_packet_sizes[0],
-                                self.pg_if_packet_sizes[1], 10)]
+                for i in moves.range(self.pg_if_packet_sizes[0],
+                                     self.pg_if_packet_sizes[1], 10)]
         pkts_b = [self.modify_packet(src_if, i, pkt_tmpl)
-                  for i in xrange(self.pg_if_packet_sizes[1] + hdr_ext,
-                                  self.pg_if_packet_sizes[2] + hdr_ext, 50)]
+                  for i in moves.range(self.pg_if_packet_sizes[1] + hdr_ext,
+                                       self.pg_if_packet_sizes[2] + hdr_ext,
+                                       50)]
         pkts.extend(pkts_b)
 
         return pkts
@@ -302,9 +305,8 @@ class TestIPv4FibCrud(VppTestCase):
         :return list: added ips with 32 prefix
         """
         added_ips = []
-        dest_addr = int(socket.inet_pton(socket.AF_INET,
-                                         start_dest_addr).encode('hex'),
-                        16)
+        dest_addr = int(binascii.hexlify(socket.inet_pton(socket.AF_INET,
+                                         start_dest_addr)), 16)
         dest_addr_len = 32
         n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
         for _ in range(count):
@@ -318,9 +320,8 @@ class TestIPv4FibCrud(VppTestCase):
     def unconfig_fib_many_to_one(self, start_dest_addr, next_hop_addr, count):
 
         removed_ips = []
-        dest_addr = int(socket.inet_pton(socket.AF_INET,
-                                         start_dest_addr).encode('hex'),
-                        16)
+        dest_addr = int(binascii.hexlify(socket.inet_pton(socket.AF_INET,
+                                         start_dest_addr)), 16)
         dest_addr_len = 32
         n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
         for _ in range(count):
@@ -552,7 +553,7 @@ class TestIPNull(VppTestCase):
         super(TestIPNull, self).setUp()
 
         # create 2 pg interfaces
-        self.create_pg_interfaces(range(1))
+        self.create_pg_interfaces(range(2))
 
         for i in self.pg_interfaces:
             i.admin_up()
@@ -624,6 +625,32 @@ class TestIPNull(VppTestCase):
         self.assertEqual(icmp.src, self.pg0.remote_ip4)
         self.assertEqual(icmp.dst, "10.0.0.2")
 
+    def test_ip_drop(self):
+        """ IP Drop Routes """
+
+        p = (Ether(src=self.pg0.remote_mac,
+                   dst=self.pg0.local_mac) /
+             IP(src=self.pg0.remote_ip4, dst="1.1.1.1") /
+             UDP(sport=1234, dport=1234) /
+             Raw('\xa5' * 100))
+
+        r1 = VppIpRoute(self, "1.1.1.0", 24,
+                        [VppRoutePath(self.pg1.remote_ip4,
+                                      self.pg1.sw_if_index)])
+        r1.add_vpp_config()
+
+        rx = self.send_and_expect(self.pg0, p * 65, self.pg1)
+
+        #
+        # insert a more specific as a drop
+        #
+        r2 = VppIpRoute(self, "1.1.1.1", 32, [], is_drop=1)
+        r2.add_vpp_config()
+
+        self.send_and_assert_no_replies(self.pg0, p * 65, "Drop Route")
+        r2.remove_vpp_config()
+        rx = self.send_and_expect(self.pg0, p * 65, self.pg1)
+
 
 class TestIPDisabled(VppTestCase):
     """ IPv4 disabled """
@@ -1069,7 +1096,7 @@ class TestIPPunt(VppTestCase):
     def setUp(self):
         super(TestIPPunt, self).setUp()
 
-        self.create_pg_interfaces(range(2))
+        self.create_pg_interfaces(range(4))
 
         for i in self.pg_interfaces:
             i.admin_up()
@@ -1096,8 +1123,7 @@ class TestIPPunt(VppTestCase):
         #
         # Configure a punt redirect via pg1.
         #
-        nh_addr = socket.inet_pton(socket.AF_INET,
-                                   self.pg1.remote_ip4)
+        nh_addr = VppIpAddress(self.pg1.remote_ip4).encode()
         self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
                                    self.pg1.sw_if_index,
                                    nh_addr)
@@ -1121,8 +1147,8 @@ class TestIPPunt(VppTestCase):
         # but not equal to the number sent, since some were policed
         #
         rx = self.pg1._get_capture(1)
-        self.assertTrue(len(rx) > 0)
-        self.assertTrue(len(rx) < len(pkts))
+        self.assertGreater(len(rx), 0)
+        self.assertLess(len(rx), len(pkts))
 
         #
         # remove the poilcer. back to full rx
@@ -1155,6 +1181,40 @@ class TestIPPunt(VppTestCase):
                                    nh_addr,
                                    is_add=0)
 
+    def test_ip_punt_dump(self):
+        """ IP4 punt redirect dump"""
+
+        #
+        # Configure a punt redirects
+        #
+        nh_address = VppIpAddress(self.pg3.remote_ip4).encode()
+        self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
+                                   self.pg3.sw_if_index,
+                                   nh_address)
+        self.vapi.ip_punt_redirect(self.pg1.sw_if_index,
+                                   self.pg3.sw_if_index,
+                                   nh_address)
+        self.vapi.ip_punt_redirect(self.pg2.sw_if_index,
+                                   self.pg3.sw_if_index,
+                                   VppIpAddress('0.0.0.0').encode())
+
+        #
+        # Dump pg0 punt redirects
+        #
+        punts = self.vapi.ip_punt_redirect_dump(self.pg0.sw_if_index)
+        for p in punts:
+            self.assertEqual(p.punt.rx_sw_if_index, self.pg0.sw_if_index)
+
+        #
+        # Dump punt redirects for all interfaces
+        #
+        punts = self.vapi.ip_punt_redirect_dump(0xffffffff)
+        self.assertEqual(len(punts), 3)
+        for p in punts:
+            self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index)
+        self.assertNotEqual(punts[1].punt.nh.un.ip4, self.pg3.remote_ip4)
+        self.assertEqual(punts[2].punt.nh.un.ip4, '\x00'*4)
+
 
 class TestIPDeag(VppTestCase):
     """ IPv4 Deaggregate Routes """
@@ -1246,6 +1306,24 @@ class TestIPDeag(VppTestCase):
         route_in_src.add_vpp_config()
         self.send_and_expect(self.pg0, pkts_src, self.pg2)
 
+        #
+        # loop in the lookup DP
+        #
+        route_loop = VppIpRoute(self, "2.2.2.3", 32,
+                                [VppRoutePath("0.0.0.0",
+                                              0xffffffff,
+                                              nh_table_id=0)])
+        route_loop.add_vpp_config()
+
+        p_l = (Ether(src=self.pg0.remote_mac,
+                     dst=self.pg0.local_mac) /
+               IP(src="2.2.2.4", dst="2.2.2.3") /
+               TCP(sport=1234, dport=1234) /
+               Raw('\xa5' * 100))
+
+        self.send_and_assert_no_replies(self.pg0, p_l * 257,
+                                        "IP lookup loop")
+
 
 class TestIPInput(VppTestCase):
     """ IPv4 Input Exceptions """
@@ -1392,5 +1470,173 @@ class TestIPInput(VppTestCase):
         # Reset MTU for subsequent tests
         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [9000, 0, 0, 0])
 
+
+class TestIPDirectedBroadcast(VppTestCase):
+    """ IPv4 Directed Broadcast """
+
+    def setUp(self):
+        super(TestIPDirectedBroadcast, self).setUp()
+
+        self.create_pg_interfaces(range(2))
+
+        for i in self.pg_interfaces:
+            i.admin_up()
+
+    def tearDown(self):
+        super(TestIPDirectedBroadcast, self).tearDown()
+        for i in self.pg_interfaces:
+            i.admin_down()
+
+    def test_ip_input(self):
+        """ IP Directed Broadcast """
+
+        #
+        # set the directed broadcast on pg0 first, then config IP4 addresses
+        # for pg1 directed broadcast is always disabled
+        self.vapi.sw_interface_set_ip_directed_broadcast(
+            self.pg0.sw_if_index, 1)
+
+        p0 = (Ether(src=self.pg1.remote_mac,
+                    dst=self.pg1.local_mac) /
+              IP(src="1.1.1.1",
+                 dst=self.pg0._local_ip4_bcast) /
+              UDP(sport=1234, dport=1234) /
+              Raw('\xa5' * 2000))
+        p1 = (Ether(src=self.pg0.remote_mac,
+                    dst=self.pg0.local_mac) /
+              IP(src="1.1.1.1",
+                 dst=self.pg1._local_ip4_bcast) /
+              UDP(sport=1234, dport=1234) /
+              Raw('\xa5' * 2000))
+
+        self.pg0.config_ip4()
+        self.pg0.resolve_arp()
+        self.pg1.config_ip4()
+        self.pg1.resolve_arp()
+
+        #
+        # test packet is L2 broadcast
+        #
+        rx = self.send_and_expect(self.pg1, p0 * 65, self.pg0)
+        self.assertTrue(rx[0][Ether].dst, "ff:ff:ff:ff:ff:ff")
+
+        self.send_and_assert_no_replies(self.pg0, p1 * 65,
+                                        "directed broadcast disabled")
+
+        #
+        # toggle directed broadcast on pg0
+        #
+        self.vapi.sw_interface_set_ip_directed_broadcast(
+            self.pg0.sw_if_index, 0)
+        self.send_and_assert_no_replies(self.pg1, p0 * 65,
+                                        "directed broadcast disabled")
+
+        self.vapi.sw_interface_set_ip_directed_broadcast(
+            self.pg0.sw_if_index, 1)
+        rx = self.send_and_expect(self.pg1, p0 * 65, self.pg0)
+
+        self.pg0.unconfig_ip4()
+        self.pg1.unconfig_ip4()
+
+
+class TestIPLPM(VppTestCase):
+    """ IPv4 longest Prefix Match """
+
+    def setUp(self):
+        super(TestIPLPM, self).setUp()
+
+        self.create_pg_interfaces(range(4))
+
+        for i in self.pg_interfaces:
+            i.admin_up()
+            i.config_ip4()
+            i.resolve_arp()
+
+    def tearDown(self):
+        super(TestIPLPM, self).tearDown()
+        for i in self.pg_interfaces:
+            i.admin_down()
+            i.unconfig_ip4()
+
+    def test_ip_lpm(self):
+        """ IP longest Prefix Match """
+
+        s_24 = VppIpRoute(self, "10.1.2.0", 24,
+                          [VppRoutePath(self.pg1.remote_ip4,
+                                        self.pg1.sw_if_index)])
+        s_24.add_vpp_config()
+        s_8 = VppIpRoute(self, "10.0.0.0", 8,
+                         [VppRoutePath(self.pg2.remote_ip4,
+                                       self.pg2.sw_if_index)])
+        s_8.add_vpp_config()
+
+        p_8 = (Ether(src=self.pg0.remote_mac,
+                     dst=self.pg0.local_mac) /
+               IP(src="1.1.1.1",
+                  dst="10.1.1.1") /
+               UDP(sport=1234, dport=1234) /
+               Raw('\xa5' * 2000))
+        p_24 = (Ether(src=self.pg0.remote_mac,
+                      dst=self.pg0.local_mac) /
+                IP(src="1.1.1.1",
+                   dst="10.1.2.1") /
+                UDP(sport=1234, dport=1234) /
+                Raw('\xa5' * 2000))
+
+        self.logger.info(self.vapi.cli("sh ip fib mtrie"))
+        rx = self.send_and_expect(self.pg0, p_8 * 65, self.pg2)
+        rx = self.send_and_expect(self.pg0, p_24 * 65, self.pg1)
+
+
+class TestIPv4Frag(VppTestCase):
+    """ IPv4 fragmentation """
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestIPv4Frag, cls).setUpClass()
+
+        cls.create_pg_interfaces([0, 1])
+        cls.src_if = cls.pg0
+        cls.dst_if = cls.pg1
+
+        # setup all interfaces
+        for i in cls.pg_interfaces:
+            i.admin_up()
+            i.config_ip4()
+            i.resolve_arp()
+
+    def test_frag_large_packets(self):
+        """ Fragmentation of large packets """
+
+        p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
+             IP(src=self.src_if.remote_ip4, dst=self.dst_if.remote_ip4) /
+             UDP(sport=1234, dport=5678) / Raw())
+        self.extend_packet(p, 6000, "abcde")
+        saved_payload = p[Raw].load
+
+        # Force fragmentation by setting MTU of output interface
+        # lower than packet size
+        self.vapi.sw_interface_set_mtu(self.dst_if.sw_if_index,
+                                       [5000, 0, 0, 0])
+
+        self.pg_enable_capture()
+        self.src_if.add_stream(p)
+        self.pg_start()
+
+        # Expecting 3 fragments because size of created fragments currently
+        # cannot be larger then VPP buffer size (which is 2048)
+        packets = self.dst_if.get_capture(3)
+
+        # Assume VPP sends the fragments in order
+        payload = ''
+        for p in packets:
+            payload_offset = p.frag * 8
+            if payload_offset > 0:
+                payload_offset -= 8  # UDP header is not in payload
+            self.assert_equal(payload_offset, len(payload))
+            payload += p[Raw].load
+        self.assert_equal(payload, saved_payload, "payload")
+
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)