hs-test: rename address allocator
[vpp.git] / test / test_vxlan6.py
index b582d38..0f9c512 100644 (file)
@@ -6,16 +6,18 @@ from framework import VppTestCase, VppTestRunner
 from template_bd import BridgeDomain
 
 from scapy.layers.l2 import Ether
 from template_bd import BridgeDomain
 
 from scapy.layers.l2 import Ether
-from scapy.layers.inet6 import IPv6, UDP
+from scapy.packet import Raw, bind_layers
+from scapy.layers.inet6 import IP, IPv6, UDP
 from scapy.layers.vxlan import VXLAN
 from scapy.layers.vxlan import VXLAN
-from scapy.utils import atol
+
+import util
 from vpp_ip_route import VppIpRoute, VppRoutePath
 from vpp_vxlan_tunnel import VppVxlanTunnel
 from vpp_ip import INVALID_INDEX
 
 
 class TestVxlan6(BridgeDomain, VppTestCase):
 from vpp_ip_route import VppIpRoute, VppRoutePath
 from vpp_vxlan_tunnel import VppVxlanTunnel
 from vpp_ip import INVALID_INDEX
 
 
 class TestVxlan6(BridgeDomain, VppTestCase):
-    """ VXLAN over IPv6 Test Case """
+    """VXLAN over IPv6 Test Case"""
 
     def __init__(self, *args):
         BridgeDomain.__init__(self)
 
     def __init__(self, *args):
         BridgeDomain.__init__(self)
@@ -26,16 +28,18 @@ class TestVxlan6(BridgeDomain, VppTestCase):
         Encapsulate the original payload frame by adding VXLAN header with its
         UDP, IP and Ethernet fields
         """
         Encapsulate the original payload frame by adding VXLAN header with its
         UDP, IP and Ethernet fields
         """
-        return (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
-                IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
-                UDP(sport=self.dport, dport=self.dport, chksum=0) /
-                VXLAN(vni=vni, flags=self.flags) /
-                pkt)
+        return (
+            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+            / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
+            / UDP(sport=self.dport, dport=self.dport, chksum=0)
+            / VXLAN(vni=vni, flags=self.flags)
+            / pkt
+        )
 
     @classmethod
     def ip_range(cls, s, e):
 
     @classmethod
     def ip_range(cls, s, e):
-        """ range of remote ip's """
-        tmp = cls.pg0.remote_ip6.rsplit(':', 1)[0]
+        """range of remote ip's"""
+        tmp = cls.pg0.remote_ip6.rsplit(":", 1)[0]
         return ("%s:%x" % (tmp, i) for i in range(s, e))
 
     def encap_mcast(self, pkt, src_ip, src_mac, vni):
         return ("%s:%x" % (tmp, i) for i in range(s, e))
 
     def encap_mcast(self, pkt, src_ip, src_mac, vni):
@@ -43,18 +47,20 @@ class TestVxlan6(BridgeDomain, VppTestCase):
         Encapsulate the original payload frame by adding VXLAN header with its
         UDP, IP and Ethernet fields
         """
         Encapsulate the original payload frame by adding VXLAN header with its
         UDP, IP and Ethernet fields
         """
-        return (Ether(src=src_mac, dst=self.mcast_mac) /
-                IPv6(src=src_ip, dst=self.mcast_ip6) /
-                UDP(sport=self.dport, dport=self.dport, chksum=0) /
-                VXLAN(vni=vni, flags=self.flags) /
-                pkt)
+        return (
+            Ether(src=src_mac, dst=self.mcast_mac)
+            / IPv6(src=src_ip, dst=self.mcast_ip6)
+            / UDP(sport=self.dport, dport=self.dport, chksum=0)
+            / VXLAN(vni=vni, flags=self.flags)
+            / pkt
+        )
 
     def decapsulate(self, pkt):
         """
         Decapsulate the original payload frame by removing VXLAN header
         """
         # check if is set I flag
 
     def decapsulate(self, pkt):
         """
         Decapsulate the original payload frame by removing VXLAN header
         """
         # check if is set I flag
-        self.assertEqual(pkt[VXLAN].flags, int('0x8', 16))
+        self.assertEqual(pkt[VXLAN].flags, int("0x8", 16))
         return pkt[VXLAN].payload
 
     # Method for checking VXLAN encapsulation.
         return pkt[VXLAN].payload
 
     # Method for checking VXLAN encapsulation.
