X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_classifier.py;h=74851f92377d1d60c5e43734145f8d06a542f9e9;hb=0c4931cb351929a1ccdb6b29431def3705f101d7;hp=11c0985f4d449f280ef493252c7f369c8e85eb55;hpb=eddd8e3588561039985b27edf059db6033bfdfab;p=vpp.git diff --git a/test/test_classifier.py b/test/test_classifier.py index 11c0985f4d4..74851f92377 100644 --- a/test/test_classifier.py +++ b/test/test_classifier.py @@ -5,14 +5,15 @@ import socket import unittest from framework import VppTestCase, VppTestRunner +from scapy.packet import Raw, Packet -from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP, TCP from util import ppp -from template_classifier import TestClassifier +from template_classifier import TestClassifier, VarMask, VarMatch from vpp_ip_route import VppIpRoute, VppRoutePath from vpp_ip import INVALID_INDEX +from vpp_papi import VppEnum # Tests split to different test case classes because of issue reported in @@ -504,6 +505,284 @@ class TestClassifierMAC(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") +class TestClassifierComplex(TestClassifier): + """ Large & Nested Classifiers Test Cases """ + + @classmethod + def setUpClass(cls): + super(TestClassifierComplex, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestClassifierComplex, cls).tearDownClass() + + def test_iacl_large(self): + """ Large input ACL test + + Test scenario for Large ACL matching on ethernet+ip+udp headers + - Create IPv4 stream for pg0 -> pg1 interface. + - Create large acl matching on ethernet+ip+udp header fields + - Send and verify received packets on pg1 interface. + """ + + # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b) + # + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum + msk = VarMask(offset=40, spec='ffff') + mth = VarMatch(offset=40, value=0x1234, length=2) + + payload_msk = self.build_payload_mask([msk]) + payload_match = self.build_payload_match([mth]) + + sport = 13720 + dport = 9080 + + # 36b offset = 80bytes - (sizeof(UDP/IP/ETH)) + packet_ex = bytes.fromhex(('0' * 36) + '1234') + pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, + UDP(sport=sport, dport=dport), + packet_ex) + self.pg0.add_stream(pkts) + + key = 'large_in' + self.create_classify_table( + key, + self.build_mac_mask(src_mac='ffffffffffff', + dst_mac='ffffffffffff', + ether_type='ffff') + + self.build_ip_mask(proto='ff', + src_ip='ffffffff', + dst_ip='ffffffff', + src_port='ffff', + dst_port='ffff') + + payload_msk, + data_offset=-14) + + self.create_classify_session( + self.acl_tbl_idx.get(key), + self.build_mac_match(src_mac=self.pg0.remote_mac, + dst_mac=self.pg0.local_mac, + # ipv4 next header + ether_type='0800') + + self.build_ip_match(proto=socket.IPPROTO_UDP, + src_ip=self.pg0.remote_ip4, + dst_ip=self.pg1.remote_ip4, + src_port=sport, + dst_port=dport) + + payload_match + ) + + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) + self.acl_active_table = key + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + pkts = self.pg1.get_capture(len(pkts)) + self.verify_capture(self.pg1, pkts) + self.pg0.assert_nothing_captured(remark="packets forwarded") + self.pg2.assert_nothing_captured(remark="packets forwarded") + self.pg3.assert_nothing_captured(remark="packets forwarded") + + def test_oacl_large(self): + """ Large output ACL test + Test scenario for Large ACL matching on ethernet+ip+udp headers + - Create IPv4 stream for pg1 -> pg0 interface. + - Create large acl matching on ethernet+ip+udp header fields + - Send and verify received packets on pg0 interface. + """ + + # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b) + # + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum + msk = VarMask(offset=40, spec='ffff') + mth = VarMatch(offset=40, value=0x1234, length=2) + + payload_msk = self.build_payload_mask([msk]) + payload_match = self.build_payload_match([mth]) + + sport = 13720 + dport = 9080 + + # 36b offset = 80bytes - (sizeof(UDP/IP/ETH)) + packet_ex = bytes.fromhex(('0' * 36) + '1234') + pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes, + UDP(sport=sport, dport=dport), + packet_ex) + self.pg1.add_stream(pkts) + + key = 'large_out' + self.create_classify_table( + key, + self.build_mac_mask(src_mac='ffffffffffff', + dst_mac='ffffffffffff', + ether_type='ffff') + + self.build_ip_mask(proto='ff', + src_ip='ffffffff', + dst_ip='ffffffff', + src_port='ffff', + dst_port='ffff') + + payload_msk, + data_offset=-14) + + self.create_classify_session( + self.acl_tbl_idx.get(key), + self.build_mac_match(src_mac=self.pg0.local_mac, + dst_mac=self.pg0.remote_mac, + # ipv4 next header + ether_type='0800') + + self.build_ip_match(proto=socket.IPPROTO_UDP, + src_ip=self.pg1.remote_ip4, + dst_ip=self.pg0.remote_ip4, + src_port=sport, + dst_port=dport) + + payload_match) + + self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) + self.acl_active_table = key + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + pkts = self.pg0.get_capture(len(pkts)) + self.verify_capture(self.pg0, pkts) + self.pg1.assert_nothing_captured(remark="packets forwarded") + self.pg2.assert_nothing_captured(remark="packets forwarded") + self.pg3.assert_nothing_captured(remark="packets forwarded") + + def test_iacl_nested(self): + """ Nested input ACL test + + Test scenario for Large ACL matching on ethernet+ip+udp headers + - Create IPv4 stream for pg0 -> pg1 interface. + - Create 1st classifier table, without any entries + - Create nested acl matching on ethernet+ip+udp header fields + - Send and verify received packets on pg1 interface. + """ + + sport = 13720 + dport = 9080 + pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, + UDP(sport=sport, dport=dport)) + + self.pg0.add_stream(pkts) + + subtable_key = 'subtable_in' + self.create_classify_table( + subtable_key, + self.build_mac_mask(src_mac='ffffffffffff', + dst_mac='ffffffffffff', + ether_type='ffff') + + self.build_ip_mask(proto='ff', + src_ip='ffffffff', + dst_ip='ffffffff', + src_port='ffff', + dst_port='ffff'), + data_offset=-14) + + key = 'nested_in' + self.create_classify_table( + key, + self.build_mac_mask(src_mac='ffffffffffff', + dst_mac='ffffffffffff', + ether_type='ffff') + + self.build_ip_mask(proto='ff', + src_ip='ffffffff', + dst_ip='ffffffff', + src_port='ffff', + dst_port='ffff'), + next_table_index=self.acl_tbl_idx.get(subtable_key)) + + self.create_classify_session( + self.acl_tbl_idx.get(subtable_key), + self.build_mac_match(src_mac=self.pg0.remote_mac, + dst_mac=self.pg0.local_mac, + # ipv4 next header + ether_type='0800') + + self.build_ip_match(proto=socket.IPPROTO_UDP, + src_ip=self.pg0.remote_ip4, + dst_ip=self.pg1.remote_ip4, + src_port=sport, + dst_port=dport)) + + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) + self.acl_active_table = key + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + pkts = self.pg1.get_capture(len(pkts)) + self.verify_capture(self.pg1, pkts) + self.pg0.assert_nothing_captured(remark="packets forwarded") + self.pg2.assert_nothing_captured(remark="packets forwarded") + self.pg3.assert_nothing_captured(remark="packets forwarded") + + def test_oacl_nested(self): + """ Nested output ACL test + + Test scenario for Large ACL matching on ethernet+ip+udp headers + - Create IPv4 stream for pg1 -> pg0 interface. + - Create 1st classifier table, without any entries + - Create nested acl matching on ethernet+ip+udp header fields + - Send and verify received packets on pg0 interface. + """ + + sport = 13720 + dport = 9080 + pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes, + UDP(sport=sport, dport=dport)) + self.pg1.add_stream(pkts) + + subtable_key = 'subtable_out' + self.create_classify_table( + subtable_key, + self.build_mac_mask(src_mac='ffffffffffff', + dst_mac='ffffffffffff', + ether_type='ffff') + + self.build_ip_mask(proto='ff', + src_ip='ffffffff', + dst_ip='ffffffff', + src_port='ffff', + dst_port='ffff'), + data_offset=-14) + + key = 'nested_out' + self.create_classify_table( + key, + self.build_mac_mask(src_mac='ffffffffffff', + dst_mac='ffffffffffff', + ether_type='ffff') + + self.build_ip_mask(proto='ff', + src_ip='ffffffff', + dst_ip='ffffffff', + src_port='ffff', + dst_port='ffff'), + next_table_index=self.acl_tbl_idx.get(subtable_key), + data_offset=-14) + + self.create_classify_session( + self.acl_tbl_idx.get(subtable_key), + self.build_mac_match(src_mac=self.pg0.local_mac, + dst_mac=self.pg0.remote_mac, + # ipv4 next header + ether_type='0800') + + self.build_ip_match(proto=socket.IPPROTO_UDP, + src_ip=self.pg1.remote_ip4, + dst_ip=self.pg0.remote_ip4, + src_port=sport, + dst_port=dport)) + + self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) + self.acl_active_table = key + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + pkts = self.pg0.get_capture(len(pkts)) + self.verify_capture(self.pg0, pkts) + self.pg1.assert_nothing_captured(remark="packets forwarded") + self.pg2.assert_nothing_captured(remark="packets forwarded") + self.pg3.assert_nothing_captured(remark="packets forwarded") + + class TestClassifierPBR(TestClassifier): """ Classifier PBR Test Case """ @@ -565,5 +844,81 @@ class TestClassifierPBR(TestClassifier): # and the table should be gone. self.assertFalse(self.verify_vrf(self.pbr_vrfid)) + +class TestClassifierPunt(TestClassifier): + """ Classifier punt Test Case """ + + @classmethod + def setUpClass(cls): + super(TestClassifierPunt, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestClassifierPunt, cls).tearDownClass() + + def test_punt_udp(self): + """ IPv4/UDP protocol punt ACL test + + Test scenario for basic punt ACL with UDP protocol + - Create IPv4 stream for pg0 -> pg1 interface. + - Create punt ACL with UDP IP protocol. + - Send and verify received packets on pg1 interface. + """ + + sport = 6754 + dport = 17923 + + key = 'ip4_udp_punt' + self.create_classify_table( + key, + self.build_ip_mask( + src_ip='ffffffff', + proto='ff', + src_port='ffff')) + table_index = self.acl_tbl_idx.get(key) + self.vapi.punt_acl_add_del(ip4_table_index=table_index) + self.acl_active_table = key + + # punt udp packets to dport received on pg0 through pg1 + self.vapi.set_punt( + is_add=1, + punt={ + 'type': VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4, + 'punt': { + 'l4': { + 'af': VppEnum.vl_api_address_family_t.ADDRESS_IP4, + 'protocol': VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP, + 'port': dport, + }}}) + self.vapi.ip_punt_redirect(punt={ + 'rx_sw_if_index': self.pg0.sw_if_index, + 'tx_sw_if_index': self.pg1.sw_if_index, + 'nh': self.pg1.remote_ip4, + }) + + pkts = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / + UDP(sport=sport, dport=dport) / + Raw('\x17' * 100))] * 2 + + # allow a session but not matching the stream: expect to drop + self.create_classify_session( + table_index, + self.build_ip_match(src_ip=self.pg0.remote_ip4, + proto=socket.IPPROTO_UDP, src_port=sport + 10)) + self.send_and_assert_no_replies(self.pg0, pkts) + + # allow a session matching the stream: expect to pass + self.create_classify_session( + table_index, + self.build_ip_match(src_ip=self.pg0.remote_ip4, + proto=socket.IPPROTO_UDP, src_port=sport)) + self.send_and_expect_only(self.pg0, pkts, self.pg1) + + # cleanup + self.acl_active_table = '' + self.vapi.punt_acl_add_del(ip4_table_index=table_index, is_add=0) + + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)