-#!/usr/bin/env python
+#!/usr/bin/env python3
""" ACL plugin extended stateful tests """
import unittest
-from framework import VppTestCase, VppTestRunner, running_extended_tests
+from config import config
+from framework import VppTestCase, VppTestRunner
from scapy.layers.l2 import Ether
from scapy.packet import Raw
from scapy.layers.inet import IP, UDP, TCP
from pprint import pprint
from random import randint
from util import L4_Conn
+from ipaddress import ip_network
+
+from vpp_acl import AclRule, VppAcl, VppAclInterface
def to_acl_rule(self, is_permit, wildcard_sport=False):
rule_l4_sport_first = rule_l4_sport
rule_l4_sport_last = rule_l4_sport
- new_rule = {
- 'is_permit': is_permit,
- 'is_ipv6': p.haslayer(IPv6),
- 'src_ip_addr': inet_pton(rule_family,
- p[rule_l3_layer].src),
- 'src_ip_prefix_len': rule_prefix_len,
- 'dst_ip_addr': inet_pton(rule_family,
- p[rule_l3_layer].dst),
- 'dst_ip_prefix_len': rule_prefix_len,
- 'srcport_or_icmptype_first': rule_l4_sport_first,
- 'srcport_or_icmptype_last': rule_l4_sport_last,
- 'dstport_or_icmpcode_first': rule_l4_dport,
- 'dstport_or_icmpcode_last': rule_l4_dport,
- 'proto': rule_l4_proto,
- }
+ new_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto,
+ src_prefix=ip_network(
+ (p[rule_l3_layer].src, rule_prefix_len)),
+ dst_prefix=ip_network(
+ (p[rule_l3_layer].dst, rule_prefix_len)),
+ sport_from=rule_l4_sport_first,
+ sport_to=rule_l4_sport_last,
+ dport_from=rule_l4_dport, dport_to=rule_l4_dport)
+
return new_rule
+
Packet.to_acl_rule = to_acl_rule
r = []
r.append(pkt.to_acl_rule(2, wildcard_sport=True))
r.append(self.wildcard_rule(0))
- res = self.testcase.vapi.acl_add_replace(0xffffffff, r)
- self.testcase.assert_equal(res.retval, 0, "error adding ACL")
- reflect_acl_index = res.acl_index
+ reflect_acl = VppAcl(self.testcase, r)
+ reflect_acl.add_vpp_config()
r = []
r.append(self.wildcard_rule(0))
- res = self.testcase.vapi.acl_add_replace(0xffffffff, r)
- self.testcase.assert_equal(res.retval, 0, "error adding deny ACL")
- deny_acl_index = res.acl_index
+ deny_acl = VppAcl(self.testcase, r)
+ deny_acl.add_vpp_config()
if reflect_side == acl_side:
- self.testcase.vapi.acl_interface_set_acl_list(
- self.ifs[acl_side].sw_if_index, 1,
- [reflect_acl_index,
- deny_acl_index])
- self.testcase.vapi.acl_interface_set_acl_list(
- self.ifs[1-acl_side].sw_if_index, 0, [])
+ acl_if0 = VppAclInterface(self.testcase,
+ self.ifs[acl_side].sw_if_index,
+ [reflect_acl, deny_acl], n_input=1)
+ acl_if1 = VppAclInterface(self.testcase,
+ self.ifs[1-acl_side].sw_if_index, [],
+ n_input=0)
+ acl_if0.add_vpp_config()
+ acl_if1.add_vpp_config()
else:
- self.testcase.vapi.acl_interface_set_acl_list(
- self.ifs[acl_side].sw_if_index, 1,
- [deny_acl_index,
- reflect_acl_index])
- self.testcase.vapi.acl_interface_set_acl_list(
- self.ifs[1-acl_side].sw_if_index, 0, [])
+ acl_if0 = VppAclInterface(self.testcase,
+ self.ifs[acl_side].sw_if_index,
+ [deny_acl, reflect_acl], n_input=1)
+ acl_if1 = VppAclInterface(self.testcase,
+ self.ifs[1-acl_side].sw_if_index, [],
+ n_input=0)
+ acl_if0.add_vpp_config()
+ acl_if1.add_vpp_config()
def wildcard_rule(self, is_permit):
any_addr = ["0.0.0.0", "::"]
rule_family = self.address_family
is_ip6 = 1 if rule_family == AF_INET6 else 0
- new_rule = {
- 'is_permit': is_permit,
- 'is_ipv6': is_ip6,
- 'src_ip_addr': inet_pton(rule_family, any_addr[is_ip6]),
- 'src_ip_prefix_len': 0,
- 'dst_ip_addr': inet_pton(rule_family, any_addr[is_ip6]),
- 'dst_ip_prefix_len': 0,
- 'srcport_or_icmptype_first': 0,
- 'srcport_or_icmptype_last': 65535,
- 'dstport_or_icmpcode_first': 0,
- 'dstport_or_icmpcode_last': 65535,
- 'proto': 0,
- }
+ new_rule = AclRule(is_permit=is_permit, proto=0,
+ src_prefix=ip_network(
+ (any_addr[is_ip6], 0)),
+ dst_prefix=ip_network(
+ (any_addr[is_ip6], 0)),
+ sport_from=0, sport_to=65535, dport_from=0,
+ dport_to=65535)
return new_rule
-@unittest.skipUnless(running_extended_tests, "part of extended tests")
+@unittest.skipUnless(config.extended, "part of extended tests")
class ACLPluginConnTestCase(VppTestCase):
""" ACL plugin connection-oriented extended testcases """
"""Run standard test teardown and log various show commands
"""
super(ACLPluginConnTestCase, self).tearDown()
- if not self.vpp_dead:
- self.logger.info(self.vapi.cli("show ip arp"))
- self.logger.info(self.vapi.cli("show ip6 neighbors"))
- self.logger.info(self.vapi.cli("show acl-plugin sessions"))
- self.logger.info(self.vapi.cli("show acl-plugin acl"))
- self.logger.info(self.vapi.cli("show acl-plugin interface"))
- self.logger.info(self.vapi.cli("show acl-plugin tables"))
- self.logger.info(self.vapi.cli("show event-logger all"))
+
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.cli("show ip neighbors"))
+ self.logger.info(self.vapi.cli("show ip6 neighbors"))
+ self.logger.info(self.vapi.cli("show acl-plugin sessions"))
+ self.logger.info(self.vapi.cli("show acl-plugin acl"))
+ self.logger.info(self.vapi.cli("show acl-plugin interface"))
+ self.logger.info(self.vapi.cli("show acl-plugin tables"))
+ self.logger.info(self.vapi.cli("show event-logger all"))
def run_basic_conn_test(self, af, acl_side):
""" Basic conn timeout test """