tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / test_dvr.py
index 27522a5..e616408 100644 (file)
@@ -1,27 +1,38 @@
-#!/usr/bin/env python
-import random
-import socket
+#!/usr/bin/env python3
 import unittest
 
-from framework import VppTestCase, VppTestRunner
-from vpp_sub_interface import VppSubInterface, VppDot1QSubint
-from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
-from vpp_papi_provider import L2_VTR_OP
+from framework import VppTestCase
+from asfframework import VppTestRunner
+from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
+from vpp_l2 import L2_PORT_TYPE
+from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
+from vpp_acl import AclRule, VppAcl, VppAclInterface
 
 from scapy.packet import Raw
-from scapy.layers.l2 import Ether, Dot1Q, ARP
+from scapy.layers.l2 import Ether, Dot1Q
 from scapy.layers.inet import IP, UDP
-from util import ppp
+from socket import AF_INET
+from ipaddress import IPv4Network
+
+NUM_PKTS = 67
 
 
 class TestDVR(VppTestCase):
-    """ IPv4 Load-Balancing """
+    """Distributed Virtual Router"""
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestDVR, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestDVR, cls).tearDownClass()
 
     def setUp(self):
         super(TestDVR, self).setUp()
 
         self.create_pg_interfaces(range(4))
-        self.create_loopback_interfaces(range(1))
+        self.create_loopback_interfaces(1)
 
         for i in self.pg_interfaces:
             i.admin_up()
@@ -35,8 +46,24 @@ class TestDVR(VppTestCase):
 
         super(TestDVR, self).tearDown()
 
+    def assert_same_mac_addr(self, tx, rx):
+        t_eth = tx[Ether]
+        for p in rx:
+            r_eth = p[Ether]
+            self.assertEqual(t_eth.src, r_eth.src)
+            self.assertEqual(t_eth.dst, r_eth.dst)
+
+    def assert_has_vlan_tag(self, tag, rx):
+        for p in rx:
+            r_1q = p[Dot1Q]
+            self.assertEqual(tag, r_1q.vlan)
+
+    def assert_has_no_tag(self, rx):
+        for p in rx:
+            self.assertFalse(p.haslayer(Dot1Q))
+
     def test_dvr(self):
-        """ Distributed Virtual Router """
+        """Distributed Virtual Router"""
 
         #
         # A packet destined to an IP address that is L2 bridged via
@@ -46,18 +73,18 @@ class TestDVR(VppTestCase):
         ip_tag_bridged = "10.10.10.11"
         any_src_addr = "1.1.1.1"
 
-        pkt_no_tag = (Ether(src=self.pg0.remote_mac,
-                            dst=self.loop0.local_mac) /
-                      IP(src=any_src_addr,
-                         dst=ip_non_tag_bridged) /
-                      UDP(sport=1234, dport=1234) /
-                      Raw('\xa5' * 100))
-        pkt_tag = (Ether(src=self.pg0.remote_mac,
-                         dst=self.loop0.local_mac) /
-                   IP(src=any_src_addr,
-                      dst=ip_tag_bridged) /
-                   UDP(sport=1234, dport=1234) /
-                   Raw('\xa5' * 100))
+        pkt_no_tag = (
+            Ether(src=self.pg0.remote_mac, dst=self.loop0.local_mac)
+            / IP(src=any_src_addr, dst=ip_non_tag_bridged)
+            / UDP(sport=1234, dport=1234)
+            / Raw(b"\xa5" * 100)
+        )
+        pkt_tag = (
+            Ether(src=self.pg0.remote_mac, dst=self.loop0.local_mac)
+            / IP(src=any_src_addr, dst=ip_tag_bridged)
+            / UDP(sport=1234, dport=1234)
+            / Raw(b"\xa5" * 100)
+        )
 
         #
         # Two sub-interfaces so we can test VLAN tag push/pop
@@ -70,109 +97,187 @@ class TestDVR(VppTestCase):
         #
         # Put all the interfaces into a new bridge domain
         #
