X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftemplate_classifier.py;h=7ce69d436f5072084560197d7385e9deceb33d0f;hb=b59f63b0e71c879331280b721ec72ff0b4699540;hp=49cb3b369c3d14bf63739a84e2878592de3fa6e4;hpb=692bfc85f2c41c6e25e73931a7b6fe6a1c5dd6c6;p=vpp.git diff --git a/test/template_classifier.py b/test/template_classifier.py index 49cb3b369c3..7ce69d436f5 100644 --- a/test/template_classifier.py +++ b/test/template_classifier.py @@ -1,10 +1,11 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 import binascii import socket from socket import AF_INET, AF_INET6 import unittest import sys +from dataclasses import dataclass from framework import VppTestCase @@ -15,6 +16,19 @@ from scapy.layers.inet6 import IPv6 from util import ppp +@dataclass +class VarMask: + offset: int + spec: str + + +@dataclass +class VarMatch: + offset: int + value: int + length: int + + class TestClassifier(VppTestCase): @staticmethod @@ -97,19 +111,15 @@ class TestClassifier(VppTestCase): self.logger.info(self.vapi.cli("show classify table verbose")) self.logger.info(self.vapi.cli("show ip fib")) + self.logger.info(self.vapi.cli("show error")) - acl_active_table = 'ip_out' - if self.af == AF_INET6: - acl_active_table = 'ip6_out' - - if self.acl_active_table == acl_active_table: + if self.acl_active_table.endswith('out'): self.output_acl_set_interface( self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0) - self.acl_active_table = '' elif self.acl_active_table != '': self.input_acl_set_interface( self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0) - self.acl_active_table = '' + self.acl_active_table = '' for intf in self.interfaces: if self.af == AF_INET: @@ -134,7 +144,7 @@ class TestClassifier(VppTestCase): src_mac = src_mac.replace(':', '') return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format( - dst_mac, src_mac, ether_type)).rstrip('0') + dst_mac, src_mac, ether_type)).rstrip() @staticmethod def build_mac_mask(dst_mac='', src_mac='', ether_type=''): @@ -146,7 +156,7 @@ class TestClassifier(VppTestCase): """ return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format( - dst_mac, src_mac, ether_type)).rstrip('0') + dst_mac, src_mac, ether_type)).rstrip() @staticmethod def build_ip_mask(proto='', src_ip='', dst_ip='', @@ -178,6 +188,18 @@ class TestClassifier(VppTestCase): return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format( nh, src_ip, dst_ip, src_port, dst_port)).rstrip('0') + @staticmethod + def build_payload_mask(masks): + payload_mask = '' + + for mask in masks: + # offset is specified in bytes, convert to hex format. + length = (mask.offset * 2) + len(mask.spec) + format_spec = '{!s:0>' + str(length) + '}' + payload_mask += format_spec.format(mask.spec) + + return payload_mask.rstrip('0') + @staticmethod def build_ip_match(proto=0, src_ip='', dst_ip='', src_port=0, dst_port=0): @@ -228,8 +250,23 @@ class TestClassifier(VppTestCase): hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:], hex(dst_port)[2:])).rstrip('0') + @staticmethod + def build_payload_match(matches): + payload_match = '' + + for match in matches: + sval = str(hex(match.value)[2:]) + # offset is specified in bytes, convert to hex format. + length = (match.offset + match.length) * 2 + + format_spec = '{!s:0>' + str(length) + '}' + payload_match += format_spec.format(sval) + + return payload_match.rstrip('0') + def create_stream(self, src_if, dst_if, packet_sizes, - proto_l=UDP(sport=1234, dport=5678)): + proto_l=UDP(sport=1234, dport=5678), + payload_ex=None): """Create input packet stream for defined interfaces. :param VppInterface src_if: Source Interface for packet stream. @@ -242,6 +279,11 @@ class TestClassifier(VppTestCase): for size in packet_sizes: info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(info) + + # append any additional payload after info + if payload_ex is not None: + payload += payload_ex + if self.af == AF_INET: p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) / @@ -306,12 +348,14 @@ class TestClassifier(VppTestCase): "Interface %s: Packet expected from interface %s " "didn't arrive" % (dst_if.name, i.name)) - def create_classify_table(self, key, mask, data_offset=0): + def create_classify_table(self, key, mask, data_offset=0, + next_table_index=None): """Create Classify Table :param str key: key for classify table (ex, ACL name). :param str mask: mask value for interested traffic. :param int data_offset: + :param str next_table_index """ mask_match, mask_match_len = self._resolve_mask_match(mask) r = self.vapi.classify_add_del_table( @@ -321,7 +365,8 @@ class TestClassifier(VppTestCase): match_n_vectors=(len(mask) - 1) // 32 + 1, miss_next_index=0, current_data_flag=1, - current_data_offset=data_offset) + current_data_offset=data_offset, + next_table_index=next_table_index) self.assertIsNotNone(r, 'No response msg for add_del_table') self.acl_tbl_idx[key] = r.new_table_index @@ -406,9 +451,9 @@ class TestClassifier(VppTestCase): """ addr_len = 24 - self.vapi.ip_add_del_route(dst_address=intf.local_ip4n, + self.vapi.ip_add_del_route(dst_address=intf.local_ip4, dst_address_length=addr_len, - next_hop_address=intf.remote_ip4n, + next_hop_address=intf.remote_ip4, table_id=self.pbr_vrfid, is_add=is_add) def verify_vrf(self, vrf_id):