@@ -78,24 +84,35 @@ class TestVxlan6(BridgeDomain, VppTestCase):
                 self.assertEqual(pkt[IPv6].dst, type(self).mcast_ip6)
         # Verify UDP destination port is VXLAN 4789, source UDP port could be
         #  arbitrary.
                 self.assertEqual(pkt[IPv6].dst, type(self).mcast_ip6)
         # Verify UDP destination port is VXLAN 4789, source UDP port could be
         #  arbitrary.
-        self.assertEqual(pkt[UDP].dport, type(self).dport)
-        # TODO: checksum check
+        self.assertEqual(pkt[UDP].dport, self.dport)
+        # Verify UDP checksum
+        self.assert_udp_checksum_valid(pkt, ignore_zero_checksum=False)
         # Verify VNI
         self.assertEqual(pkt[VXLAN].vni, vni)
 
     @classmethod
         # Verify VNI
         self.assertEqual(pkt[VXLAN].vni, vni)
 
     @classmethod
-    def create_vxlan_flood_test_bd(cls, vni, n_ucast_tunnels):
+    def create_vxlan_flood_test_bd(cls, vni, n_ucast_tunnels, port):
         # Create 10 ucast vxlan tunnels under bd
         start = 10
         end = start + n_ucast_tunnels
         for dest_ip6 in cls.ip_range(start, end):
             # add host route so dest ip will not be resolved
         # Create 10 ucast vxlan tunnels under bd
         start = 10
         end = start + n_ucast_tunnels
         for dest_ip6 in cls.ip_range(start, end):
             # add host route so dest ip will not be resolved
-            rip = VppIpRoute(cls, dest_ip6, 128,
-                             [VppRoutePath(cls.pg0.remote_ip6, INVALID_INDEX)],
-                             register=False)
+            rip = VppIpRoute(
+                cls,
+                dest_ip6,
+                128,
+                [VppRoutePath(cls.pg0.remote_ip6, INVALID_INDEX)],
+                register=False,
+            )
             rip.add_vpp_config()
             rip.add_vpp_config()
-            r = VppVxlanTunnel(cls, src=cls.pg0.local_ip6,
-                               dst=dest_ip6, vni=vni)
+            r = VppVxlanTunnel(
+                cls,
+                src=cls.pg0.local_ip6,
+                src_port=port,
+                dst_port=port,
+                dst=dest_ip6,
+                vni=vni,
+            )
             r.add_vpp_config()
             cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index, bd_id=vni)
 
             r.add_vpp_config()
             cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index, bd_id=vni)
 
@@ -117,7 +134,6 @@ class TestVxlan6(BridgeDomain, VppTestCase):
         super(TestVxlan6, cls).setUpClass()
 
         try:
         super(TestVxlan6, cls).setUpClass()
 
         try:
-            cls.dport = 4789
             cls.flags = 0x8
 
             # Create 2 pg interfaces.
             cls.flags = 0x8
 
             # Create 2 pg interfaces.
@@ -125,16 +141,15 @@ class TestVxlan6(BridgeDomain, VppTestCase):
             for pg in cls.pg_interfaces:
                 pg.admin_up()
 
             for pg in cls.pg_interfaces:
                 pg.admin_up()
 
-            # Configure IPv4 addresses on VPP pg0.
+            # Configure IPv6 addresses on VPP pg0.
             cls.pg0.config_ip6()
 
             # Resolve MAC address for VPP's IP address on pg0.
             cls.pg0.resolve_ndp()
 
             cls.pg0.config_ip6()
 
             # Resolve MAC address for VPP's IP address on pg0.
             cls.pg0.resolve_ndp()
 
-            cls.mcast_ip6 = 'ff0e::1'
-            cls.mcast_ip6n = socket.inet_pton(socket.AF_INET6, cls.mcast_ip6)
-            cls.mcast_mac = "33:33:00:00:00:%02x" % (1)
-
+            # Our Multicast address
+            cls.mcast_ip6 = "ff0e::1"
+            cls.mcast_mac = util.mcast_ip_to_mac(cls.mcast_ip6)
         except Exception:
             super(TestVxlan6, cls).tearDownClass()
             raise
         except Exception:
             super(TestVxlan6, cls).tearDownClass()
             raise
