X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_classifier.py;h=4892d26a51533dbde370d18ccea7bf5e458fae79;hb=097fa66b986f06281f603767d321ab13ab6c88c3;hp=f865cb9f6d5456805ff42bef702be543a3760d6a;hpb=6e4c6ad92e59045f0babf5af5093cb8402ec37fb;p=vpp.git diff --git a/test/test_classifier.py b/test/test_classifier.py index f865cb9f6d5..4892d26a515 100644 --- a/test/test_classifier.py +++ b/test/test_classifier.py @@ -10,6 +10,8 @@ from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP, TCP from util import ppp +from vpp_ip_route import VppIpRoute, VppRoutePath +from vpp_ip import INVALID_INDEX class TestClassifier(VppTestCase): @@ -25,6 +27,10 @@ class TestClassifier(VppTestCase): super(TestClassifier, cls).setUpClass() cls.acl_active_table = '' + @classmethod + def tearDownClass(cls): + super(TestClassifier, cls).tearDownClass() + def setUp(self): """ Perform test setup before test case. @@ -71,10 +77,6 @@ class TestClassifier(VppTestCase): def tearDown(self): """Run standard test teardown and acl related log.""" if not self.vpp_dead: - self.logger.info(self.vapi.ppcli("show inacl type ip4")) - self.logger.info(self.vapi.ppcli("show outacl type ip4")) - self.logger.info(self.vapi.cli("show classify table verbose")) - self.logger.info(self.vapi.cli("show ip fib")) if self.acl_active_table == 'ip_out': self.output_acl_set_interface( self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0) @@ -89,18 +91,11 @@ class TestClassifier(VppTestCase): super(TestClassifier, self).tearDown() - def config_pbr_fib_entry(self, intf, is_add=1): - """Configure fib entry to route traffic toward PBR VRF table - - :param VppInterface intf: destination interface to be routed for PBR. - - """ - addr_len = 24 - self.vapi.ip_add_del_route(intf.local_ip4n, - addr_len, - intf.remote_ip4n, - table_id=self.pbr_vrfid, - is_add=is_add) + def show_commands_at_teardown(self): + self.logger.info(self.vapi.ppcli("show inacl type ip4")) + self.logger.info(self.vapi.ppcli("show outacl type ip4")) + self.logger.info(self.vapi.cli("show classify table verbose")) + self.logger.info(self.vapi.cli("show ip fib")) def create_stream(self, src_if, dst_if, packet_sizes, proto_l=UDP(sport=1234, dport=5678)): @@ -141,7 +136,7 @@ class TestClassifier(VppTestCase): try: ip_received = packet[IP] proto_received = packet[proto_l] - payload_info = self.payload_to_info(str(packet[Raw])) + payload_info = self.payload_to_info(packet[Raw]) packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_sw_if_index) self.logger.debug( @@ -178,11 +173,9 @@ class TestClassifier(VppTestCase): :param int vrf_id: The FIB table / VRF ID to be verified. :return: 1 if the FIB table / VRF ID is configured, otherwise return 0. """ - ip_fib_dump = self.vapi.ip_fib_dump() - vrf_count = 0 - for ip_fib_details in ip_fib_dump: - if ip_fib_details[2] == vrf_id: - vrf_count += 1 + ip_fib_dump = self.vapi.ip_route_dump(vrf_id, False) + vrf_count = len(ip_fib_dump) + if vrf_count == 0: self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id) return 0 @@ -202,7 +195,7 @@ class TestClassifier(VppTestCase): :param str dst_port: destination port number <0-ffff> """ - return ('{:0>20}{:0>12}{:0>8}{:0>4}{:0>4}'.format( + return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format( proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0') @staticmethod @@ -221,7 +214,7 @@ class TestClassifier(VppTestCase): if dst_ip: dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)) - return ('{:0>20}{:0>12}{:0>8}{:0>4}{:0>4}'.format( + return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format( hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:], hex(dst_port)[2:])).rstrip('0') @@ -234,8 +227,8 @@ class TestClassifier(VppTestCase): :param str ether_type: ethernet type <0-ffff> """ - return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac, - ether_type)).rstrip('0') + return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format( + dst_mac, src_mac, ether_type)).rstrip('0') @staticmethod def build_mac_match(dst_mac='', src_mac='', ether_type=''): @@ -250,8 +243,8 @@ class TestClassifier(VppTestCase): if src_mac: src_mac = src_mac.replace(':', '') - return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac, - ether_type)).rstrip('0') + return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format( + dst_mac, src_mac, ether_type)).rstrip('0') def create_classify_table(self, key, mask, data_offset=0): """Create Classify Table @@ -267,7 +260,7 @@ class TestClassifier(VppTestCase): miss_next_index=0, current_data_flag=1, current_data_offset=data_offset) - self.assertIsNotNone(r, msg='No response msg for add_del_table') + self.assertIsNotNone(r, 'No response msg for add_del_table') self.acl_tbl_idx[key] = r.new_table_index def create_classify_session(self, table_index, match, pbr_option=0, @@ -288,7 +281,7 @@ class TestClassifier(VppTestCase): opaque_index=0, action=pbr_option, metadata=vrfid) - self.assertIsNotNone(r, msg='No response msg for add_del_session') + self.assertIsNotNone(r, 'No response msg for add_del_session') def input_acl_set_interface(self, intf, table_index, is_add=1): """Configure Input ACL interface @@ -302,7 +295,7 @@ class TestClassifier(VppTestCase): is_add, intf.sw_if_index, ip4_table_index=table_index) - self.assertIsNotNone(r, msg='No response msg for acl_set_interface') + self.assertIsNotNone(r, 'No response msg for acl_set_interface') def output_acl_set_interface(self, intf, table_index, is_add=1): """Configure Output ACL interface @@ -316,7 +309,7 @@ class TestClassifier(VppTestCase): is_add, intf.sw_if_index, ip4_table_index=table_index) - self.assertIsNotNone(r, msg='No response msg for acl_set_interface') + self.assertIsNotNone(r, 'No response msg for acl_set_interface') # Tests split to different test case classes because of issue reported in @@ -324,6 +317,14 @@ class TestClassifier(VppTestCase): class TestClassifierIP(TestClassifier): """ Classifier IP Test Case """ + @classmethod + def setUpClass(cls): + super(TestClassifierIP, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestClassifierIP, cls).tearDownClass() + def test_iacl_src_ip(self): """ Source IP iACL test @@ -420,6 +421,14 @@ class TestClassifierIP(TestClassifier): class TestClassifierUDP(TestClassifier): """ Classifier UDP proto Test Case """ + @classmethod + def setUpClass(cls): + super(TestClassifierUDP, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestClassifierUDP, cls).tearDownClass() + def test_iacl_proto_udp(self): """ UDP protocol iACL test @@ -560,6 +569,14 @@ class TestClassifierUDP(TestClassifier): class TestClassifierTCP(TestClassifier): """ Classifier TCP proto Test Case """ + @classmethod + def setUpClass(cls): + super(TestClassifierTCP, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestClassifierTCP, cls).tearDownClass() + def test_iacl_proto_tcp(self): """ TCP protocol iACL test @@ -701,6 +718,14 @@ class TestClassifierTCP(TestClassifier): class TestClassifierIPOut(TestClassifier): """ Classifier output IP Test Case """ + @classmethod + def setUpClass(cls): + super(TestClassifierIPOut, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestClassifierIPOut, cls).tearDownClass() + def test_acl_ip_out(self): """ Output IP ACL test @@ -736,6 +761,14 @@ class TestClassifierIPOut(TestClassifier): class TestClassifierMAC(TestClassifier): """ Classifier MAC Test Case """ + @classmethod + def setUpClass(cls): + super(TestClassifierMAC, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestClassifierMAC, cls).tearDownClass() + def test_acl_mac(self): """ MAC ACL test @@ -771,6 +804,14 @@ class TestClassifierMAC(TestClassifier): class TestClassifierPBR(TestClassifier): """ Classifier PBR Test Case """ + @classmethod + def setUpClass(cls): + super(TestClassifierPBR, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestClassifierPBR, cls).tearDownClass() + def test_acl_pbr(self): """ IP PBR test @@ -793,7 +834,12 @@ class TestClassifierPBR(TestClassifier): self.build_ip_match(src_ip=self.pg0.remote_ip4), pbr_option, self.pbr_vrfid) self.assertTrue(self.verify_vrf(self.pbr_vrfid)) - self.config_pbr_fib_entry(self.pg3) + r = VppIpRoute(self, self.pg3.local_ip4, 24, + [VppRoutePath(self.pg3.remote_ip4, + INVALID_INDEX)], + table_id=self.pbr_vrfid) + r.add_vpp_config() + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.pg_enable_capture(self.pg_interfaces) @@ -807,7 +853,7 @@ class TestClassifierPBR(TestClassifier): self.pg2.assert_nothing_captured(remark="packets forwarded") # remove the classify session and the route - self.config_pbr_fib_entry(self.pg3, is_add=0) + r.remove_vpp_config() self.create_classify_session( self.acl_tbl_idx.get(key), self.build_ip_match(src_ip=self.pg0.remote_ip4),