fib: fib api updates
[vpp.git] / test / test_classifier.py
index f865cb9..4892d26 100644 (file)
@@ -10,6 +10,8 @@ from scapy.packet import Raw
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, UDP, TCP
 from util import ppp
+from vpp_ip_route import VppIpRoute, VppRoutePath
+from vpp_ip import INVALID_INDEX
 
 
 class TestClassifier(VppTestCase):
@@ -25,6 +27,10 @@ class TestClassifier(VppTestCase):
         super(TestClassifier, cls).setUpClass()
         cls.acl_active_table = ''
 
+    @classmethod
+    def tearDownClass(cls):
+        super(TestClassifier, cls).tearDownClass()
+
     def setUp(self):
         """
         Perform test setup before test case.
@@ -71,10 +77,6 @@ class TestClassifier(VppTestCase):
     def tearDown(self):
         """Run standard test teardown and acl related log."""
         if not self.vpp_dead:
-            self.logger.info(self.vapi.ppcli("show inacl type ip4"))
-            self.logger.info(self.vapi.ppcli("show outacl type ip4"))
-            self.logger.info(self.vapi.cli("show classify table verbose"))
-            self.logger.info(self.vapi.cli("show ip fib"))
             if self.acl_active_table == 'ip_out':
                 self.output_acl_set_interface(
                     self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
@@ -89,18 +91,11 @@ class TestClassifier(VppTestCase):
 
         super(TestClassifier, self).tearDown()
 
-    def config_pbr_fib_entry(self, intf, is_add=1):
-        """Configure fib entry to route traffic toward PBR VRF table
-
-        :param VppInterface intf: destination interface to be routed for PBR.
-
-        """
-        addr_len = 24
-        self.vapi.ip_add_del_route(intf.local_ip4n,
-                                   addr_len,
-                                   intf.remote_ip4n,
-                                   table_id=self.pbr_vrfid,
-                                   is_add=is_add)
+    def show_commands_at_teardown(self):
+        self.logger.info(self.vapi.ppcli("show inacl type ip4"))
+        self.logger.info(self.vapi.ppcli("show outacl type ip4"))
+        self.logger.info(self.vapi.cli("show classify table verbose"))
+        self.logger.info(self.vapi.cli("show ip fib"))
 
     def create_stream(self, src_if, dst_if, packet_sizes,
                       proto_l=UDP(sport=1234, dport=5678)):
@@ -141,7 +136,7 @@ class TestClassifier(VppTestCase):
             try:
                 ip_received = packet[IP]
                 proto_received = packet[proto_l]
-                payload_info = self.payload_to_info(str(packet[Raw]))
+                payload_info = self.payload_to_info(packet[Raw])
                 packet_index = payload_info.index
                 self.assertEqual(payload_info.dst, dst_sw_if_index)
                 self.logger.debug(
@@ -178,11 +173,9 @@ class TestClassifier(VppTestCase):
         :param int vrf_id: The FIB table / VRF ID to be verified.
         :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
         """
-        ip_fib_dump = self.vapi.ip_fib_dump()
-        vrf_count = 0
-        for ip_fib_details in ip_fib_dump:
-            if ip_fib_details[2] == vrf_id:
-                vrf_count += 1
+        ip_fib_dump = self.vapi.ip_route_dump(vrf_id, False)
+        vrf_count = len(ip_fib_dump)
+
         if vrf_count == 0:
             self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
             return 0
@@ -202,7 +195,7 @@ class TestClassifier(VppTestCase):
         :param str dst_port: destination port number <0-ffff>
         """
 
-        return ('{:0>20}{:0>12}{:0>8}{:0>4}{:0>4}'.format(
+        return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
             proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
 
     @staticmethod
@@ -221,7 +214,7 @@ class TestClassifier(VppTestCase):
         if dst_ip:
             dst_ip = binascii.hexlify(socket.inet_aton(dst_ip))
 
-        return ('{:0>20}{:0>12}{:0>8}{:0>4}{:0>4}'.format(
+        return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
             hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
             hex(dst_port)[2:])).rstrip('0')
 
@@ -234,8 +227,8 @@ class TestClassifier(VppTestCase):
         :param str ether_type: ethernet type <0-ffff>
         """
 
-        return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac,
-                                              ether_type)).rstrip('0')
+        return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
+            dst_mac, src_mac, ether_type)).rstrip('0')
 
     @staticmethod
     def build_mac_match(dst_mac='', src_mac='', ether_type=''):
@@ -250,8 +243,8 @@ class TestClassifier(VppTestCase):
         if src_mac:
             src_mac = src_mac.replace(':', '')
 