@@ -145,38 +160,64 @@ class TestVxlan6(BridgeDomain, VppTestCase):
 
     def setUp(self):
         super(TestVxlan6, self).setUp()
 
     def setUp(self):
         super(TestVxlan6, self).setUp()
+
+    def createVxLANInterfaces(self, port=4789):
         # Create VXLAN VTEP on VPP pg0, and put vxlan_tunnel0 and pg1
         #  into BD.
         # Create VXLAN VTEP on VPP pg0, and put vxlan_tunnel0 and pg1
         #  into BD.
+        self.dport = port
+
         self.single_tunnel_vni = 0x12345
         self.single_tunnel_bd = 1
         self.single_tunnel_vni = 0x12345
         self.single_tunnel_bd = 1
-        r = VppVxlanTunnel(self, src=self.pg0.local_ip6,
-                           dst=self.pg0.remote_ip6,
-                           vni=self.single_tunnel_vni)
+        r = VppVxlanTunnel(
+            self,
+            src=self.pg0.local_ip6,
+            dst=self.pg0.remote_ip6,
+            src_port=self.dport,
+            dst_port=self.dport,
+            vni=self.single_tunnel_vni,
+        )
         r.add_vpp_config()
         r.add_vpp_config()
-        self.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=r.sw_if_index,
-                                             bd_id=self.single_tunnel_bd)
         self.vapi.sw_interface_set_l2_bridge(
         self.vapi.sw_interface_set_l2_bridge(
-            rx_sw_if_index=self.pg1.sw_if_index, bd_id=self.single_tunnel_bd)
+            rx_sw_if_index=r.sw_if_index, bd_id=self.single_tunnel_bd
+        )
+        self.vapi.sw_interface_set_l2_bridge(
+            rx_sw_if_index=self.pg1.sw_if_index, bd_id=self.single_tunnel_bd
+        )
 
         # Setup vni 2 to test multicast flooding
         self.n_ucast_tunnels = 10
         self.mcast_flood_bd = 2
 
         # Setup vni 2 to test multicast flooding
         self.n_ucast_tunnels = 10
         self.mcast_flood_bd = 2
-        self.create_vxlan_flood_test_bd(self.mcast_flood_bd,
-                                        self.n_ucast_tunnels)
-        r = VppVxlanTunnel(self, src=self.pg0.local_ip6, dst=self.mcast_ip6,
-                           mcast_sw_if_index=1, vni=self.mcast_flood_bd)
+        self.create_vxlan_flood_test_bd(
+            self.mcast_flood_bd, self.n_ucast_tunnels, self.dport
+        )
+        r = VppVxlanTunnel(
+            self,
+            src=self.pg0.local_ip6,
+            dst=self.mcast_ip6,
+            src_port=self.dport,
+            dst_port=self.dport,
+            mcast_sw_if_index=1,
+            vni=self.mcast_flood_bd,
+        )
         r.add_vpp_config()
         r.add_vpp_config()
-        self.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=r.sw_if_index,
-                                             bd_id=self.mcast_flood_bd)
         self.vapi.sw_interface_set_l2_bridge(
         self.vapi.sw_interface_set_l2_bridge(
-            rx_sw_if_index=self.pg2.sw_if_index, bd_id=self.mcast_flood_bd)
+            rx_sw_if_index=r.sw_if_index, bd_id=self.mcast_flood_bd
+        )
+        self.vapi.sw_interface_set_l2_bridge(
+            rx_sw_if_index=self.pg2.sw_if_index, bd_id=self.mcast_flood_bd
+        )
 
         # Setup vni 3 to test unicast flooding
         self.ucast_flood_bd = 3
 
         # Setup vni 3 to test unicast flooding
         self.ucast_flood_bd = 3
-        self.create_vxlan_flood_test_bd(self.ucast_flood_bd,
-                                        self.n_ucast_tunnels)
+        self.create_vxlan_flood_test_bd(
+            self.ucast_flood_bd, self.n_ucast_tunnels, self.dport
+        )
         self.vapi.sw_interface_set_l2_bridge(
         self.vapi.sw_interface_set_l2_bridge(
-            rx_sw_if_index=self.pg3.sw_if_index, bd_id=self.ucast_flood_bd)
+            rx_sw_if_index=self.pg3.sw_if_index, bd_id=self.ucast_flood_bd
+        )
+
+        # Set scapy listen custom port for VxLAN
+        bind_layers(UDP, VXLAN, dport=self.dport)
 
     # Method to define VPP actions before tear down of the test case.
     #  Overrides tearDown method in VppTestCase class.
 
     # Method to define VPP actions before tear down of the test case.
     #  Overrides tearDown method in VppTestCase class.
