classifier-based ACL: refactor + add output ACL
[vpp.git] / test / test_classifier.py
index a43f7a3..1753fef 100644 (file)
@@ -3,6 +3,7 @@
 import unittest
 import socket
 import binascii
+import sys
 
 from framework import VppTestCase, VppTestRunner
 
@@ -275,6 +276,20 @@ class TestClassifier(VppTestCase):
             ip4_table_index=table_index)
         self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
 
+    def output_acl_set_interface(self, intf, table_index, is_add=1):
+        """Configure Output ACL interface
+
+        :param VppInterface intf: Interface to apply Output ACL feature.
+        :param int table_index: table index to identify classify table.
+        :param int is_add: option to configure classify session.
+            - enable(1) or disable(0)
+        """
+        r = self.vapi.output_acl_set_interface(
+            is_add,
+            intf.sw_if_index,
+            ip4_table_index=table_index)
+        self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
+
     def test_acl_ip(self):
         """ IP ACL test
 
@@ -304,6 +319,36 @@ class TestClassifier(VppTestCase):
         self.pg2.assert_nothing_captured(remark="packets forwarded")
         self.pg3.assert_nothing_captured(remark="packets forwarded")
 
+    def test_acl_ip_out(self):
+        """ Output IP ACL test
+
+        Test scenario for basic IP ACL with source IP
+            - Create IPv4 stream for pg1 -> pg0 interface.
+            - Create ACL with source IP address.
+            - Send and verify received packets on pg0 interface.
+        """
+
+        # Basic ACL testing with source IP
+        pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
+        self.pg1.add_stream(pkts)
+
+        self.create_classify_table('ip', self.build_ip_mask(src_ip='ffffffff'),
+                                   data_offset=0)
+        self.create_classify_session(
+            self.pg1, self.acl_tbl_idx.get('ip'),
+            self.build_ip_match(src_ip=self.pg1.remote_ip4))
+        self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get('ip'))
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        pkts = self.pg0.get_capture(len(pkts))
+        self.verify_capture(self.pg0, pkts)
+        self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get('ip'), 0)
+        self.pg1.assert_nothing_captured(remark="packets forwarded")
+        self.pg2.assert_nothing_captured(remark="packets forwarded")
+        self.pg3.assert_nothing_captured(remark="packets forwarded")
+
     def test_acl_mac(self):
         """ MAC ACL test