NAT44: fix nat44_ed_not_translate_output_feature (VPP-1329)
[vpp.git] / test / test_classifier.py
index 302430f..1e29aec 100644 (file)
@@ -3,6 +3,7 @@
 import unittest
 import socket
 import binascii
+import sys
 
 from framework import VppTestCase, VppTestRunner
 
@@ -59,12 +60,16 @@ class TestClassifier(VppTestCase):
 
     def tearDown(self):
         """Run standard test teardown and acl related log."""
+        for intf in self.interfaces:
+            intf.unconfig_ip4()
+            intf.admin_down()
+
         super(TestClassifier, self).tearDown()
         if not self.vpp_dead:
             self.logger.info(self.vapi.cli("show classify table verbose"))
             self.logger.info(self.vapi.cli("show ip fib"))
 
-    def config_pbr_fib_entry(self, intf):
+    def config_pbr_fib_entry(self, intf, is_add=1):
         """Configure fib entry to route traffic toward PBR VRF table
 
         :param VppInterface intf: destination interface to be routed for PBR.
@@ -74,7 +79,8 @@ class TestClassifier(VppTestCase):
         self.vapi.ip_add_del_route(intf.local_ip4n,
                                    addr_len,
                                    intf.remote_ip4n,
-                                   table_id=self.pbr_vrfid)
+                                   table_id=self.pbr_vrfid,
+                                   is_add=is_add)
 
     def create_stream(self, src_if, dst_if, packet_sizes):
         """Create input packet stream for defined interfaces.
@@ -114,8 +120,9 @@ class TestClassifier(VppTestCase):
                 payload_info = self.payload_to_info(str(packet[Raw]))
                 packet_index = payload_info.index
                 self.assertEqual(payload_info.dst, dst_sw_if_index)
-                self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
-                                  (dst_if.name, payload_info.src, packet_index))
+                self.logger.debug(
+                    "Got packet on port %s: src=%u (id=%u)" %
+                    (dst_if.name, payload_info.src, packet_index))
                 next_info = self.get_next_packet_info_for_interface2(
                     payload_info.src, dst_sw_if_index,
                     last_info[payload_info.src])
@@ -138,6 +145,25 @@ class TestClassifier(VppTestCase):
                             "Interface %s: Packet expected from interface %s "
                             "didn't arrive" % (dst_if.name, i.name))
 
+    def verify_vrf(self, vrf_id):
+        """
+        Check if the FIB table / VRF ID is configured.
+
+        :param int vrf_id: The FIB table / VRF ID to be verified.
+        :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
+        """
+        ip_fib_dump = self.vapi.ip_fib_dump()
+        vrf_count = 0
+        for ip_fib_details in ip_fib_dump:
+            if ip_fib_details[2] == vrf_id:
+                vrf_count += 1
+        if vrf_count == 0:
+            self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
+            return 0
+        else:
+            self.logger.info("IPv4 VRF ID %d is configured" % vrf_id)
+            return 1
+
     @staticmethod
     def build_ip_mask(proto='', src_ip='', dst_ip='',
                       src_port='', dst_port=''):
@@ -254,6 +280,20 @@ class TestClassifier(VppTestCase):
             ip4_table_index=table_index)
         self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
 
+    def output_acl_set_interface(self, intf, table_index, is_add=1):
+        """Configure Output ACL interface
+
+        :param VppInterface intf: Interface to apply Output ACL feature.
+        :param int table_index: table index to identify classify table.
+        :param int is_add: option to configure classify session.
+            - enable(1) or disable(0)
+        """
+        r = self.vapi.output_acl_set_interface(
+            is_add,
+            intf.sw_if_index,
+            ip4_table_index=table_index)
+        self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
+
     def test_acl_ip(self):
         """ IP ACL test
 
@@ -283,6 +323,36 @@ class TestClassifier(VppTestCase):
         self.pg2.assert_nothing_captured(remark="packets forwarded")
         self.pg3.assert_nothing_captured(remark="packets forwarded")
 
+    def test_acl_ip_out(self):
+        """ Output IP ACL test
+
+        Test scenario for basic IP ACL with source IP
+            - Create IPv4 stream for pg1 -> pg0 interface.
+            - Create ACL with source IP address.
+            - Send and verify received packets on pg0 interface.
+        """
+
+        # Basic ACL testing with source IP
+        pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
+        self.pg1.add_stream(pkts)
+
+        self.create_classify_table('ip', self.build_ip_mask(src_ip='ffffffff'),
+                                   data_offset=0)
+        self.create_classify_session(
+            self.pg1, self.acl_tbl_idx.get('ip'),
+            self.build_ip_match(src_ip=self.pg1.remote_ip4))
+        self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get('ip'))
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        pkts = self.pg0.get_capture(len(pkts))
+        self.verify_capture(self.pg0, pkts)
+        self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get('ip'), 0)
+        self.pg1.assert_nothing_captured(remark="packets forwarded")
+        self.pg2.assert_nothing_captured(remark="packets forwarded")
+        self.pg3.assert_nothing_captured(remark="packets forwarded")
+
     def test_acl_mac(self):
         """ MAC ACL test
 
@@ -296,8 +366,9 @@ class TestClassifier(VppTestCase):
         pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
         self.pg0.add_stream(pkts)
 
-        self.create_classify_table(
-            'mac', self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
+        self.create_classify_table('mac',
+                                   self.build_mac_mask(src_mac='ffffffffffff'),
+                                   data_offset=-14)
         self.create_classify_session(
             self.pg0, self.acl_tbl_idx.get('mac'),
             self.build_mac_match(src_mac=self.pg0.remote_mac))
@@ -326,12 +397,16 @@ class TestClassifier(VppTestCase):
         pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes)
         self.pg0.add_stream(pkts)
 
-        self.create_classify_table('pbr', self.build_ip_mask(src_ip='ffffffff'))
+        self.create_classify_table(
+            'pbr', self.build_ip_mask(
+                src_ip='ffffffff'))
         pbr_option = 1
+        # this will create the VRF/table in which we will insert the route
         self.create_classify_session(
             self.pg0, self.acl_tbl_idx.get('pbr'),
             self.build_ip_match(src_ip=self.pg0.remote_ip4),
             pbr_option, self.pbr_vrfid)
+        self.assertTrue(self.verify_vrf(self.pbr_vrfid))
         self.config_pbr_fib_entry(self.pg3)
         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get('pbr'))
 
@@ -345,6 +420,15 @@ class TestClassifier(VppTestCase):
         self.pg1.assert_nothing_captured(remark="packets forwarded")
         self.pg2.assert_nothing_captured(remark="packets forwarded")
 
+        # remove the classify session and the route
+        self.config_pbr_fib_entry(self.pg3, is_add=0)
+        self.create_classify_session(
+            self.pg0, self.acl_tbl_idx.get('pbr'),
+            self.build_ip_match(src_ip=self.pg0.remote_ip4),
+            pbr_option, self.pbr_vrfid, is_add=0)
+
+        # and the table should be gone.
+        self.assertFalse(self.verify_vrf(self.pbr_vrfid))
 
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)