-        self.vapi.sw_interface_set_l2_bridge(self.pg0.sw_if_index, 1)
-        self.vapi.sw_interface_set_l2_bridge(self.pg1.sw_if_index, 1)
-        self.vapi.sw_interface_set_l2_bridge(sub_if_on_pg2.sw_if_index, 1)
-        self.vapi.sw_interface_set_l2_bridge(sub_if_on_pg3.sw_if_index, 1)
-        self.vapi.sw_interface_set_l2_bridge(self.loop0.sw_if_index, 1, bvi=1)
-
-        self.vapi.sw_interface_set_l2_tag_rewrite(sub_if_on_pg2.sw_if_index,
-                                                  L2_VTR_OP.L2_POP_1,
-                                                  92)
-        self.vapi.sw_interface_set_l2_tag_rewrite(sub_if_on_pg3.sw_if_index,
-                                                  L2_VTR_OP.L2_POP_1,
-                                                  93)
-
-        self.logger.error(self.vapi.ppcli("show bridge-domain 1 detail"))
+        self.vapi.sw_interface_set_l2_bridge(
+            rx_sw_if_index=self.pg0.sw_if_index, bd_id=1
+        )
+        self.vapi.sw_interface_set_l2_bridge(
+            rx_sw_if_index=self.pg1.sw_if_index, bd_id=1
+        )
+        self.vapi.sw_interface_set_l2_bridge(
+            rx_sw_if_index=sub_if_on_pg2.sw_if_index, bd_id=1
+        )
+        self.vapi.sw_interface_set_l2_bridge(
+            rx_sw_if_index=sub_if_on_pg3.sw_if_index, bd_id=1
+        )
+        self.vapi.sw_interface_set_l2_bridge(
+            rx_sw_if_index=self.loop0.sw_if_index, bd_id=1, port_type=L2_PORT_TYPE.BVI
+        )
+
+        self.vapi.l2_interface_vlan_tag_rewrite(
+            sw_if_index=sub_if_on_pg2.sw_if_index,
+            vtr_op=L2_VTR_OP.L2_POP_1,
+            push_dot1q=92,
+        )
+        self.vapi.l2_interface_vlan_tag_rewrite(
+            sw_if_index=sub_if_on_pg3.sw_if_index,
+            vtr_op=L2_VTR_OP.L2_POP_1,
+            push_dot1q=93,
+        )
 
         #
         # Add routes to bridge the traffic via a tagged an nontagged interface
         #
         route_no_tag = VppIpRoute(
-            self, ip_non_tag_bridged, 32,
-            [VppRoutePath("0.0.0.0",
-                          self.pg1.sw_if_index,
-                          proto=DpoProto.DPO_PROTO_ETHERNET)])
+            self,
+            ip_non_tag_bridged,
+            32,
+            [
+                VppRoutePath(
+                    "0.0.0.0", self.pg1.sw_if_index, type=FibPathType.FIB_PATH_TYPE_DVR
+                )
+            ],
+        )
         route_no_tag.add_vpp_config()
 
         #
         # Inject the packet that arrives and leaves on a non-tagged interface
         # Since it's 'bridged' expect that the MAC headed is unchanged.
         #
-        self.pg0.add_stream(pkt_no_tag)
-
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
-
-        rx = self.pg1.get_capture(1)
-
-        self.assertEqual(rx[0][Ether].dst, pkt_no_tag[Ether].dst)
-        self.assertEqual(rx[0][Ether].src, pkt_no_tag[Ether].src)
+        rx = self.send_and_expect(self.pg0, pkt_no_tag * NUM_PKTS, self.pg1)
+        self.assert_same_mac_addr(pkt_no_tag, rx)
+        self.assert_has_no_tag(rx)
 
         #
         # Add routes to bridge the traffic via a tagged interface
         #
-        route_no_tag = VppIpRoute(
-            self, ip_tag_bridged, 32,
-            [VppRoutePath("0.0.0.0",
-                          sub_if_on_pg3.sw_if_index,
-                          proto=DpoProto.DPO_PROTO_ETHERNET)])
-        route_no_tag.add_vpp_config()
+        route_with_tag = VppIpRoute(
+            self,
+            ip_tag_bridged,
+            32,
+            [
+                VppRoutePath(
+                    "0.0.0.0",
+                    sub_if_on_pg3.sw_if_index,
+                    type=FibPathType.FIB_PATH_TYPE_DVR,
+                )
+            ],
+        )
+        route_with_tag.add_vpp_config()
 
         #
-        # Inject the packet that arrives and leaves on a non-tagged interface
-        # Since it's 'bridged' expect that the MAC headed is unchanged.
+        # Inject the packet that arrives non-tag and leaves on a tagged
+        # interface
         #
-        self.pg0.add_stream(pkt_tag)
+        rx = self.send_and_expect(self.pg0, pkt_tag * NUM_PKTS, self.pg3)
+        self.assert_same_mac_addr(pkt_tag, rx)
+        self.assert_has_vlan_tag(93, rx)
 
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
+        #
+        # Tag to tag
+        #
+        pkt_tag_to_tag = (
+            Ether(src=self.pg2.remote_mac, dst=self.loop0.local_mac)
+            / Dot1Q(vlan=92)
+            / IP(src=any_src_addr, dst=ip_tag_bridged)
+            / UDP(sport=1234, dport=1234)
+            / Raw(b"\xa5" * 100)
+        )
+
+        rx = self.send_and_expect(self.pg2, pkt_tag_to_tag * NUM_PKTS, self.pg3)
+        self.assert_same_mac_addr(pkt_tag_to_tag, rx)
+        self.assert_has_vlan_tag(93, rx)
 
-        rx = self.pg3.get_capture(1)
+        #
+        # Tag to non-Tag
+        #
+        pkt_tag_to_non_tag = (
+            Ether(src=self.pg2.remote_mac, dst=self.loop0.local_mac)
+            / Dot1Q(vlan=92)
+            / IP(src=any_src_addr, dst=ip_non_tag_bridged)
+            / UDP(sport=1234, dport=1234)
+            / Raw(b"\xa5" * 100)
+        )
+
+        rx = self.send_and_expect(self.pg2, pkt_tag_to_non_tag * NUM_PKTS, self.pg1)
+        self.assert_same_mac_addr(pkt_tag_to_tag, rx)
+        self.assert_has_no_tag(rx)
 
