stats: expose symlink to stats client
[vpp.git] / test / test_vxlan6.py
index 6f8fee7..0f9c512 100644 (file)
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 
 import socket
 import unittest
 
 import socket
 import unittest
@@ -6,13 +6,18 @@ from framework import VppTestCase, VppTestRunner
 from template_bd import BridgeDomain
 
 from scapy.layers.l2 import Ether
 from template_bd import BridgeDomain
 
 from scapy.layers.l2 import Ether
-from scapy.layers.inet6 import IPv6, UDP
+from scapy.packet import Raw, bind_layers
+from scapy.layers.inet6 import IP, IPv6, UDP
 from scapy.layers.vxlan import VXLAN
 from scapy.layers.vxlan import VXLAN
-from scapy.utils import atol
+
+import util
+from vpp_ip_route import VppIpRoute, VppRoutePath
+from vpp_vxlan_tunnel import VppVxlanTunnel
+from vpp_ip import INVALID_INDEX
 
 
 class TestVxlan6(BridgeDomain, VppTestCase):
 
 
 class TestVxlan6(BridgeDomain, VppTestCase):
-    """ VXLAN over IPv6 Test Case """
+    """VXLAN over IPv6 Test Case"""
 
     def __init__(self, *args):
         BridgeDomain.__init__(self)
 
     def __init__(self, *args):
         BridgeDomain.__init__(self)
@@ -23,16 +28,18 @@ class TestVxlan6(BridgeDomain, VppTestCase):
         Encapsulate the original payload frame by adding VXLAN header with its
         UDP, IP and Ethernet fields
         """
         Encapsulate the original payload frame by adding VXLAN header with its
         UDP, IP and Ethernet fields
         """
-        return (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
-                IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
-                UDP(sport=self.dport, dport=self.dport, chksum=0) /
-                VXLAN(vni=vni, flags=self.flags) /
-                pkt)
+        return (
+            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+            / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
+            / UDP(sport=self.dport, dport=self.dport, chksum=0)
+            / VXLAN(vni=vni, flags=self.flags)
+            / pkt
+        )
 
     @classmethod
     def ip_range(cls, s, e):
 
     @classmethod
     def ip_range(cls, s, e):
-        """ range of remote ip's """
-        tmp = cls.pg0.remote_ip6.rsplit(':', 1)[0]
+        """range of remote ip's"""
+        tmp = cls.pg0.remote_ip6.rsplit(":", 1)[0]
         return ("%s:%x" % (tmp, i) for i in range(s, e))
 
     def encap_mcast(self, pkt, src_ip, src_mac, vni):
         return ("%s:%x" % (tmp, i) for i in range(s, e))
 
     def encap_mcast(self, pkt, src_ip, src_mac, vni):
@@ -40,18 +47,20 @@ class TestVxlan6(BridgeDomain, VppTestCase):
         Encapsulate the original payload frame by adding VXLAN header with its
         UDP, IP and Ethernet fields
         """
         Encapsulate the original payload frame by adding VXLAN header with its
         UDP, IP and Ethernet fields
         """
-        return (Ether(src=src_mac, dst=self.mcast_mac) /
-                IPv6(src=src_ip, dst=self.mcast_ip6) /
-                UDP(sport=self.dport, dport=self.dport, chksum=0) /
-                VXLAN(vni=vni, flags=self.flags) /
-                pkt)
+        return (
+            Ether(src=src_mac, dst=self.mcast_mac)
+            / IPv6(src=src_ip, dst=self.mcast_ip6)
+            / UDP(sport=self.dport, dport=self.dport, chksum=0)
+            / VXLAN(vni=vni, flags=self.flags)
+            / pkt
+        )
 
     def decapsulate(self, pkt):
         """
         Decapsulate the original payload frame by removing VXLAN header
         """
         # check if is set I flag
 
     def decapsulate(self, pkt):
         """
         Decapsulate the original payload frame by removing VXLAN header
         """
         # check if is set I flag
-        self.assertEqual(pkt[VXLAN].flags, int('0x8', 16))
+        self.assertEqual(pkt[VXLAN].flags, int("0x8", 16))
         return pkt[VXLAN].payload
 
     # Method for checking VXLAN encapsulation.
         return pkt[VXLAN].payload
 
     # Method for checking VXLAN encapsulation.
@@ -75,28 +84,37 @@ class TestVxlan6(BridgeDomain, VppTestCase):
                 self.assertEqual(pkt[IPv6].dst, type(self).mcast_ip6)
         # Verify UDP destination port is VXLAN 4789, source UDP port could be
         #  arbitrary.
                 self.assertEqual(pkt[IPv6].dst, type(self).mcast_ip6)
         # Verify UDP destination port is VXLAN 4789, source UDP port could be
         #  arbitrary.
