X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_classify_l2_acl.py;h=cc3617fb23288c3fcf1383b8bfaf47047258ba81;hb=853cc9f2ad3ee52cbdd891fb09d51c25678baed0;hp=d9557ee7ef2213572e584a4ac76f73f3b18e192c;hpb=059d1d0e01a59769d2c3f3f978f50120d64a6976;p=vpp.git diff --git a/test/test_classify_l2_acl.py b/test/test_classify_l2_acl.py index d9557ee7ef2..cc3617fb232 100644 --- a/test/test_classify_l2_acl.py +++ b/test/test_classify_l2_acl.py @@ -1,12 +1,9 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 """ Classifier-based L2 ACL Test Case HLD: """ import unittest import random -import binascii -import socket - from scapy.packet import Raw from scapy.data import ETH_P_IP @@ -14,12 +11,13 @@ from scapy.layers.l2 import Ether from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest from scapy.layers.inet6 import IPv6ExtHdrFragment -from framework import VppTestCase, VppTestRunner +from asfframework import VppTestRunner from util import Host, ppp +from template_classifier import TestClassifier -class TestClassifyAcl(VppTestCase): - """ Classifier-based L2 input and output ACL Test Case """ +class TestClassifyAcl(TestClassifier): + """Classifier-based L2 input and output ACL Test Case""" # traffic types IP = 0 @@ -36,7 +34,7 @@ class TestClassifyAcl(VppTestCase): # supported protocols proto = [[6, 17], [1, 58]] - proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'} + proto_map = {1: "ICMP", 58: "ICMPv6EchoRequest", 6: "TCP", 17: "UDP"} ICMPv4 = 0 ICMPv6 = 1 TCP = 0 @@ -88,6 +86,7 @@ class TestClassifyAcl(VppTestCase): variables and configure VPP. """ super(TestClassifyAcl, cls).setUpClass() + cls.af = None try: # Create 2 pg interfaces @@ -102,11 +101,13 @@ class TestClassifyAcl(VppTestCase): # Create BD with MAC learning and unknown unicast flooding disabled # and put interfaces to this BD - cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1, - learn=1) + cls.vapi.bridge_domain_add_del_v2( + bd_id=cls.bd_id, uu_flood=1, learn=1, flood=1, forward=1, is_add=1 + ) for pg_if in cls.pg_interfaces: - cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index, - bd_id=cls.bd_id) + cls.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id + ) # Set up all interfaces for i in cls.pg_interfaces: @@ -126,138 +127,66 @@ class TestClassifyAcl(VppTestCase): # self.warmup_test() # Holder of the active classify table key - cls.acl_active_table = '' + cls.acl_active_table = "" except Exception: super(TestClassifyAcl, cls).tearDownClass() raise + @classmethod + def tearDownClass(cls): + super(TestClassifyAcl, cls).tearDownClass() + def setUp(self): super(TestClassifyAcl, self).setUp() - self.acl_tbl_idx = {} - self.reset_packet_infos() def tearDown(self): """ Show various debug prints after each test. """ if not self.vpp_dead: - self.logger.info(self.vapi.ppcli("show inacl type l2")) - self.logger.info(self.vapi.ppcli("show outacl type l2")) - self.logger.info(self.vapi.ppcli("show classify tables verbose")) - self.logger.info(self.vapi.ppcli("show bridge-domain %s detail" - % self.bd_id)) - if self.acl_active_table == 'mac_inout': + if self.acl_active_table == "mac_inout": self.output_acl_set_interface( - self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0) + self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0 + ) self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0) - self.acl_active_table = '' - elif self.acl_active_table == 'mac_out': + self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0 + ) + self.acl_active_table = "" + elif self.acl_active_table == "mac_out": self.output_acl_set_interface( - self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0) - self.acl_active_table = '' - elif self.acl_active_table == 'mac_in': + self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0 + ) + self.acl_active_table = "" + elif self.acl_active_table == "mac_in": self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0) - self.acl_active_table = '' + self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0 + ) + self.acl_active_table = "" super(TestClassifyAcl, self).tearDown() - @staticmethod - def build_mac_mask(dst_mac='', src_mac='', ether_type=''): - """Build MAC ACL mask data with hexstring format - - :param str dst_mac: source MAC address <0-ffffffffffff> - :param str src_mac: destination MAC address <0-ffffffffffff> - :param str ether_type: ethernet type <0-ffff> - """ - - return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac, - ether_type)).rstrip('0') - - @staticmethod - def build_mac_match(dst_mac='', src_mac='', ether_type=''): - """Build MAC ACL match data with hexstring format - - :param str dst_mac: source MAC address - :param str src_mac: destination MAC address - :param str ether_type: ethernet type <0-ffff> - """ - if dst_mac: - dst_mac = dst_mac.replace(':', '') - if src_mac: - src_mac = src_mac.replace(':', '') - - return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac, - ether_type)).rstrip('0') - - def create_classify_table(self, key, mask, data_offset=0, is_add=1): - """Create Classify Table - - :param str key: key for classify table (ex, ACL name). - :param str mask: mask value for interested traffic. - :param int match_n_vectors: - :param int is_add: option to configure classify table. - - create(1) or delete(0) - """ - r = self.vapi.classify_add_del_table( - is_add, - binascii.unhexlify(mask), - match_n_vectors=(len(mask) - 1) // 32 + 1, - miss_next_index=0, - current_data_flag=1, - current_data_offset=data_offset) - self.assertIsNotNone(r, msg='No response msg for add_del_table') - self.acl_tbl_idx[key] = r.new_table_index - - def create_classify_session(self, intf, table_index, match, - hit_next_index=0xffffffff, is_add=1): + def create_classify_session( + self, intf, table_index, match, hit_next_index=0xFFFFFFFF, is_add=1 + ): """Create Classify Session :param VppInterface intf: Interface to apply classify session. :param int table_index: table index to identify classify table. :param str match: matched value for interested traffic. - :param int pbr_action: enable/disable PBR feature. - :param int vrfid: VRF id. :param int is_add: option to configure classify session. - create(1) or delete(0) """ + mask_match, mask_match_len = self._resolve_mask_match(match) r = self.vapi.classify_add_del_session( - is_add, - table_index, - binascii.unhexlify(match), - hit_next_index=hit_next_index) - self.assertIsNotNone(r, msg='No response msg for add_del_session') - - def input_acl_set_interface(self, intf, table_index, is_add=1): - """Configure Input ACL interface - - :param VppInterface intf: Interface to apply Input ACL feature. - :param int table_index: table index to identify classify table. - :param int is_add: option to configure classify session. - - enable(1) or disable(0) - """ - r = self.vapi.input_acl_set_interface( - is_add, - intf.sw_if_index, - l2_table_index=table_index) - self.assertIsNotNone(r, msg='No response msg for acl_set_interface') - - def output_acl_set_interface(self, intf, table_index, is_add=1): - """Configure Output ACL interface - - :param VppInterface intf: Interface to apply Output ACL feature. - :param int table_index: table index to identify classify table. - :param int is_add: option to configure classify session. - - enable(1) or disable(0) - """ - r = self.vapi.output_acl_set_interface( - is_add, - intf.sw_if_index, - l2_table_index=table_index) - self.assertIsNotNone(r, msg='No response msg for acl_set_interface') + is_add=is_add, + table_index=table_index, + match=mask_match, + match_len=mask_match_len, + hit_next_index=hit_next_index, + ) + self.assertIsNotNone(r, "No response msg for add_del_session") def create_hosts(self, count, start=0): """ @@ -268,44 +197,55 @@ class TestClassifyAcl(VppTestCase): :param int start: Number to start numbering from. """ n_int = len(self.pg_interfaces) - macs_per_if = count / n_int + macs_per_if = count // n_int i = -1 for pg_if in self.pg_interfaces: i += 1 start_nr = macs_per_if * i + start - end_nr = count + start if i == (n_int - 1) \ - else macs_per_if * (i + 1) + start + end_nr = ( + count + start if i == (n_int - 1) else macs_per_if * (i + 1) + start + ) hosts = self.hosts_by_pg_idx[pg_if.sw_if_index] for j in range(start_nr, end_nr): host = Host( "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j), "172.17.1%02x.%u" % (pg_if.sw_if_index, j), - "2017:dead:%02x::%u" % (pg_if.sw_if_index, j)) + "2017:dead:%02x::%u" % (pg_if.sw_if_index, j), + ) hosts.append(host) def create_upper_layer(self, packet_index, proto, ports=0): p = self.proto_map[proto] - if p == 'UDP': + if p == "UDP": if ports == 0: - return UDP(sport=random.randint(self.udp_sport_from, - self.udp_sport_to), - dport=random.randint(self.udp_dport_from, - self.udp_dport_to)) + return UDP( + sport=random.randint(self.udp_sport_from, self.udp_sport_to), + dport=random.randint(self.udp_dport_from, self.udp_dport_to), + ) else: return UDP(sport=ports, dport=ports) - elif p == 'TCP': + elif p == "TCP": if ports == 0: - return TCP(sport=random.randint(self.tcp_sport_from, - self.tcp_sport_to), - dport=random.randint(self.tcp_dport_from, - self.tcp_dport_to)) + return TCP( + sport=random.randint(self.tcp_sport_from, self.tcp_sport_to), + dport=random.randint(self.tcp_dport_from, self.tcp_dport_to), + ) else: return TCP(sport=ports, dport=ports) - return '' - - def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0, - proto=-1, ports=0, fragments=False, - pkt_raw=True, etype=-1): + return "" + + def create_stream( + self, + src_if, + packet_sizes, + traffic_type=0, + ipv6=0, + proto=-1, + ports=0, + fragments=False, + pkt_raw=True, + etype=-1, + ): """ Create input packet stream for defined interface using hosts or deleted_hosts list. @@ -322,7 +262,7 @@ class TestClassifyAcl(VppTestCase): dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index] n_int = len(dst_hosts) * len(src_hosts) for i in range(0, n_int): - dst_host = dst_hosts[i / len(src_hosts)] + dst_host = dst_hosts[i // len(src_hosts)] src_host = src_hosts[i % len(src_hosts)] pkt_info = self.create_packet_info(src_if, dst_if) if ipv6 == 1: @@ -338,26 +278,25 @@ class TestClassifyAcl(VppTestCase): payload = self.info_to_payload(pkt_info) p = Ether(dst=dst_host.mac, src=src_host.mac) if etype > 0: - p = Ether(dst=dst_host.mac, - src=src_host.mac, - type=etype) + p = Ether(dst=dst_host.mac, src=src_host.mac, type=etype) if pkt_info.ip: p /= IPv6(dst=dst_host.ip6, src=src_host.ip6) if fragments: p /= IPv6ExtHdrFragment(offset=64, m=1) else: if fragments: - p /= IP(src=src_host.ip4, dst=dst_host.ip4, - flags=1, frag=64) + p /= IP( + src=src_host.ip4, dst=dst_host.ip4, flags=1, frag=64 + ) else: p /= IP(src=src_host.ip4, dst=dst_host.ip4) if traffic_type == self.ICMP: if pkt_info.ip: - p /= ICMPv6EchoRequest(type=self.icmp6_type, - code=self.icmp6_code) + p /= ICMPv6EchoRequest( + type=self.icmp6_type, code=self.icmp6_code + ) else: - p /= ICMP(type=self.icmp4_type, - code=self.icmp4_code) + p /= ICMP(type=self.icmp4_type, code=self.icmp4_code) else: p /= self.create_upper_layer(i, pkt_info.proto, ports) if pkt_raw: @@ -369,8 +308,7 @@ class TestClassifyAcl(VppTestCase): pkts.append(p) return pkts - def verify_capture(self, pg_if, capture, - traffic_type=0, ip_type=0, etype=-1): + def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0, etype=-1): """ Verify captured input packet stream for defined interface. @@ -385,22 +323,21 @@ class TestClassifyAcl(VppTestCase): for packet in capture: if etype > 0: if packet[Ether].type != etype: - self.logger.error(ppp("Unexpected ethertype in packet:", - packet)) + self.logger.error(ppp("Unexpected ethertype in packet:", packet)) else: continue try: # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data if traffic_type == self.ICMP and ip_type == self.IPV6: - payload_info = self.payload_to_info( - packet[ICMPv6EchoRequest].data) + payload_info = self.payload_to_info(packet[ICMPv6EchoRequest].data) payload = packet[ICMPv6EchoRequest] else: - payload_info = self.payload_to_info(str(packet[Raw])) + payload_info = self.payload_to_info(packet[Raw]) payload = packet[self.proto_map[payload_info.proto]] except: - self.logger.error(ppp("Unexpected or invalid packet " - "(outside network):", packet)) + self.logger.error( + ppp("Unexpected or invalid packet (outside network):", packet) + ) raise if ip_type != 0: @@ -414,8 +351,9 @@ class TestClassifyAcl(VppTestCase): self.assertEqual(payload.type, self.icmp6_type) self.assertEqual(payload.code, self.icmp6_code) except: - self.logger.error(ppp("Unexpected or invalid packet " - "(outside network):", packet)) + self.logger.error( + ppp("Unexpected or invalid packet (outside network):", packet) + ) raise else: try: @@ -425,12 +363,13 @@ class TestClassifyAcl(VppTestCase): packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_sw_if_index) - self.logger.debug("Got packet on port %s: src=%u (id=%u)" % - (pg_if.name, payload_info.src, - packet_index)) + self.logger.debug( + "Got packet on port %s: src=%u (id=%u)" + % (pg_if.name, payload_info.src, packet_index) + ) next_info = self.get_next_packet_info_for_interface2( - payload_info.src, dst_sw_if_index, - last_info[payload_info.src]) + payload_info.src, dst_sw_if_index, last_info[payload_info.src] + ) last_info[payload_info.src] = next_info self.assertTrue(next_info is not None) self.assertEqual(packet_index, next_info.index) @@ -439,29 +378,26 @@ class TestClassifyAcl(VppTestCase): self.assertEqual(ip.src, saved_packet[ip_version].src) self.assertEqual(ip.dst, saved_packet[ip_version].dst) p = self.proto_map[payload_info.proto] - if p == 'TCP': + if p == "TCP": tcp = packet[TCP] - self.assertEqual(tcp.sport, saved_packet[ - TCP].sport) - self.assertEqual(tcp.dport, saved_packet[ - TCP].dport) - elif p == 'UDP': + self.assertEqual(tcp.sport, saved_packet[TCP].sport) + self.assertEqual(tcp.dport, saved_packet[TCP].dport) + elif p == "UDP": udp = packet[UDP] - self.assertEqual(udp.sport, saved_packet[ - UDP].sport) - self.assertEqual(udp.dport, saved_packet[ - UDP].dport) + self.assertEqual(udp.sport, saved_packet[UDP].sport) + self.assertEqual(udp.dport, saved_packet[UDP].dport) except: - self.logger.error(ppp("Unexpected or invalid packet:", - packet)) + self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise for i in self.pg_interfaces: remaining_packet = self.get_next_packet_info_for_interface2( - i, dst_sw_if_index, last_info[i.sw_if_index]) + i, dst_sw_if_index, last_info[i.sw_if_index] + ) self.assertTrue( remaining_packet is None, - "Port %u: Packet expected from source %u didn't arrive" % - (dst_sw_if_index, i.sw_if_index)) + "Port %u: Packet expected from source %u didn't arrive" + % (dst_sw_if_index, i.sw_if_index), + ) def run_traffic_no_check(self): # Test @@ -476,16 +412,32 @@ class TestClassifyAcl(VppTestCase): self.pg_enable_capture(self.pg_interfaces) self.pg_start() - def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0, - frags=False, pkt_raw=True, etype=-1): + def run_verify_test( + self, + traffic_type=0, + ip_type=0, + proto=-1, + ports=0, + frags=False, + pkt_raw=True, + etype=-1, + ): # Test # Create incoming packet streams for packet-generator interfaces pkts_cnt = 0 for i in self.pg_interfaces: if self.flows.__contains__(i): - pkts = self.create_stream(i, self.pg_if_packet_sizes, - traffic_type, ip_type, proto, ports, - frags, pkt_raw, etype) + pkts = self.create_stream( + i, + self.pg_if_packet_sizes, + traffic_type, + ip_type, + proto, + ports, + frags, + pkt_raw, + etype, + ) if len(pkts) > 0: i.add_stream(pkts) pkts_cnt += len(pkts) @@ -500,20 +452,27 @@ class TestClassifyAcl(VppTestCase): if self.flows.__contains__(src_if): for dst_if in self.flows[src_if]: capture = dst_if.get_capture(pkts_cnt) - self.logger.info("Verifying capture on interface %s" % - dst_if.name) - self.verify_capture(dst_if, capture, - traffic_type, ip_type, etype) + self.logger.info("Verifying capture on interface %s" % dst_if.name) + self.verify_capture(dst_if, capture, traffic_type, ip_type, etype) - def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1, - ports=0, frags=False, etype=-1): + def run_verify_negat_test( + self, traffic_type=0, ip_type=0, proto=-1, ports=0, frags=False, etype=-1 + ): # Test self.reset_packet_infos() for i in self.pg_interfaces: if self.flows.__contains__(i): - pkts = self.create_stream(i, self.pg_if_packet_sizes, - traffic_type, ip_type, proto, ports, - frags, True, etype) + pkts = self.create_stream( + i, + self.pg_if_packet_sizes, + traffic_type, + ip_type, + proto, + ports, + frags, + True, + etype, + ) if len(pkts) > 0: i.add_stream(pkts) @@ -526,101 +485,110 @@ class TestClassifyAcl(VppTestCase): for src_if in self.pg_interfaces: if self.flows.__contains__(src_if): for dst_if in self.flows[src_if]: - self.logger.info("Verifying capture on interface %s" % - dst_if.name) + self.logger.info("Verifying capture on interface %s" % dst_if.name) capture = dst_if.get_capture(0) self.assertEqual(len(capture), 0) - def build_classify_table(self, src_mac='', dst_mac='', ether_type='', - etype='', key='mac', hit_next_index=0xffffffff): + def build_classify_table( + self, + src_mac="", + dst_mac="", + ether_type="", + etype="", + key="mac", + hit_next_index=0xFFFFFFFF, + ): # Basic ACL testing - a_mask = self.build_mac_mask(src_mac=src_mac, dst_mac=dst_mac, - ether_type=ether_type) + a_mask = self.build_mac_mask( + src_mac=src_mac, dst_mac=dst_mac, ether_type=ether_type + ) self.create_classify_table(key, a_mask) for host in self.hosts_by_pg_idx[self.pg0.sw_if_index]: - s_mac = host.mac if src_mac else '' + s_mac = host.mac if src_mac else "" if dst_mac: for dst_if in self.flows[self.pg0]: for dst_host in self.hosts_by_pg_idx[dst_if.sw_if_index]: self.create_classify_session( - self.pg0, self.acl_tbl_idx.get(key), - self.build_mac_match(src_mac=s_mac, - dst_mac=dst_host.mac, - ether_type=etype), - hit_next_index=hit_next_index) + self.pg0, + self.acl_tbl_idx.get(key), + self.build_mac_match( + src_mac=s_mac, dst_mac=dst_host.mac, ether_type=etype + ), + hit_next_index=hit_next_index, + ) else: self.create_classify_session( - self.pg0, self.acl_tbl_idx.get(key), - self.build_mac_match(src_mac=s_mac, dst_mac='', - ether_type=etype), - hit_next_index=hit_next_index) + self.pg0, + self.acl_tbl_idx.get(key), + self.build_mac_match(src_mac=s_mac, dst_mac="", ether_type=etype), + hit_next_index=hit_next_index, + ) def test_0000_warmup_test(self): - """ Learn the MAC addresses - """ + """Learn the MAC addresses""" self.create_hosts(2) self.run_traffic_no_check() def test_0010_inacl_permit_src_mac(self): - """ Input L2 ACL test - permit source MAC + """Input L2 ACL test - permit source MAC Test scenario for basic IP ACL with source IP - Create IPv4 stream for pg0 -> pg1 interface. - Create ACL with source MAC address. - Send and verify received packets on pg1 interface. """ - key = 'mac_in' - self.build_classify_table(src_mac='ffffffffffff', key=key) + key = "mac_in" + self.build_classify_table(src_mac="ffffffffffff", key=key) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.run_verify_test(self.IP, self.IPV4, -1) def test_0011_inacl_permit_dst_mac(self): - """ Input L2 ACL test - permit destination MAC + """Input L2 ACL test - permit destination MAC Test scenario for basic IP ACL with source IP - Create IPv4 stream for pg0 -> pg1 interface. - Create ACL with destination MAC address. - Send and verify received packets on pg1 interface. """ - key = 'mac_in' - self.build_classify_table(dst_mac='ffffffffffff', key=key) + key = "mac_in" + self.build_classify_table(dst_mac="ffffffffffff", key=key) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.run_verify_test(self.IP, self.IPV4, -1) def test_0012_inacl_permit_src_dst_mac(self): - """ Input L2 ACL test - permit source and destination MAC + """Input L2 ACL test - permit source and destination MAC Test scenario for basic IP ACL with source IP - Create IPv4 stream for pg0 -> pg1 interface. - Create ACL with source and destination MAC addresses. - Send and verify received packets on pg1 interface. """ - key = 'mac_in' + key = "mac_in" self.build_classify_table( - src_mac='ffffffffffff', dst_mac='ffffffffffff', key=key) + src_mac="ffffffffffff", dst_mac="ffffffffffff", key=key + ) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.run_verify_test(self.IP, self.IPV4, -1) def test_0013_inacl_permit_ether_type(self): - """ Input L2 ACL test - permit ether_type + """Input L2 ACL test - permit ether_type Test scenario for basic IP ACL with source IP - Create IPv4 stream for pg0 -> pg1 interface. - Create ACL with destination MAC address. - Send and verify received packets on pg1 interface. """ - key = 'mac_in' - self.build_classify_table( - ether_type='ffff', etype=hex(ETH_P_IP)[2:], key=key) + key = "mac_in" + self.build_classify_table(ether_type="ffff", etype=hex(ETH_P_IP)[2:], key=key) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.run_verify_test(self.IP, self.IPV4, -1) def test_0015_inacl_deny(self): - """ Input L2 ACL test - deny + """Input L2 ACL test - deny Test scenario for basic IP ACL with source IP - Create IPv4 stream for pg0 -> pg1 interface. @@ -628,56 +596,55 @@ class TestClassifyAcl(VppTestCase): - Create ACL with source MAC address. - Send and verify no received packets on pg1 interface. """ - key = 'mac_in' - self.build_classify_table( - src_mac='ffffffffffff', hit_next_index=0, key=key) + key = "mac_in" + self.build_classify_table(src_mac="ffffffffffff", hit_next_index=0, key=key) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.run_verify_negat_test(self.IP, self.IPV4, -1) def test_0020_outacl_permit(self): - """ Output L2 ACL test - permit + """Output L2 ACL test - permit Test scenario for basic IP ACL with source IP - Create IPv4 stream for pg0 -> pg1 interface. - Create ACL with source MAC address. - Send and verify received packets on pg1 interface. """ - key = 'mac_out' - self.build_classify_table(src_mac='ffffffffffff', key=key) + key = "mac_out" + self.build_classify_table(src_mac="ffffffffffff", key=key) self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.run_verify_test(self.IP, self.IPV4, -1) def test_0025_outacl_deny(self): - """ Output L2 ACL test - deny + """Output L2 ACL test - deny Test scenario for basic IP ACL with source IP - Create IPv4 stream for pg0 -> pg1 interface. - Create ACL with source MAC address. - Send and verify no received packets on pg1 interface. """ - key = 'mac_out' - self.build_classify_table( - src_mac='ffffffffffff', hit_next_index=0, key=key) + key = "mac_out" + self.build_classify_table(src_mac="ffffffffffff", hit_next_index=0, key=key) self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.run_verify_negat_test(self.IP, self.IPV4, -1) def test_0030_inoutacl_permit(self): - """ Input+Output L2 ACL test - permit + """Input+Output L2 ACL test - permit Test scenario for basic IP ACL with source IP - Create IPv4 stream for pg0 -> pg1 interface. - Create ACLs with source MAC address. - Send and verify received packets on pg1 interface. """ - key = 'mac_inout' - self.build_classify_table(src_mac='ffffffffffff', key=key) + key = "mac_inout" + self.build_classify_table(src_mac="ffffffffffff", key=key) self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key)) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.run_verify_test(self.IP, self.IPV4, -1) -if __name__ == '__main__': + +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)