X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftemplate_classifier.py;h=ec2a4143eecc394c332ef25f8ecee7258198217a;hb=f4e3ee1dd1b6edaca193a5a51057cf19c653aeb2;hp=b8f79b4f99f5029caae029cd2f72dc4ac0b016bb;hpb=54a778010aac184a17ff1c2ef25bfe03aac415d3;p=vpp.git diff --git a/test/template_classifier.py b/test/template_classifier.py index b8f79b4f99f..ec2a4143eec 100644 --- a/test/template_classifier.py +++ b/test/template_classifier.py @@ -5,6 +5,7 @@ import socket from socket import AF_INET, AF_INET6 import unittest import sys +from dataclasses import dataclass from framework import VppTestCase @@ -15,14 +16,25 @@ from scapy.layers.inet6 import IPv6 from util import ppp -class TestClassifier(VppTestCase): +@dataclass +class VarMask: + offset: int + spec: str + + +@dataclass +class VarMatch: + offset: int + value: int + length: int + +class TestClassifier(VppTestCase): @staticmethod def _resolve_mask_match(mask_match): mask_match = binascii.unhexlify(mask_match) mask_match_len = ((len(mask_match) - 1) // 16 + 1) * 16 - mask_match = mask_match + b'\0' * \ - (mask_match_len - len(mask_match)) + mask_match = mask_match + b"\0" * (mask_match_len - len(mask_match)) return mask_match, mask_match_len @classmethod @@ -33,7 +45,7 @@ class TestClassifier(VppTestCase): variables and configure VPP. """ super(TestClassifier, cls).setUpClass() - cls.acl_active_table = '' + cls.acl_active_table = "" cls.af = AF_INET def setUp(self): @@ -97,19 +109,17 @@ class TestClassifier(VppTestCase): self.logger.info(self.vapi.cli("show classify table verbose")) self.logger.info(self.vapi.cli("show ip fib")) + self.logger.info(self.vapi.cli("show error")) - acl_active_table = 'ip_out' - if self.af == AF_INET6: - acl_active_table = 'ip6_out' - - if self.acl_active_table == acl_active_table: + if self.acl_active_table.endswith("out"): self.output_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0) - self.acl_active_table = '' - elif self.acl_active_table != '': + self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0 + ) + elif self.acl_active_table != "": self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0) - self.acl_active_table = '' + self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0 + ) + self.acl_active_table = "" for intf in self.interfaces: if self.af == AF_INET: @@ -121,7 +131,7 @@ class TestClassifier(VppTestCase): super(TestClassifier, self).tearDown() @staticmethod - def build_mac_match(dst_mac='', src_mac='', ether_type=''): + def build_mac_match(dst_mac="", src_mac="", ether_type=""): """Build MAC ACL match data with hexstring format. :param str dst_mac: source MAC address @@ -129,15 +139,16 @@ class TestClassifier(VppTestCase): :param str ether_type: ethernet type <0-ffff> """ if dst_mac: - dst_mac = dst_mac.replace(':', '') + dst_mac = dst_mac.replace(":", "") if src_mac: - src_mac = src_mac.replace(':', '') + src_mac = src_mac.replace(":", "") - return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format( - dst_mac, src_mac, ether_type)).rstrip('0') + return ( + "{!s:0>12}{!s:0>12}{!s:0>4}".format(dst_mac, src_mac, ether_type) + ).rstrip() @staticmethod - def build_mac_mask(dst_mac='', src_mac='', ether_type=''): + def build_mac_mask(dst_mac="", src_mac="", ether_type=""): """Build MAC ACL mask data with hexstring format. :param str dst_mac: source MAC address <0-ffffffffffff> @@ -145,12 +156,12 @@ class TestClassifier(VppTestCase): :param str ether_type: ethernet type <0-ffff> """ - return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format( - dst_mac, src_mac, ether_type)).rstrip('0') + return ( + "{!s:0>12}{!s:0>12}{!s:0>4}".format(dst_mac, src_mac, ether_type) + ).rstrip() @staticmethod - def build_ip_mask(proto='', src_ip='', dst_ip='', - src_port='', dst_port=''): + def build_ip_mask(proto="", src_ip="", dst_ip="", src_port="", dst_port=""): """Build IP ACL mask data with hexstring format. :param str proto: protocol number <0-ff> @@ -160,12 +171,14 @@ class TestClassifier(VppTestCase): :param str dst_port: destination port number <0-ffff> """ - return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format( - proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0') + return ( + "{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}".format( + proto, src_ip, dst_ip, src_port, dst_port + ) + ).rstrip("0") @staticmethod - def build_ip6_mask(nh='', src_ip='', dst_ip='', - src_port='', dst_port=''): + def build_ip6_mask(nh="", src_ip="", dst_ip="", src_port="", dst_port=""): """Build IPv6 ACL mask data with hexstring format. :param str nh: next header number <0-ff> @@ -175,12 +188,26 @@ class TestClassifier(VppTestCase): :param str dst_port: destination port number <0-ffff> """ - return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format( - nh, src_ip, dst_ip, src_port, dst_port)).rstrip('0') + return ( + "{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}".format( + nh, src_ip, dst_ip, src_port, dst_port + ) + ).rstrip("0") @staticmethod - def build_ip_match(proto=0, src_ip='', dst_ip='', - src_port=0, dst_port=0): + def build_payload_mask(masks): + payload_mask = "" + + for mask in masks: + # offset is specified in bytes, convert to hex format. + length = (mask.offset * 2) + len(mask.spec) + format_spec = "{!s:0>" + str(length) + "}" + payload_mask += format_spec.format(mask.spec) + + return payload_mask.rstrip("0") + + @staticmethod + def build_ip_match(proto=0, src_ip="", dst_ip="", src_port=0, dst_port=0): """Build IP ACL match data with hexstring format. :param int proto: protocol number with valid option "x" @@ -190,17 +217,18 @@ class TestClassifier(VppTestCase): :param int dst_port: destination port number "x" """ if src_ip: - src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode('ascii') + src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode("ascii") if dst_ip: - dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode('ascii') + dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode("ascii") - return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format( - hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:], - hex(dst_port)[2:])).rstrip('0') + return ( + "{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}".format( + hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:], hex(dst_port)[2:] + ) + ).rstrip("0") @staticmethod - def build_ip6_match(nh=0, src_ip='', dst_ip='', - src_port=0, dst_port=0): + def build_ip6_match(nh=0, src_ip="", dst_ip="", src_port=0, dst_port=0): """Build IPv6 ACL match data with hexstring format. :param int nh: next header number with valid option "x" @@ -212,24 +240,44 @@ class TestClassifier(VppTestCase): """ if src_ip: if sys.version_info[0] == 2: - src_ip = binascii.hexlify(socket.inet_pton( - socket.AF_INET6, src_ip)) + src_ip = binascii.hexlify(socket.inet_pton(socket.AF_INET6, src_ip)) else: src_ip = socket.inet_pton(socket.AF_INET6, src_ip).hex() if dst_ip: if sys.version_info[0] == 2: - dst_ip = binascii.hexlify(socket.inet_pton( - socket.AF_INET6, dst_ip)) + dst_ip = binascii.hexlify(socket.inet_pton(socket.AF_INET6, dst_ip)) else: dst_ip = socket.inet_pton(socket.AF_INET6, dst_ip).hex() - return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format( - hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:], - hex(dst_port)[2:])).rstrip('0') + return ( + "{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}".format( + hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:], hex(dst_port)[2:] + ) + ).rstrip("0") - def create_stream(self, src_if, dst_if, packet_sizes, - proto_l=UDP(sport=1234, dport=5678)): + @staticmethod + def build_payload_match(matches): + payload_match = "" + + for match in matches: + sval = str(hex(match.value)[2:]) + # offset is specified in bytes, convert to hex format. + length = (match.offset + match.length) * 2 + + format_spec = "{!s:0>" + str(length) + "}" + payload_match += format_spec.format(sval) + + return payload_match.rstrip("0") + + def create_stream( + self, + src_if, + dst_if, + packet_sizes, + proto_l=UDP(sport=1234, dport=5678), + payload_ex=None, + ): """Create input packet stream for defined interfaces. :param VppInterface src_if: Source Interface for packet stream. @@ -242,16 +290,25 @@ class TestClassifier(VppTestCase): for size in packet_sizes: info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(info) + + # append any additional payload after info + if payload_ex is not None: + payload += payload_ex + if self.af == AF_INET: - p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / - IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) / - proto_l / - Raw(payload)) + p = ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) + / proto_l + / Raw(payload) + ) elif self.af == AF_INET6: - p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / - IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) / - proto_l / - Raw(payload)) + p = ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) + / proto_l + / Raw(payload) + ) info.data = p.copy() self.extend_packet(p, size) pkts.append(p) @@ -280,11 +337,12 @@ class TestClassifier(VppTestCase): packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_sw_if_index) self.logger.debug( - "Got packet on port %s: src=%u (id=%u)" % - (dst_if.name, payload_info.src, packet_index)) + "Got packet on port %s: src=%u (id=%u)" + % (dst_if.name, payload_info.src, packet_index) + ) next_info = self.get_next_packet_info_for_interface2( - payload_info.src, dst_sw_if_index, - last_info[payload_info.src]) + payload_info.src, dst_sw_if_index, last_info[payload_info.src] + ) last_info[payload_info.src] = next_info self.assertTrue(next_info is not None) self.assertEqual(packet_index, next_info.index) @@ -301,17 +359,21 @@ class TestClassifier(VppTestCase): raise for i in self.interfaces: remaining_packet = self.get_next_packet_info_for_interface2( - i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]) - self.assertTrue(remaining_packet is None, - "Interface %s: Packet expected from interface %s " - "didn't arrive" % (dst_if.name, i.name)) - - def create_classify_table(self, key, mask, data_offset=0): + i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index] + ) + self.assertTrue( + remaining_packet is None, + "Interface %s: Packet expected from interface %s " + "didn't arrive" % (dst_if.name, i.name), + ) + + def create_classify_table(self, key, mask, data_offset=0, next_table_index=None): """Create Classify Table :param str key: key for classify table (ex, ACL name). :param str mask: mask value for interested traffic. :param int data_offset: + :param str next_table_index """ mask_match, mask_match_len = self._resolve_mask_match(mask) r = self.vapi.classify_add_del_table( @@ -321,12 +383,15 @@ class TestClassifier(VppTestCase): match_n_vectors=(len(mask) - 1) // 32 + 1, miss_next_index=0, current_data_flag=1, - current_data_offset=data_offset) - self.assertIsNotNone(r, 'No response msg for add_del_table') + current_data_offset=data_offset, + next_table_index=next_table_index, + ) + self.assertIsNotNone(r, "No response msg for add_del_table") self.acl_tbl_idx[key] = r.new_table_index - def create_classify_session(self, table_index, match, pbr_option=0, - vrfid=0, is_add=1): + def create_classify_session( + self, table_index, match, pbr_option=0, vrfid=0, is_add=1 + ): """Create Classify Session :param int table_index: table index to identify classify table. @@ -344,8 +409,9 @@ class TestClassifier(VppTestCase): match_len=mask_match_len, opaque_index=0, action=pbr_option, - metadata=vrfid) - self.assertIsNotNone(r, 'No response msg for add_del_session') + metadata=vrfid, + ) + self.assertIsNotNone(r, "No response msg for add_del_session") def input_acl_set_interface(self, intf, table_index, is_add=1): """Configure Input ACL interface @@ -358,20 +424,17 @@ class TestClassifier(VppTestCase): r = None if self.af == AF_INET: r = self.vapi.input_acl_set_interface( - is_add, - intf.sw_if_index, - ip4_table_index=table_index) + is_add, intf.sw_if_index, ip4_table_index=table_index + ) elif self.af == AF_INET6: r = self.vapi.input_acl_set_interface( - is_add, - intf.sw_if_index, - ip6_table_index=table_index) + is_add, intf.sw_if_index, ip6_table_index=table_index + ) else: r = self.vapi.input_acl_set_interface( - is_add, - intf.sw_if_index, - l2_table_index=table_index) - self.assertIsNotNone(r, 'No response msg for acl_set_interface') + is_add, intf.sw_if_index, l2_table_index=table_index + ) + self.assertIsNotNone(r, "No response msg for acl_set_interface") def output_acl_set_interface(self, intf, table_index, is_add=1): """Configure Output ACL interface @@ -384,20 +447,17 @@ class TestClassifier(VppTestCase): r = None if self.af == AF_INET: r = self.vapi.output_acl_set_interface( - is_add, - intf.sw_if_index, - ip4_table_index=table_index) + is_add, intf.sw_if_index, ip4_table_index=table_index + ) elif self.af == AF_INET6: r = self.vapi.output_acl_set_interface( - is_add, - intf.sw_if_index, - ip6_table_index=table_index) + is_add, intf.sw_if_index, ip6_table_index=table_index + ) else: r = self.vapi.output_acl_set_interface( - is_add, - intf.sw_if_index, - l2_table_index=table_index) - self.assertIsNotNone(r, 'No response msg for acl_set_interface') + is_add, intf.sw_if_index, l2_table_index=table_index + ) + self.assertIsNotNone(r, "No response msg for acl_set_interface") def config_pbr_fib_entry(self, intf, is_add=1): """Configure fib entry to route traffic toward PBR VRF table @@ -406,10 +466,13 @@ class TestClassifier(VppTestCase): """ addr_len = 24 - self.vapi.ip_add_del_route(dst_address=intf.local_ip4, - dst_address_length=addr_len, - next_hop_address=intf.remote_ip4, - table_id=self.pbr_vrfid, is_add=is_add) + self.vapi.ip_add_del_route( + dst_address=intf.local_ip4, + dst_address_length=addr_len, + next_hop_address=intf.remote_ip4, + table_id=self.pbr_vrfid, + is_add=is_add, + ) def verify_vrf(self, vrf_id): """