from scapy.packet import Raw
+from scapy.data import ETH_P_IP
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
learn=1)
for pg_if in cls.pg_interfaces:
- cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
- bd_id=cls.bd_id)
+ cls.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)
# Set up all interfaces
for i in cls.pg_interfaces:
# warm-up the mac address tables
# self.warmup_test()
+ # Holder of the active classify table key
+ cls.acl_active_table = ''
+
except Exception:
super(TestClassifyAcl, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(TestClassifyAcl, cls).tearDownClass()
+
def setUp(self):
super(TestClassifyAcl, self).setUp()
"""
Show various debug prints after each test.
"""
- super(TestClassifyAcl, self).tearDown()
if not self.vpp_dead:
- self.logger.info(self.vapi.ppcli("show inacl type l2"))
- self.logger.info(self.vapi.ppcli("show outacl type l2"))
- self.logger.info(self.vapi.ppcli("show classify tables verbose"))
- self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
- % self.bd_id))
+ if self.acl_active_table == 'mac_inout':
+ self.output_acl_set_interface(
+ self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0)
+ self.input_acl_set_interface(
+ self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
+ self.acl_active_table = ''
+ elif self.acl_active_table == 'mac_out':
+ self.output_acl_set_interface(
+ self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0)
+ self.acl_active_table = ''
+ elif self.acl_active_table == 'mac_in':
+ self.input_acl_set_interface(
+ self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
+ self.acl_active_table = ''
- @staticmethod
- def build_ip_mask(proto='', src_ip='', dst_ip='',
- src_port='', dst_port=''):
- """Build IP ACL mask data with hexstring format
-
- :param str proto: protocol number <0-ff>
- :param str src_ip: source ip address <0-ffffffff>
- :param str dst_ip: destination ip address <0-ffffffff>
- :param str src_port: source port number <0-ffff>
- :param str dst_port: destination port number <0-ffff>
- """
-
- return ('{:0>20}{:0>12}{:0>8}{:0>12}{:0>4}'.format(
- proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
-
- @staticmethod
- def build_ip_match(proto='', src_ip='', dst_ip='',
- src_port='', dst_port=''):
- """Build IP ACL match data with hexstring format
-
- :param str proto: protocol number with valid option "<0-ff>"
- :param str src_ip: source ip address with format of "x.x.x.x"
- :param str dst_ip: destination ip address with format of "x.x.x.x"
- :param str src_port: source port number <0-ffff>
- :param str dst_port: destination port number <0-ffff>
- """
- if src_ip:
- src_ip = socket.inet_aton(src_ip).encode('hex')
- if dst_ip:
- dst_ip = socket.inet_aton(dst_ip).encode('hex')
+ super(TestClassifyAcl, self).tearDown()
- return ('{:0>20}{:0>12}{:0>8}{:0>12}{:0>4}'.format(
- proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.ppcli("show inacl type l2"))
+ self.logger.info(self.vapi.ppcli("show outacl type l2"))
+ self.logger.info(self.vapi.ppcli("show classify tables verbose"))
+ self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
+ % self.bd_id))
@staticmethod
def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
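Review bot: The unbind in tearDown only runs when `self.acl_active_table` is non-empty, but every test on this page assigns it after the `input_acl_set_interface`/`output_acl_set_interface` calls. A test that fails between binding and assignment (for example after the first of the two binds in test_0030_inoutacl_permit) therefore leaves a classify table attached to pg0 or pg1, and later ACL tests can pass or fail for the wrong reason. Moving `self.acl_active_table = key` above the bind calls in each test would close that window, assuming an unbind of a table that was never bound is harmless, which is not visible here. Separately, `self.acl_tbl_idx.get(self.acl_active_table)` yields None for an unknown key and passes it on as the table index; `self.acl_tbl_idx[self.acl_active_table]` would fail loudly instead. The `def tearDown` line is outside the visible hunk.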
:param str ether_type: ethernet type <0-ffff>
"""
- return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac,
- ether_type)).rstrip('0')
+ return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
+ dst_mac, src_mac, ether_type)).rstrip('0')
@staticmethod
def build_mac_match(dst_mac='', src_mac='', ether_type=''):
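Review bot: The new return in build_mac_mask, and the identical one in build_mac_match further down, still trims with `.rstrip('0')`, which removes hex nibbles rather than whole bytes. A value whose last non-zero byte ends in a zero nibble (a source MAC ending in `:10` with no ether type, say) comes out with an odd number of hex digits, and the `binascii.unhexlify(match)` call visible in create_classify_session rejects odd-length input. Whether the generated host MACs can hit this is not visible on this page, but re-padding to a byte boundary is cheap: `s = '{!s:0>12}{!s:0>12}{!s:0>4}'.format(dst_mac, src_mac, ether_type).rstrip('0')` followed by `return s + '0' * (len(s) % 2)`. Both function bodies start outside their hunks.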
if src_mac:
src_mac = src_mac.replace(':', '')
- return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac,
- ether_type)).rstrip('0')
+ return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
+ dst_mac, src_mac, ether_type)).rstrip('0')
def create_classify_table(self, key, mask, data_offset=0, is_add=1):
"""Create Classify Table
miss_next_index=0,
current_data_flag=1,
current_data_offset=data_offset)
- self.assertIsNotNone(r, msg='No response msg for add_del_table')
+ self.assertIsNotNone(r, 'No response msg for add_del_table')
self.acl_tbl_idx[key] = r.new_table_index
def create_classify_session(self, intf, table_index, match,
table_index,
binascii.unhexlify(match),
hit_next_index=hit_next_index)
- self.assertIsNotNone(r, msg='No response msg for add_del_session')
+ self.assertIsNotNone(r, 'No response msg for add_del_session')
def input_acl_set_interface(self, intf, table_index, is_add=1):
"""Configure Input ACL interface
is_add,
intf.sw_if_index,
l2_table_index=table_index)
- self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
+ self.assertIsNotNone(r, 'No response msg for acl_set_interface')
def output_acl_set_interface(self, intf, table_index, is_add=1):
"""Configure Output ACL interface
is_add,
intf.sw_if_index,
l2_table_index=table_index)
- self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
+ self.assertIsNotNone(r, 'No response msg for acl_set_interface')
def create_hosts(self, count, start=0):
"""
packet[ICMPv6EchoRequest].data)
payload = packet[ICMPv6EchoRequest]
else:
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
payload = packet[self.proto_map[payload_info.proto]]
except:
self.logger.error(ppp("Unexpected or invalid packet "
capture = dst_if.get_capture(0)
self.assertEqual(len(capture), 0)
- def build_classify_table(self, hit_next_index=0xffffffff):
- # Basic ACL testing with source MAC
- a_mask = self.build_mac_mask(src_mac='ffffffffffff')
- self.create_classify_table('ip', a_mask)
+ def build_classify_table(self, src_mac='', dst_mac='', ether_type='',
+ etype='', key='mac', hit_next_index=0xffffffff):
+ # Basic ACL testing
+ a_mask = self.build_mac_mask(src_mac=src_mac, dst_mac=dst_mac,
+ ether_type=ether_type)
+ self.create_classify_table(key, a_mask)
for host in self.hosts_by_pg_idx[self.pg0.sw_if_index]:
- self.create_classify_session(
- self.pg0, self.acl_tbl_idx.get('ip'),
- self.build_mac_match(src_mac=host.mac),
- hit_next_index=hit_next_index)
+ s_mac = host.mac if src_mac else ''
+ if dst_mac:
+ for dst_if in self.flows[self.pg0]:
+ for dst_host in self.hosts_by_pg_idx[dst_if.sw_if_index]:
+ self.create_classify_session(
+ self.pg0, self.acl_tbl_idx.get(key),
+ self.build_mac_match(src_mac=s_mac,
+ dst_mac=dst_host.mac,
+ ether_type=etype),
+ hit_next_index=hit_next_index)
+ else:
+ self.create_classify_session(
+ self.pg0, self.acl_tbl_idx.get(key),
+ self.build_mac_match(src_mac=s_mac, dst_mac='',
+ ether_type=etype),
+ hit_next_index=hit_next_index)
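Review bot: The new build_classify_table signature has no docstring, and `ether_type` and `etype` are easy to confuse: `ether_type` feeds build_mac_mask while `etype` is the value passed to build_mac_match. `src_mac` and `dst_mac` are likewise masks here; the matched addresses come from `host.mac` and `dst_host.mac`. I would add a docstring saying that src_mac, dst_mac and ether_type are hex mask strings, that etype is the hex EtherType to match, and that key names the entry in `acl_tbl_idx`. It should also note that passing `ether_type` without `etype` pads the match to `0000`, so the sessions would match EtherType 0 rather than IPv4. A guard such as `if ether_type and not etype: raise ValueError('etype required when ether_type mask is set')` would catch that.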
def test_0000_warmup_test(self):
""" Learn the MAC addresses
self.create_hosts(2)
self.run_traffic_no_check()
- def test_0010_inacl_permit(self):
- """ Input L2 ACL test - permit
+ def test_0010_inacl_permit_src_mac(self):
+ """ Input L2 ACL test - permit source MAC
Test scenario for basic IP ACL with source IP
- Create IPv4 stream for pg0 -> pg1 interface.
- Create ACL with source MAC address.
- Send and verify received packets on pg1 interface.
"""
- self.build_classify_table()
- self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get('ip'))
+ key = 'mac_in'
+ self.build_classify_table(src_mac='ffffffffffff', key=key)
+ self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
+ self.acl_active_table = key
+ self.run_verify_test(self.IP, self.IPV4, -1)
+
+ def test_0011_inacl_permit_dst_mac(self):
+ """ Input L2 ACL test - permit destination MAC
+
+ Test scenario for basic IP ACL with source IP
+ - Create IPv4 stream for pg0 -> pg1 interface.
+ - Create ACL with destination MAC address.
+ - Send and verify received packets on pg1 interface.
+ """
+ key = 'mac_in'
+ self.build_classify_table(dst_mac='ffffffffffff', key=key)
+ self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
+ self.acl_active_table = key
+ self.run_verify_test(self.IP, self.IPV4, -1)
+
+ def test_0012_inacl_permit_src_dst_mac(self):
+ """ Input L2 ACL test - permit source and destination MAC
+
+ Test scenario for basic IP ACL with source IP
+ - Create IPv4 stream for pg0 -> pg1 interface.
+ - Create ACL with source and destination MAC addresses.
+ - Send and verify received packets on pg1 interface.
+ """
+ key = 'mac_in'
+ self.build_classify_table(
+ src_mac='ffffffffffff', dst_mac='ffffffffffff', key=key)
+ self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
+ self.acl_active_table = key
+ self.run_verify_test(self.IP, self.IPV4, -1)
+
+ def test_0013_inacl_permit_ether_type(self):
+ """ Input L2 ACL test - permit ether_type
+
+ Test scenario for basic IP ACL with source IP
+ - Create IPv4 stream for pg0 -> pg1 interface.
+ - Create ACL with destination MAC address.
+ - Send and verify received packets on pg1 interface.
+ """
+ key = 'mac_in'
+ self.build_classify_table(
+ ether_type='ffff', etype=hex(ETH_P_IP)[2:], key=key)
+ self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
+ self.acl_active_table = key
self.run_verify_test(self.IP, self.IPV4, -1)
- self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get('ip'), 0)
def test_0015_inacl_deny(self):
""" Input L2 ACL test - deny
Test scenario for basic IP ACL with source IP
- Create IPv4 stream for pg0 -> pg1 interface.
+
- Create ACL with source MAC address.
- Send and verify no received packets on pg1 interface.
"""
- self.build_classify_table(hit_next_index=0)
- self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get('ip'))
+ key = 'mac_in'
+ self.build_classify_table(
+ src_mac='ffffffffffff', hit_next_index=0, key=key)
+ self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
+ self.acl_active_table = key
self.run_verify_negat_test(self.IP, self.IPV4, -1)
- self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get('ip'), 0)
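Review bot: The docstrings of the new permit tests do not describe what they exercise: test_0013_inacl_permit_ether_type says `- Create ACL with destination MAC address.` although it builds an ether-type mask with `etype=hex(ETH_P_IP)[2:]`, and all four keep the line `Test scenario for basic IP ACL with source IP` although they are L2 MAC/EtherType ACLs. I would change the test_0013 bullet to `- Create ACL with EtherType (IPv4).` and the shared line to `Test scenario for basic L2 ACL` plus the matched field. The destination-MAC and ether-type matches are also only covered on the permit path; only the source-MAC case has a deny test (test_0015_inacl_deny), so a deny variant for the new fields would show that the classifier enforces them rather than merely passing traffic.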
def test_0020_outacl_permit(self):
""" Output L2 ACL test - permit
- Create ACL with source MAC address.
- Send and verify received packets on pg1 interface.
"""
- self.build_classify_table()
- self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get('ip'))
+ key = 'mac_out'
+ self.build_classify_table(src_mac='ffffffffffff', key=key)
+ self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
+ self.acl_active_table = key
self.run_verify_test(self.IP, self.IPV4, -1)
- self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get('ip'), 0)
def test_0025_outacl_deny(self):
""" Output L2 ACL test - deny
- Create ACL with source MAC address.
- Send and verify no received packets on pg1 interface.
"""
- self.build_classify_table(hit_next_index=0)
- self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get('ip'))
+ key = 'mac_out'
+ self.build_classify_table(
+ src_mac='ffffffffffff', hit_next_index=0, key=key)
+ self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
+ self.acl_active_table = key
self.run_verify_negat_test(self.IP, self.IPV4, -1)
- self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get('ip'), 0)
def test_0030_inoutacl_permit(self):
""" Input+Output L2 ACL test - permit
- Create ACLs with source MAC address.
- Send and verify received packets on pg1 interface.
"""
- self.build_classify_table()
- self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get('ip'))
- self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get('ip'))
+ key = 'mac_inout'
+ self.build_classify_table(src_mac='ffffffffffff', key=key)
+ self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
+ self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
+ self.acl_active_table = key
self.run_verify_test(self.IP, self.IPV4, -1)
- self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get('ip'), 0)
- self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get('ip'), 0)
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)