from framework import VppTestCase, VppTestRunner
from util import Host, ppp
+from vpp_lo_interface import VppLoInterface
+
class TestACLplugin(VppTestCase):
""" ACL plugin Test Case """
acls=[reply.acl_index])
return
+ def apply_rules_to(self, rules, tag='', sw_if_index=0xFFFFFFFF):
+ reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
+ tag=tag)
+ self.logger.info("Dumped ACL: " + str(
+ self.vapi.acl_dump(reply.acl_index)))
+ # Apply a ACL on the interface as inbound
+ self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
+ n_input=1,
+ acls=[reply.acl_index])
+ return
+
+ def etype_whitelist(self, whitelist, n_input):
+ # Apply whitelists on all the interfaces
+ for i in self.pg_interfaces:
+ # checkstyle can't read long names. Help them.
+ fun = self.vapi.acl_interface_set_etype_whitelist
+ fun(sw_if_index=i.sw_if_index, n_input=n_input,
+ whitelist=whitelist)
+ return
+
def create_upper_layer(self, packet_index, proto, ports=0):
p = self.proto_map[proto]
if p == 'UDP':
return ''
def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
- proto=-1, ports=0, fragments=False, pkt_raw=True):
+ proto=-1, ports=0, fragments=False,
+ pkt_raw=True, etype=-1):
"""
Create input packet stream for defined interface using hosts or
deleted_hosts list.
pkt_info.proto = proto
payload = self.info_to_payload(pkt_info)
p = Ether(dst=dst_host.mac, src=src_host.mac)
+ if etype > 0:
+ p = Ether(dst=dst_host.mac,
+ src=src_host.mac,
+ type=etype)
if pkt_info.ip:
p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
if fragments:
pkts.append(p)
return pkts
- def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
+ def verify_capture(self, pg_if, capture,
+ traffic_type=0, ip_type=0, etype=-1):
"""
Verify captured input packet stream for defined interface.
last_info[i.sw_if_index] = None
dst_sw_if_index = pg_if.sw_if_index
for packet in capture:
+ if etype > 0:
+ if packet[Ether].type != etype:
+ self.logger.error(ppp("Unexpected ethertype in packet:",
+ packet))
+ else:
+ continue
try:
# Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
if traffic_type == self.ICMP and ip_type == self.IPV6:
self.pg_start()
def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
- frags=False, pkt_raw=True):
+ frags=False, pkt_raw=True, etype=-1):
# Test
# Create incoming packet streams for packet-generator interfaces
pkts_cnt = 0
if self.flows.__contains__(i):
pkts = self.create_stream(i, self.pg_if_packet_sizes,
traffic_type, ip_type, proto, ports,
- frags, pkt_raw)
+ frags, pkt_raw, etype)
if len(pkts) > 0:
i.add_stream(pkts)
pkts_cnt += len(pkts)
capture = dst_if.get_capture(pkts_cnt)
self.logger.info("Verifying capture on interface %s" %
dst_if.name)
- self.verify_capture(dst_if, capture, traffic_type, ip_type)
+ self.verify_capture(dst_if, capture,
+ traffic_type, ip_type, etype)
def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
- ports=0, frags=False):
+ ports=0, frags=False, etype=-1):
# Test
self.reset_packet_infos()
for i in self.pg_interfaces:
if self.flows.__contains__(i):
pkts = self.create_stream(i, self.pg_if_packet_sizes,
traffic_type, ip_type, proto, ports,
- frags)
+ frags, True, etype)
if len(pkts) > 0:
i.add_stream(pkts)
self.logger.info("ACLP_TEST_FINISH_0113")
+ def test_0300_tcp_permit_v4_etype_aaaa(self):
+ """ permit TCPv4, send 0xAAAA etype
+ """
+ self.logger.info("ACLP_TEST_START_0300")
+
+ # Add an ACL
+ rules = []
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+ self.proto[self.IP][self.TCP]))
+ rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+ self.proto[self.IP][self.TCP]))
+ # deny ip any any in the end
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+ # Apply rules
+ self.apply_rules(rules, "permit ipv4 tcp")
+
+ # Traffic should still pass
+ self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
+
+ # Traffic should still pass also for an odd ethertype
+ self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
+ 0, False, True, 0xaaaa)
+
+ self.logger.info("ACLP_TEST_FINISH_0300")
+
+ def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
+ """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA, 0x0BBB
+ """
+ self.logger.info("ACLP_TEST_START_0305")
+
+ # Add an ACL
+ rules = []
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+ self.proto[self.IP][self.TCP]))
+ rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+ self.proto[self.IP][self.TCP]))
+ # deny ip any any in the end
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+ # Apply rules
+ self.apply_rules(rules, "permit ipv4 tcp")
+
+ # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
+ self.etype_whitelist([0xbbb], 1)
+
+ # The IPv4 traffic should still pass
+ self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
+
+ # The oddball ethertype should be blocked
+ self.run_verify_negat_test(self.IP, self.IPV4,
+ self.proto[self.IP][self.TCP],
+ 0, False, 0xaaaa)
+
+ # The whitelisted traffic, on the other hand, should pass
+ self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
+ 0, False, True, 0x0bbb)
+
+ # remove the whitelist, the previously blocked 0xAAAA should pass now
+ self.etype_whitelist([], 0)
+ self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
+ 0, False, True, 0xaaaa)
+
+ self.logger.info("ACLP_TEST_FINISH_0305")
+
+ def test_0315_del_intf(self):
+ """ apply an acl and delete the interface
+ """
+ self.logger.info("ACLP_TEST_START_0315")
+
+ # Add an ACL
+ rules = []
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+ self.proto[self.IP][self.TCP]))
+ rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+ self.proto[self.IP][self.TCP]))
+ # deny ip any any in the end
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+ # create an interface
+ intf = []
+ intf.append(VppLoInterface(self, 0))
+
+ # Apply rules
+ self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
+
+ # Remove the interface
+ intf[0].remove_vpp_config()
+
+ self.logger.info("ACLP_TEST_FINISH_0315")
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)