-        self.assertEqual(pkt[UDP].dport, type(self).dport)
-        # TODO: checksum check
+        self.assertEqual(pkt[UDP].dport, self.dport)
+        # Verify UDP checksum
+        self.assert_udp_checksum_valid(pkt, ignore_zero_checksum=False)
         # Verify VNI
         self.assertEqual(pkt[VXLAN].vni, vni)
 
     @classmethod
         # Verify VNI
         self.assertEqual(pkt[VXLAN].vni, vni)
 
     @classmethod
-    def create_vxlan_flood_test_bd(cls, vni, n_ucast_tunnels):
+    def create_vxlan_flood_test_bd(cls, vni, n_ucast_tunnels, port):
         # Create 10 ucast vxlan tunnels under bd
         start = 10
         end = start + n_ucast_tunnels
         # Create 10 ucast vxlan tunnels under bd
         start = 10
         end = start + n_ucast_tunnels
-        next_hop = cls.pg0.remote_ip6n
         for dest_ip6 in cls.ip_range(start, end):
         for dest_ip6 in cls.ip_range(start, end):
-            dest_ip6n = socket.inet_pton(socket.AF_INET6, dest_ip6)
             # add host route so dest ip will not be resolved
             # add host route so dest ip will not be resolved
-            cls.vapi.ip_add_del_route(dst_address=dest_ip6n,
-                                      dst_address_length=128,
-                                      next_hop_address=next_hop, is_ipv6=1)
-            r = cls.vapi.vxlan_add_del_tunnel(src_address=cls.pg0.local_ip6n,
-                                              dst_address=dest_ip6n, is_ipv6=1,
-                                              vni=vni)
-            cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=r.sw_if_index,
-                                                bd_id=vni)
+            rip = VppIpRoute(
+                cls,
+                dest_ip6,
+                128,
+                [VppRoutePath(cls.pg0.remote_ip6, INVALID_INDEX)],
+                register=False,
+            )
+            rip.add_vpp_config()
+            r = VppVxlanTunnel(
+                cls,
+                src=cls.pg0.local_ip6,
+                src_port=port,
+                dst_port=port,
+                dst=dest_ip6,
+                vni=vni,
+            )
+            r.add_vpp_config()
+            cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index, bd_id=vni)
 
     @classmethod
     def add_mcast_tunnels_load(cls):
 
     @classmethod
     def add_mcast_tunnels_load(cls):
@@ -116,7 +134,6 @@ class TestVxlan6(BridgeDomain, VppTestCase):
         super(TestVxlan6, cls).setUpClass()
 
         try:
         super(TestVxlan6, cls).setUpClass()
 
         try:
-            cls.dport = 4789
             cls.flags = 0x8
 
             # Create 2 pg interfaces.
             cls.flags = 0x8
 
             # Create 2 pg interfaces.
@@ -124,48 +141,15 @@ class TestVxlan6(BridgeDomain, VppTestCase):
             for pg in cls.pg_interfaces:
                 pg.admin_up()
 
             for pg in cls.pg_interfaces:
                 pg.admin_up()
 
-            # Configure IPv4 addresses on VPP pg0.
+            # Configure IPv6 addresses on VPP pg0.
             cls.pg0.config_ip6()
 
             # Resolve MAC address for VPP's IP address on pg0.
             cls.pg0.resolve_ndp()
 
             cls.pg0.config_ip6()
 
             # Resolve MAC address for VPP's IP address on pg0.
             cls.pg0.resolve_ndp()
 
