X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftemplate_classifier.py;h=ec2a4143eecc394c332ef25f8ecee7258198217a;hb=d9b0c6fbf7aa5bd9af84264105b39c82028a4a29;hp=7ce69d436f5072084560197d7385e9deceb33d0f;hpb=f90348bcb4afd0af2611cefc43b17ef3042b511c;p=vpp.git diff --git a/test/template_classifier.py b/test/template_classifier.py index 7ce69d436f5..ec2a4143eec 100644 --- a/test/template_classifier.py +++ b/test/template_classifier.py @@ -30,13 +30,11 @@ class VarMatch: class TestClassifier(VppTestCase): - @staticmethod def _resolve_mask_match(mask_match): mask_match = binascii.unhexlify(mask_match) mask_match_len = ((len(mask_match) - 1) // 16 + 1) * 16 - mask_match = mask_match + b'\0' * \ - (mask_match_len - len(mask_match)) + mask_match = mask_match + b"\0" * (mask_match_len - len(mask_match)) return mask_match, mask_match_len @classmethod @@ -47,7 +45,7 @@ class TestClassifier(VppTestCase): variables and configure VPP. """ super(TestClassifier, cls).setUpClass() - cls.acl_active_table = '' + cls.acl_active_table = "" cls.af = AF_INET def setUp(self): @@ -113,13 +111,15 @@ class TestClassifier(VppTestCase): self.logger.info(self.vapi.cli("show ip fib")) self.logger.info(self.vapi.cli("show error")) - if self.acl_active_table.endswith('out'): + if self.acl_active_table.endswith("out"): self.output_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0) - elif self.acl_active_table != '': + self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0 + ) + elif self.acl_active_table != "": self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0) - self.acl_active_table = '' + self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0 + ) + self.acl_active_table = "" for intf in self.interfaces: if self.af == AF_INET: @@ -131,7 +131,7 @@ class TestClassifier(VppTestCase): super(TestClassifier, self).tearDown() @staticmethod - def build_mac_match(dst_mac='', src_mac='', ether_type=''): + def build_mac_match(dst_mac="", src_mac="", ether_type=""): """Build MAC ACL match data with hexstring format. :param str dst_mac: source MAC address @@ -139,15 +139,16 @@ class TestClassifier(VppTestCase): :param str ether_type: ethernet type <0-ffff> """ if dst_mac: - dst_mac = dst_mac.replace(':', '') + dst_mac = dst_mac.replace(":", "") if src_mac: - src_mac = src_mac.replace(':', '') + src_mac = src_mac.replace(":", "") - return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format( - dst_mac, src_mac, ether_type)).rstrip() + return ( + "{!s:0>12}{!s:0>12}{!s:0>4}".format(dst_mac, src_mac, ether_type) + ).rstrip() @staticmethod - def build_mac_mask(dst_mac='', src_mac='', ether_type=''): + def build_mac_mask(dst_mac="", src_mac="", ether_type=""): """Build MAC ACL mask data with hexstring format. :param str dst_mac: source MAC address <0-ffffffffffff> @@ -155,12 +156,12 @@ class TestClassifier(VppTestCase): :param str ether_type: ethernet type <0-ffff> """ - return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format( - dst_mac, src_mac, ether_type)).rstrip() + return ( + "{!s:0>12}{!s:0>12}{!s:0>4}".format(dst_mac, src_mac, ether_type) + ).rstrip() @staticmethod - def build_ip_mask(proto='', src_ip='', dst_ip='', - src_port='', dst_port=''): + def build_ip_mask(proto="", src_ip="", dst_ip="", src_port="", dst_port=""): """Build IP ACL mask data with hexstring format. :param str proto: protocol number <0-ff> @@ -170,12 +171,14 @@ class TestClassifier(VppTestCase): :param str dst_port: destination port number <0-ffff> """ - return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format( - proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0') + return ( + "{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}".format( + proto, src_ip, dst_ip, src_port, dst_port + ) + ).rstrip("0") @staticmethod - def build_ip6_mask(nh='', src_ip='', dst_ip='', - src_port='', dst_port=''): + def build_ip6_mask(nh="", src_ip="", dst_ip="", src_port="", dst_port=""): """Build IPv6 ACL mask data with hexstring format. :param str nh: next header number <0-ff> @@ -185,24 +188,26 @@ class TestClassifier(VppTestCase): :param str dst_port: destination port number <0-ffff> """ - return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format( - nh, src_ip, dst_ip, src_port, dst_port)).rstrip('0') + return ( + "{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}".format( + nh, src_ip, dst_ip, src_port, dst_port + ) + ).rstrip("0") @staticmethod def build_payload_mask(masks): - payload_mask = '' + payload_mask = "" for mask in masks: # offset is specified in bytes, convert to hex format. length = (mask.offset * 2) + len(mask.spec) - format_spec = '{!s:0>' + str(length) + '}' + format_spec = "{!s:0>" + str(length) + "}" payload_mask += format_spec.format(mask.spec) - return payload_mask.rstrip('0') + return payload_mask.rstrip("0") @staticmethod - def build_ip_match(proto=0, src_ip='', dst_ip='', - src_port=0, dst_port=0): + def build_ip_match(proto=0, src_ip="", dst_ip="", src_port=0, dst_port=0): """Build IP ACL match data with hexstring format. :param int proto: protocol number with valid option "x" @@ -212,17 +217,18 @@ class TestClassifier(VppTestCase): :param int dst_port: destination port number "x" """ if src_ip: - src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode('ascii') + src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode("ascii") if dst_ip: - dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode('ascii') + dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode("ascii") - return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format( - hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:], - hex(dst_port)[2:])).rstrip('0') + return ( + "{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}".format( + hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:], hex(dst_port)[2:] + ) + ).rstrip("0") @staticmethod - def build_ip6_match(nh=0, src_ip='', dst_ip='', - src_port=0, dst_port=0): + def build_ip6_match(nh=0, src_ip="", dst_ip="", src_port=0, dst_port=0): """Build IPv6 ACL match data with hexstring format. :param int nh: next header number with valid option "x" @@ -234,39 +240,44 @@ class TestClassifier(VppTestCase): """ if src_ip: if sys.version_info[0] == 2: - src_ip = binascii.hexlify(socket.inet_pton( - socket.AF_INET6, src_ip)) + src_ip = binascii.hexlify(socket.inet_pton(socket.AF_INET6, src_ip)) else: src_ip = socket.inet_pton(socket.AF_INET6, src_ip).hex() if dst_ip: if sys.version_info[0] == 2: - dst_ip = binascii.hexlify(socket.inet_pton( - socket.AF_INET6, dst_ip)) + dst_ip = binascii.hexlify(socket.inet_pton(socket.AF_INET6, dst_ip)) else: dst_ip = socket.inet_pton(socket.AF_INET6, dst_ip).hex() - return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format( - hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:], - hex(dst_port)[2:])).rstrip('0') + return ( + "{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}".format( + hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:], hex(dst_port)[2:] + ) + ).rstrip("0") @staticmethod def build_payload_match(matches): - payload_match = '' + payload_match = "" for match in matches: sval = str(hex(match.value)[2:]) # offset is specified in bytes, convert to hex format. length = (match.offset + match.length) * 2 - format_spec = '{!s:0>' + str(length) + '}' + format_spec = "{!s:0>" + str(length) + "}" payload_match += format_spec.format(sval) - return payload_match.rstrip('0') + return payload_match.rstrip("0") - def create_stream(self, src_if, dst_if, packet_sizes, - proto_l=UDP(sport=1234, dport=5678), - payload_ex=None): + def create_stream( + self, + src_if, + dst_if, + packet_sizes, + proto_l=UDP(sport=1234, dport=5678), + payload_ex=None, + ): """Create input packet stream for defined interfaces. :param VppInterface src_if: Source Interface for packet stream. @@ -285,15 +296,19 @@ class TestClassifier(VppTestCase): payload += payload_ex if self.af == AF_INET: - p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / - IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) / - proto_l / - Raw(payload)) + p = ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) + / proto_l + / Raw(payload) + ) elif self.af == AF_INET6: - p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / - IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) / - proto_l / - Raw(payload)) + p = ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) + / proto_l + / Raw(payload) + ) info.data = p.copy() self.extend_packet(p, size) pkts.append(p) @@ -322,11 +337,12 @@ class TestClassifier(VppTestCase): packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_sw_if_index) self.logger.debug( - "Got packet on port %s: src=%u (id=%u)" % - (dst_if.name, payload_info.src, packet_index)) + "Got packet on port %s: src=%u (id=%u)" + % (dst_if.name, payload_info.src, packet_index) + ) next_info = self.get_next_packet_info_for_interface2( - payload_info.src, dst_sw_if_index, - last_info[payload_info.src]) + payload_info.src, dst_sw_if_index, last_info[payload_info.src] + ) last_info[payload_info.src] = next_info self.assertTrue(next_info is not None) self.assertEqual(packet_index, next_info.index) @@ -343,13 +359,15 @@ class TestClassifier(VppTestCase): raise for i in self.interfaces: remaining_packet = self.get_next_packet_info_for_interface2( - i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]) - self.assertTrue(remaining_packet is None, - "Interface %s: Packet expected from interface %s " - "didn't arrive" % (dst_if.name, i.name)) - - def create_classify_table(self, key, mask, data_offset=0, - next_table_index=None): + i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index] + ) + self.assertTrue( + remaining_packet is None, + "Interface %s: Packet expected from interface %s " + "didn't arrive" % (dst_if.name, i.name), + ) + + def create_classify_table(self, key, mask, data_offset=0, next_table_index=None): """Create Classify Table :param str key: key for classify table (ex, ACL name). @@ -366,12 +384,14 @@ class TestClassifier(VppTestCase): miss_next_index=0, current_data_flag=1, current_data_offset=data_offset, - next_table_index=next_table_index) - self.assertIsNotNone(r, 'No response msg for add_del_table') + next_table_index=next_table_index, + ) + self.assertIsNotNone(r, "No response msg for add_del_table") self.acl_tbl_idx[key] = r.new_table_index - def create_classify_session(self, table_index, match, pbr_option=0, - vrfid=0, is_add=1): + def create_classify_session( + self, table_index, match, pbr_option=0, vrfid=0, is_add=1 + ): """Create Classify Session :param int table_index: table index to identify classify table. @@ -389,8 +409,9 @@ class TestClassifier(VppTestCase): match_len=mask_match_len, opaque_index=0, action=pbr_option, - metadata=vrfid) - self.assertIsNotNone(r, 'No response msg for add_del_session') + metadata=vrfid, + ) + self.assertIsNotNone(r, "No response msg for add_del_session") def input_acl_set_interface(self, intf, table_index, is_add=1): """Configure Input ACL interface @@ -403,20 +424,17 @@ class TestClassifier(VppTestCase): r = None if self.af == AF_INET: r = self.vapi.input_acl_set_interface( - is_add, - intf.sw_if_index, - ip4_table_index=table_index) + is_add, intf.sw_if_index, ip4_table_index=table_index + ) elif self.af == AF_INET6: r = self.vapi.input_acl_set_interface( - is_add, - intf.sw_if_index, - ip6_table_index=table_index) + is_add, intf.sw_if_index, ip6_table_index=table_index + ) else: r = self.vapi.input_acl_set_interface( - is_add, - intf.sw_if_index, - l2_table_index=table_index) - self.assertIsNotNone(r, 'No response msg for acl_set_interface') + is_add, intf.sw_if_index, l2_table_index=table_index + ) + self.assertIsNotNone(r, "No response msg for acl_set_interface") def output_acl_set_interface(self, intf, table_index, is_add=1): """Configure Output ACL interface @@ -429,20 +447,17 @@ class TestClassifier(VppTestCase): r = None if self.af == AF_INET: r = self.vapi.output_acl_set_interface( - is_add, - intf.sw_if_index, - ip4_table_index=table_index) + is_add, intf.sw_if_index, ip4_table_index=table_index + ) elif self.af == AF_INET6: r = self.vapi.output_acl_set_interface( - is_add, - intf.sw_if_index, - ip6_table_index=table_index) + is_add, intf.sw_if_index, ip6_table_index=table_index + ) else: r = self.vapi.output_acl_set_interface( - is_add, - intf.sw_if_index, - l2_table_index=table_index) - self.assertIsNotNone(r, 'No response msg for acl_set_interface') + is_add, intf.sw_if_index, l2_table_index=table_index + ) + self.assertIsNotNone(r, "No response msg for acl_set_interface") def config_pbr_fib_entry(self, intf, is_add=1): """Configure fib entry to route traffic toward PBR VRF table @@ -451,10 +466,13 @@ class TestClassifier(VppTestCase): """ addr_len = 24 - self.vapi.ip_add_del_route(dst_address=intf.local_ip4, - dst_address_length=addr_len, - next_hop_address=intf.remote_ip4, - table_id=self.pbr_vrfid, is_add=is_add) + self.vapi.ip_add_del_route( + dst_address=intf.local_ip4, + dst_address_length=addr_len, + next_hop_address=intf.remote_ip4, + table_id=self.pbr_vrfid, + is_add=is_add, + ) def verify_vrf(self, vrf_id): """