-        self.assertEqual(rx[0][Ether].dst, pkt_tag[Ether].dst)
-        self.assertEqual(rx[0][Ether].src, pkt_tag[Ether].src)
-        self.assertEqual(rx[0][Dot1Q].vlan, 93)
+        #
+        # Add an output L3 ACL that will block the traffic
+        #
+        rule_1 = AclRule(
+            is_permit=0,
+            proto=17,
+            ports=1234,
+            src_prefix=IPv4Network((any_src_addr, 32)),
+            dst_prefix=IPv4Network((ip_non_tag_bridged, 32)),
+        )
+        acl = VppAcl(self, rules=[rule_1])
+        acl.add_vpp_config()
 
         #
-        # Tag to tag
+        # Apply the ACL on the output interface
         #
-        pkt_tag_to_tag = (Ether(src=self.pg2.remote_mac,
-                                dst=self.loop0.local_mac) /
-                          Dot1Q(vlan=92) /
-                          IP(src=any_src_addr,
-                             dst=ip_tag_bridged) /
-                          UDP(sport=1234, dport=1234) /
-                          Raw('\xa5' * 100))
+        acl_if1 = VppAclInterface(
+            self, sw_if_index=self.pg1.sw_if_index, n_input=0, acls=[acl]
+        )
+        acl_if1.add_vpp_config()
 
-        self.pg2.add_stream(pkt_tag_to_tag)
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
-        rx = self.pg3.get_capture(1)
+        #
+        # Send packet's that should match the ACL and be dropped
+        #
+        rx = self.send_and_assert_no_replies(self.pg2, pkt_tag_to_non_tag * NUM_PKTS)
 
-        self.assertEqual(rx[0][Ether].dst, pkt_tag_to_tag[Ether].dst)
-        self.assertEqual(rx[0][Ether].src, pkt_tag_to_tag[Ether].src)
-        self.assertEqual(rx[0][Dot1Q].vlan, 93)
+        #
+        # cleanup
+        #
+        acl_if1.remove_vpp_config()
+        acl.remove_vpp_config()
+
+        self.vapi.sw_interface_set_l2_bridge(
+            rx_sw_if_index=self.pg0.sw_if_index, bd_id=1, enable=0
+        )
+        self.vapi.sw_interface_set_l2_bridge(
+            rx_sw_if_index=self.pg1.sw_if_index, bd_id=1, enable=0
+        )
+        self.vapi.sw_interface_set_l2_bridge(
+            rx_sw_if_index=sub_if_on_pg2.sw_if_index, bd_id=1, enable=0
+        )
+        self.vapi.sw_interface_set_l2_bridge(
+            rx_sw_if_index=sub_if_on_pg3.sw_if_index, bd_id=1, enable=0
+        )
+        self.vapi.sw_interface_set_l2_bridge(
+            rx_sw_if_index=self.loop0.sw_if_index,
+            bd_id=1,
+            port_type=L2_PORT_TYPE.BVI,
+            enable=0,
+        )
 
         #
-        # Tag to non-Tag
+        # Do a FIB dump to make sure the paths are correctly reported as DVR
         #
-        pkt_tag_to_non_tag = (Ether(src=self.pg2.remote_mac,
-                                    dst=self.loop0.local_mac) /
-                              Dot1Q(vlan=92) /
-                              IP(src=any_src_addr,
-                                 dst=ip_non_tag_bridged) /
-                              UDP(sport=1234, dport=1234) /
-                              Raw('\xa5' * 100))
+        routes = self.vapi.ip_route_dump(0)
+
+        for r in routes:
+            if ip_tag_bridged == str(r.route.prefix.network_address):
+                self.assertEqual(
+                    r.route.paths[0].sw_if_index, sub_if_on_pg3.sw_if_index
+                )
+                self.assertEqual(r.route.paths[0].type, FibPathType.FIB_PATH_TYPE_DVR)
+            if ip_non_tag_bridged == str(r.route.prefix.network_address):
+                self.assertEqual(r.route.paths[0].sw_if_index, self.pg1.sw_if_index)
+                self.assertEqual(r.route.paths[0].type, FibPathType.FIB_PATH_TYPE_DVR)
 
-        self.pg2.add_stream(pkt_tag_to_non_tag)
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
-        rx = self.pg1.get_capture(1)
+        #
+        # the explicit route delete is require so it happens before
+        # the sbu-interface delete. subinterface delete is required
+        # because that object type does not use the object registry
+        #
+        route_no_tag.remove_vpp_config()
+        route_with_tag.remove_vpp_config()
+        sub_if_on_pg3.remove_vpp_config()
+        sub_if_on_pg2.remove_vpp_config()
 
-        self.assertEqual(rx[0][Ether].dst, pkt_tag_to_tag[Ether].dst)
-        self.assertEqual(rx[0][Ether].src, pkt_tag_to_tag[Ether].src)
-        self.assertFalse(rx[0].haslayer(Dot1Q))
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)