-            cls.mcast_ip6 = 'ff0e::1'
-            cls.mcast_ip6n = socket.inet_pton(socket.AF_INET6, cls.mcast_ip6)
-            cls.mcast_mac = "33:33:00:00:00:%02x" % (1)
-
-            # Create VXLAN VTEP on VPP pg0, and put vxlan_tunnel0 and pg1
-            #  into BD.
-            cls.single_tunnel_bd = 1
-            r = cls.vapi.vxlan_add_del_tunnel(src_address=cls.pg0.local_ip6n,
-                                              dst_address=cls.pg0.remote_ip6n,
-                                              is_ipv6=1,
-                                              vni=cls.single_tunnel_bd)
-            cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=r.sw_if_index,
-                                                bd_id=cls.single_tunnel_bd)
-            cls.vapi.sw_interface_set_l2_bridge(
-                rx_sw_if_index=cls.pg1.sw_if_index, bd_id=cls.single_tunnel_bd)
-
-            # Setup vni 2 to test multicast flooding
-            cls.n_ucast_tunnels = 10
-            cls.mcast_flood_bd = 2
-            cls.create_vxlan_flood_test_bd(cls.mcast_flood_bd,
-                                           cls.n_ucast_tunnels)
-            r = cls.vapi.vxlan_add_del_tunnel(src_address=cls.pg0.local_ip6n,
-                                              dst_address=cls.mcast_ip6n,
-                                              mcast_sw_if_index=1, is_ipv6=1,
-                                              vni=cls.mcast_flood_bd)
-            cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=r.sw_if_index,
-                                                bd_id=cls.mcast_flood_bd)
-            cls.vapi.sw_interface_set_l2_bridge(
-                rx_sw_if_index=cls.pg2.sw_if_index, bd_id=cls.mcast_flood_bd)
-
-            # Setup vni 3 to test unicast flooding
-            cls.ucast_flood_bd = 3
-            cls.create_vxlan_flood_test_bd(cls.ucast_flood_bd,
-                                           cls.n_ucast_tunnels)
-            cls.vapi.sw_interface_set_l2_bridge(
-                rx_sw_if_index=cls.pg3.sw_if_index, bd_id=cls.ucast_flood_bd)
+            # Our Multicast address
+            cls.mcast_ip6 = "ff0e::1"
+            cls.mcast_mac = util.mcast_ip_to_mac(cls.mcast_ip6)
         except Exception:
             super(TestVxlan6, cls).tearDownClass()
             raise
         except Exception:
             super(TestVxlan6, cls).tearDownClass()
             raise
@@ -174,17 +158,191 @@ class TestVxlan6(BridgeDomain, VppTestCase):
     def tearDownClass(cls):
         super(TestVxlan6, cls).tearDownClass()
 
     def tearDownClass(cls):
         super(TestVxlan6, cls).tearDownClass()
 
+    def setUp(self):
+        super(TestVxlan6, self).setUp()
+
+    def createVxLANInterfaces(self, port=4789):
+        # Create VXLAN VTEP on VPP pg0, and put vxlan_tunnel0 and pg1
+        #  into BD.
+        self.dport = port
+
+        self.single_tunnel_vni = 0x12345
+        self.single_tunnel_bd = 1
+        r = VppVxlanTunnel(
+            self,
+            src=self.pg0.local_ip6,
+            dst=self.pg0.remote_ip6,
+            src_port=self.dport,
+            dst_port=self.dport,
+            vni=self.single_tunnel_vni,
+        )
+        r.add_vpp_config()
+        self.vapi.sw_interface_set_l2_bridge(
+            rx_sw_if_index=r.sw_if_index, bd_id=self.single_tunnel_bd
+        )
+        self.vapi.sw_interface_set_l2_bridge(
+            rx_sw_if_index=self.pg1.sw_if_index, bd_id=self.single_tunnel_bd
+        )
+
+        # Setup vni 2 to test multicast flooding
+        self.n_ucast_tunnels = 10
+        self.mcast_flood_bd = 2
+        self.create_vxlan_flood_test_bd(
+            self.mcast_flood_bd, self.n_ucast_tunnels, self.dport
+        )
+        r = VppVxlanTunnel(
+            self,
+            src=self.pg0.local_ip6,
+            dst=self.mcast_ip6,
+            src_port=self.dport,
+            dst_port=self.dport,
+            mcast_sw_if_index=1,
+            vni=self.mcast_flood_bd,
+        )
+        r.add_vpp_config()
+        self.vapi.sw_interface_set_l2_bridge(
+            rx_sw_if_index=r.sw_if_index, bd_id=self.mcast_flood_bd
+        )
+        self.vapi.sw_interface_set_l2_bridge(
+            rx_sw_if_index=self.pg2.sw_if_index, bd_id=self.mcast_flood_bd
+        )
+
+        # Setup vni 3 to test unicast flooding
+        self.ucast_flood_bd = 3
+        self.create_vxlan_flood_test_bd(
+            self.ucast_flood_bd, self.n_ucast_tunnels, self.dport
+        )
+        self.vapi.sw_interface_set_l2_bridge(
+            rx_sw_if_index=self.pg3.sw_if_index, bd_id=self.ucast_flood_bd
+        )
+
+        # Set scapy listen custom port for VxLAN
+        bind_layers(UDP, VXLAN, dport=self.dport)
+
     # Method to define VPP actions before tear down of the test case.
     #  Overrides tearDown method in VppTestCase class.
     #  @param self The object pointer.
     def tearDown(self):
         super(TestVxlan6, self).tearDown()
     # Method to define VPP actions before tear down of the test case.
     #  Overrides tearDown method in VppTestCase class.
     #  @param self The object pointer.
     def tearDown(self):
         super(TestVxlan6, self).tearDown()
