ACL-plugin add "replace" semantics for adding a new MacIP acl
[vpp.git] / test / test_acl_plugin_macip.py
index de57f6b..3f41313 100644 (file)
@@ -130,13 +130,14 @@ class TestMACIP(VppTestCase):
         acls = self.vapi.macip_acl_dump()
         if self.DEBUG:
             for acl in acls:
+                print "ACL #"+str(acl.acl_index)
                 for r in acl.r:
                     rule = "ACTION"
                     if r.is_permit == 1:
                         rule = "PERMIT"
                     elif r.is_permit == 0:
                         rule = "DENY  "
-                    print "IP6" if r.is_ipv6 else "IP4", \
+                    print "    IP6" if r.is_ipv6 else "    IP4", \
                           rule, \
                           r.src_mac.encode('hex'), \
                           r.src_mac_mask.encode('hex'),\
@@ -144,10 +145,12 @@ class TestMACIP(VppTestCase):
                           r.src_ip_prefix_len
         return acls
 
-    def create_acls(self, mac_type, ip_type, acl_count, rules_count):
-        rules = []
+    def create_rules(self, mac_type=EXACT_MAC, ip_type=EXACT_IP,
+                     acl_count=1, rules_count=[1]):
+        acls = []
         src_mac = int("220000dead00", 16)
         for acl in range(2, (acl_count+1) * 2):
+            rules = []
             host = random.choice(self.loop0.remote_hosts)
             is_ip6 = acl % 2
             ip4 = host.ip4.split('.')
@@ -210,12 +213,15 @@ class TestMACIP(VppTestCase):
                 if ip_type == self.WILD_IP:
                     break
 
-            reply = self.vapi.macip_acl_add_replace(rules)
+            acls.append(rules)
+            src_mac += 1099511627776
+        return acls
+
+    def apply_rules(self, acls):
+        for acl in acls:
+            reply = self.vapi.macip_acl_add(acl)
             self.assertEqual(reply.retval, 0)
             self.ACLS.append(reply.acl_index)
-            del rules[:]
-
-            src_mac += 1099511627776
 
     def verify_acls(self, acl_count, rules_count, expected_count=2):
         reply = self.macip_acl_dump_debug()
@@ -459,7 +465,8 @@ class TestMACIP(VppTestCase):
         #     data = p[Raw].load.split(':',1)[1]
         #     print p[p_l3].src, data
 
-    def run_traffic(self, mac_type, ip_type, bridged_routed, is_ip6, packets):
+    def run_traffic(self, mac_type, ip_type, bridged_routed, is_ip6, packets,
+                    do_not_expected_capture=False):
         self.reset_packet_infos()
 
         tx_if = self.pg0 if bridged_routed else self.pg2
@@ -469,7 +476,7 @@ class TestMACIP(VppTestCase):
                                        self.pg2, self.loop0,
                                        bridged_routed, is_ip6)
 
-        reply = self.vapi.macip_acl_add_replace(test_dict['rules'])
+        reply = self.vapi.macip_acl_add(test_dict['rules'])
         self.assertEqual(reply.retval, 0)
         acl_index = reply.acl_index
 
@@ -483,15 +490,20 @@ class TestMACIP(VppTestCase):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-        packet_count = self.get_packet_count_for_if_idx(self.loop0.sw_if_index)
-        if mac_type == self.WILD_MAC and ip_type == self.WILD_IP:
-            packet_count = packets
-        capture = rx_if.get_capture(packet_count)
-        self.verify_capture(test_dict['stream'], capture, is_ip6)
+        if do_not_expected_capture:
+            rx_if.get_capture(0)
+        else:
+            packet_count = self.get_packet_count_for_if_idx(
+                self.loop0.sw_if_index)
+            if mac_type == self.WILD_MAC and ip_type == self.WILD_IP:
+                packet_count = packets
+            capture = rx_if.get_capture(packet_count)
+            self.verify_capture(test_dict['stream'], capture, is_ip6)
 
     def run_test_acls(self, mac_type, ip_type, acl_count,
                       rules_count, traffic=None, ip=None):
-        self.create_acls(mac_type, ip_type, acl_count, rules_count)
+        self.apply_rules(self.create_rules(mac_type, ip_type, acl_count,
+                                           rules_count))
         self.verify_acls(acl_count, rules_count)
 
         if traffic is not None:
@@ -687,14 +699,84 @@ class TestMACIP(VppTestCase):
                            [100, 120, 140, 160, 180, 200, 210, 220, 230, 240],
                            self.BRIDGED, self.IS_IP6)
 
+    def test_acl_replace(self):
+        """ MACIP replace ACL
+        """
+
+        r1 = self.create_rules(acl_count=3, rules_count=[2, 2, 2])
+        r2 = self.create_rules(mac_type=self.OUI_MAC, ip_type=self.SUBNET_IP)
+        self.apply_rules(r1)
+
+        acls_before = self.macip_acl_dump_debug()
+
+        # replace acls #2, #3 with new
+        reply = self.vapi.macip_acl_add_replace(r2[0], 2)
+        self.assertEqual(reply.retval, 0)
+        self.assertEqual(reply.acl_index, 2)
+        reply = self.vapi.macip_acl_add_replace(r2[1], 3)
+        self.assertEqual(reply.retval, 0)
+        self.assertEqual(reply.acl_index, 3)
+
+        acls_after = self.macip_acl_dump_debug()
+
+        # verify changes
+        self.assertEqual(len(acls_before), len(acls_after))
+        for acl1, acl2 in zip(
+                acls_before[:2]+acls_before[4:],
+                acls_after[:2]+acls_after[4:]):
+            self.assertEqual(len(acl1), len(acl2))
+
+            self.assertEqual(len(acl1.r), len(acl2.r))
+            for r1, r2 in zip(acl1.r, acl2.r):
+                self.assertEqual(len(acl1.r), len(acl2.r))
+                self.assertEqual(acl1.r, acl2.r)
+        for acl1, acl2 in zip(
+                acls_before[2:4],
+                acls_after[2:4]):
+            self.assertEqual(len(acl1), len(acl2))
+
+            self.assertNotEqual(len(acl1.r), len(acl2.r))
+            for r1, r2 in zip(acl1.r, acl2.r):
+                self.assertNotEqual(len(acl1.r), len(acl2.r))
+                self.assertNotEqual(acl1.r, acl2.r)
+
+    def test_acl_replace_traffic_ip4(self):
+        """ MACIP replace ACL with IP4 traffic
+        """
+        self.run_traffic(self.OUI_MAC, self.SUBNET_IP,
+                         self.BRIDGED, self.IS_IP4, 9)
+
+        r = self.create_rules()
+        # replace acls #2, #3 with new
+        reply = self.vapi.macip_acl_add_replace(r[0], 0)
+        self.assertEqual(reply.retval, 0)
+        self.assertEqual(reply.acl_index, 0)
+
+        self.run_traffic(self.EXACT_MAC, self.EXACT_IP,
+                         self.BRIDGED, self.IS_IP4, 9, True)
+
+    def test_acl_replace_traffic_ip6(self):
+        """ MACIP replace ACL with IP6 traffic
+        """
+        self.run_traffic(self.OUI_MAC, self.SUBNET_IP,
+                         self.BRIDGED, self.IS_IP6, 9)
+
+        r = self.create_rules()
+        # replace acls #2, #3 with new
+        reply = self.vapi.macip_acl_add_replace(r[0], 0)
+        self.assertEqual(reply.retval, 0)
+        self.assertEqual(reply.acl_index, 0)
+
+        self.run_traffic(self.EXACT_MAC, self.EXACT_IP,
+                         self.BRIDGED, self.IS_IP6, 9, True)
+
     def test_delete_intf(self):
         """ MACIP ACL delete intf with acl
         """
 
         intf_count = len(self.interfaces)+1
-        rules_count = [3, 5, 4]
         intf = []
-        self.create_acls(self.EXACT_IP, self.EXACT_MAC, 3, rules_count)
+        self.apply_rules(self.create_rules(acl_count=3, rules_count=[3, 5, 4]))
 
         intf.append(VppLoInterface(self, 0))
         intf.append(VppLoInterface(self, 1))
@@ -748,7 +830,7 @@ class TestMACIP(VppTestCase):
         self.assertEqual(len([x for x in reply.acls if x != 4294967295]), 0)
 
     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
-    def test_check(self):
+    def test_routed(self):
         """ MACIP with routed traffic
         """
         # TODO: routed do not work yet !!!