-        return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac,
-                                              ether_type)).rstrip('0')
+        return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
+            dst_mac, src_mac, ether_type)).rstrip('0')
 
     def create_classify_table(self, key, mask, data_offset=0):
         """Create Classify Table
@@ -267,7 +260,7 @@ class TestClassifier(VppTestCase):
             miss_next_index=0,
             current_data_flag=1,
             current_data_offset=data_offset)
-        self.assertIsNotNone(r, msg='No response msg for add_del_table')
+        self.assertIsNotNone(r, 'No response msg for add_del_table')
         self.acl_tbl_idx[key] = r.new_table_index
 
     def create_classify_session(self, table_index, match, pbr_option=0,
@@ -288,7 +281,7 @@ class TestClassifier(VppTestCase):
             opaque_index=0,
             action=pbr_option,
             metadata=vrfid)
-        self.assertIsNotNone(r, msg='No response msg for add_del_session')
+        self.assertIsNotNone(r, 'No response msg for add_del_session')
 
     def input_acl_set_interface(self, intf, table_index, is_add=1):
         """Configure Input ACL interface
@@ -302,7 +295,7 @@ class TestClassifier(VppTestCase):
             is_add,
             intf.sw_if_index,
             ip4_table_index=table_index)
-        self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
+        self.assertIsNotNone(r, 'No response msg for acl_set_interface')
 
     def output_acl_set_interface(self, intf, table_index, is_add=1):
         """Configure Output ACL interface
@@ -316,7 +309,7 @@ class TestClassifier(VppTestCase):
             is_add,
             intf.sw_if_index,
             ip4_table_index=table_index)
-        self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
+        self.assertIsNotNone(r, 'No response msg for acl_set_interface')
 
 
 # Tests split to different test case classes because of issue reported in
@@ -324,6 +317,14 @@ class TestClassifier(VppTestCase):
 class TestClassifierIP(TestClassifier):
     """ Classifier IP Test Case """
 
+    @classmethod
+    def setUpClass(cls):
+        super(TestClassifierIP, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestClassifierIP, cls).tearDownClass()
+
     def test_iacl_src_ip(self):
         """ Source IP iACL test
 
@@ -420,6 +421,14 @@ class TestClassifierIP(TestClassifier):
 class TestClassifierUDP(TestClassifier):
     """ Classifier UDP proto Test Case """
 
+    @classmethod
+    def setUpClass(cls):
+        super(TestClassifierUDP, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestClassifierUDP, cls).tearDownClass()
+
     def test_iacl_proto_udp(self):
         """ UDP protocol iACL test
 
@@ -560,6 +569,14 @@ class TestClassifierUDP(TestClassifier):
 class TestClassifierTCP(TestClassifier):
     """ Classifier TCP proto Test Case """
 
+    @classmethod
+    def setUpClass(cls):
+        super(TestClassifierTCP, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestClassifierTCP, cls).tearDownClass()
+
     def test_iacl_proto_tcp(self):
         """ TCP protocol iACL test
 
@@ -701,6 +718,14 @@ class TestClassifierTCP(TestClassifier):
 class TestClassifierIPOut(TestClassifier):
     """ Classifier output IP Test Case """
 
+    @classmethod
+    def setUpClass(cls):
+        super(TestClassifierIPOut, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestClassifierIPOut, cls).tearDownClass()
+
     def test_acl_ip_out(self):
         """ Output IP ACL test
 
@@ -736,6 +761,14 @@ class TestClassifierIPOut(TestClassifier):
 class TestClassifierMAC(TestClassifier):
     """ Classifier MAC Test Case """
 
+    @classmethod
+    def setUpClass(cls):
+        super(TestClassifierMAC, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestClassifierMAC, cls).tearDownClass()
+
     def test_acl_mac(self):
         """ MAC ACL test
 
@@ -771,6 +804,14 @@ class TestClassifierMAC(TestClassifier):
 class TestClassifierPBR(TestClassifier):
     """ Classifier PBR Test Case """
 
+    @classmethod
+    def setUpClass(cls):
+        super(TestClassifierPBR, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestClassifierPBR, cls).tearDownClass()
+
     def test_acl_pbr(self):
         """ IP PBR test
 
@@ -793,7 +834,12 @@ class TestClassifierPBR(TestClassifier):
             self.build_ip_match(src_ip=self.pg0.remote_ip4),
             pbr_option, self.pbr_vrfid)
         self.assertTrue(self.verify_vrf(self.pbr_vrfid))
-        self.config_pbr_fib_entry(self.pg3)
+        r = VppIpRoute(self, self.pg3.local_ip4, 24,
+                       [VppRoutePath(self.pg3.remote_ip4,
+                                     INVALID_INDEX)],
+                       table_id=self.pbr_vrfid)
+        r.add_vpp_config()
+
         self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
 
         self.pg_enable_capture(self.pg_interfaces)
@@ -807,7 +853,7 @@ class TestClassifierPBR(TestClassifier):
         self.pg2.assert_nothing_captured(remark="packets forwarded")
 
         # remove the classify session and the route
-        self.config_pbr_fib_entry(self.pg3, is_add=0)
+        r.remove_vpp_config()
         self.create_classify_session(
             self.acl_tbl_idx.get(key),
             self.build_ip_match(src_ip=self.pg0.remote_ip4),