-        if not self.vpp_dead:
-            self.logger.info(self.vapi.cli("show bridge-domain 1 detail"))
-            self.logger.info(self.vapi.cli("show bridge-domain 2 detail"))
-            self.logger.info(self.vapi.cli("show bridge-domain 3 detail"))
-            self.logger.info(self.vapi.cli("show vxlan tunnel"))
+
+    def show_commands_at_teardown(self):
+        self.logger.info(self.vapi.cli("show bridge-domain 1 detail"))
+        self.logger.info(self.vapi.cli("show bridge-domain 2 detail"))
+        self.logger.info(self.vapi.cli("show bridge-domain 3 detail"))
+        self.logger.info(self.vapi.cli("show vxlan tunnel"))
+
+    def encap_fragmented_packet(self):
+        frame = (
+            Ether(src="00:00:00:00:00:02", dst="00:00:00:00:00:01")
+            / IP(src="4.3.2.1", dst="1.2.3.4")
+            / UDP(sport=20000, dport=10000)
+            / Raw(b"\xa5" * 1000)
+        )
+
+        frags = util.fragment_rfc791(frame, 400)
+
+        self.pg1.add_stream(frags)
+
+        self.pg0.enable_capture()
+
+        self.pg_start()
+
+        out = self.pg0.get_capture(3)
+
+        payload = []
+        for pkt in out:
+            payload.append(self.decapsulate(pkt))
+            self.check_encapsulation(pkt, self.single_tunnel_vni)
+
+        reassembled = util.reassemble4(payload)
+
+        self.assertEqual(Ether(raw(frame))[IP], reassembled[IP])
+
+    """
+    Tests with default port (4789)
+    """
+
+    def test_decap(self):
+        """Decapsulation test
+        from BridgeDoman
+        """
+        self.createVxLANInterfaces()
+        super(TestVxlan6, self).test_decap()
+
+    def test_encap(self):
+        """Encapsulation test
+        from BridgeDoman
+        """
+        self.createVxLANInterfaces()
+        super(TestVxlan6, self).test_encap()
+
+    def test_encap_fragmented_packet(self):
+        """Encapsulation test send fragments from pg1
+        Verify receipt of encapsulated frames on pg0
+        """
+        self.createVxLANInterfaces()
+        self.encap_fragmented_packet()
+
+    def test_ucast_flood(self):
+        """Unicast flood test
+        from BridgeDoman
+        """
+        self.createVxLANInterfaces()
+        super(TestVxlan6, self).test_ucast_flood()
+
+    def test_mcast_flood(self):
+        """Multicast flood test
+        from BridgeDoman
+        """
+        self.createVxLANInterfaces()
+        super(TestVxlan6, self).test_mcast_flood()
+
+    def test_mcast_rcv(self):
+        """Multicast receive test
+        from BridgeDoman
+        """
+        self.createVxLANInterfaces()
+        super(TestVxlan6, self).test_mcast_rcv()
+
+    """
+    Tests with custom port
+    """
+
+    def test_decap_custom_port(self):
+        """Decapsulation test custom port
+        from BridgeDoman
+        """
+        self.createVxLANInterfaces(1111)
+        super(TestVxlan6, self).test_decap()
+
+    def test_encap_custom_port(self):
+        """Encapsulation test custom port
+        from BridgeDoman
+        """
+        self.createVxLANInterfaces(1111)
+        super(TestVxlan6, self).test_encap()
+
+    def test_ucast_flood_custom_port(self):
+        """Unicast flood test custom port
+        from BridgeDoman
+        """
+        self.createVxLANInterfaces(1111)
+        super(TestVxlan6, self).test_ucast_flood()
+
+    def test_mcast_flood_custom_port(self):
+        """Multicast flood test custom port
+        from BridgeDoman
+        """
+        self.createVxLANInterfaces(1111)
+        super(TestVxlan6, self).test_mcast_flood()
+
+    def test_mcast_rcv_custom_port(self):
+        """Multicast receive test custom port
+        from BridgeDoman
+        """
+        self.createVxLANInterfaces(1111)
+        super(TestVxlan6, self).test_mcast_rcv()
 
 
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)
     unittest.main(testRunner=VppTestRunner)