@@ -190,6 +231,118 @@ class TestVxlan6(BridgeDomain, VppTestCase):
         self.logger.info(self.vapi.cli("show bridge-domain 3 detail"))
         self.logger.info(self.vapi.cli("show vxlan tunnel"))
 
         self.logger.info(self.vapi.cli("show bridge-domain 3 detail"))
         self.logger.info(self.vapi.cli("show vxlan tunnel"))
 
+    def encap_fragmented_packet(self):
+        frame = (
+            Ether(src="00:00:00:00:00:02", dst="00:00:00:00:00:01")
+            / IP(src="4.3.2.1", dst="1.2.3.4")
+            / UDP(sport=20000, dport=10000)
+            / Raw(b"\xa5" * 1000)
+        )
+
+        frags = util.fragment_rfc791(frame, 400)
+
+        self.pg1.add_stream(frags)
+
+        self.pg0.enable_capture()
+
+        self.pg_start()
+
+        out = self.pg0.get_capture(3)
+
+        payload = []
+        for pkt in out:
+            payload.append(self.decapsulate(pkt))
+            self.check_encapsulation(pkt, self.single_tunnel_vni)
+
+        reassembled = util.reassemble4(payload)
+
+        self.assertEqual(Ether(raw(frame))[IP], reassembled[IP])
+
+    """
+    Tests with default port (4789)
+    """
+
+    def test_decap(self):
+        """Decapsulation test
+        from BridgeDoman
+        """
+        self.createVxLANInterfaces()
+        super(TestVxlan6, self).test_decap()
+
+    def test_encap(self):
+        """Encapsulation test
+        from BridgeDoman
+        """
+        self.createVxLANInterfaces()
+        super(TestVxlan6, self).test_encap()
+
+    def test_encap_fragmented_packet(self):
+        """Encapsulation test send fragments from pg1
+        Verify receipt of encapsulated frames on pg0
+        """
+        self.createVxLANInterfaces()
+        self.encap_fragmented_packet()
+
+    def test_ucast_flood(self):
+        """Unicast flood test
+        from BridgeDoman
+        """
+        self.createVxLANInterfaces()
+        super(TestVxlan6, self).test_ucast_flood()
+
+    def test_mcast_flood(self):
+        """Multicast flood test
+        from BridgeDoman
+        """
+        self.createVxLANInterfaces()
+        super(TestVxlan6, self).test_mcast_flood()
+
+    def test_mcast_rcv(self):
+        """Multicast receive test
+        from BridgeDoman
+        """
+        self.createVxLANInterfaces()
+        super(TestVxlan6, self).test_mcast_rcv()
+
+    """
+    Tests with custom port
+    """
+
+    def test_decap_custom_port(self):
+        """Decapsulation test custom port
+        from BridgeDoman
+        """
+        self.createVxLANInterfaces(1111)
+        super(TestVxlan6, self).test_decap()
+
+    def test_encap_custom_port(self):
+        """Encapsulation test custom port
+        from BridgeDoman
+        """
+        self.createVxLANInterfaces(1111)
+        super(TestVxlan6, self).test_encap()
+
+    def test_ucast_flood_custom_port(self):
+        """Unicast flood test custom port
+        from BridgeDoman
+        """
+        self.createVxLANInterfaces(1111)
+        super(TestVxlan6, self).test_ucast_flood()
+
+    def test_mcast_flood_custom_port(self):
+        """Multicast flood test custom port
+        from BridgeDoman
+        """
+        self.createVxLANInterfaces(1111)
+        super(TestVxlan6, self).test_mcast_flood()
+
+    def test_mcast_rcv_custom_port(self):
+        """Multicast receive test custom port
+        from BridgeDoman
+        """
+        self.createVxLANInterfaces(1111)
+        super(TestVxlan6, self).test_mcast_rcv()
+
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)
     unittest.main(testRunner=VppTestRunner)