-# Copyright (c) 2019 Cisco and/or its affiliates.
+# Copyright (c) 2021 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
from robot.api import logger
from resources.libraries.python.Constants import Constants
+from resources.libraries.python.InterfaceUtil import InterfaceUtil
+from resources.libraries.python.IPUtil import IPUtil
from resources.libraries.python.PapiExecutor import PapiSocketExecutor
-from resources.libraries.python.topology import Topology
class Classify:
"""Add or delete a classify table.
:param node: VPP node to create classify table.
- :param is_add: If 1 the table is added, if 0 the table is deleted.
+ :param is_add: If True the table is added, if False table is deleted.
:param mask: ACL mask in hexstring format.
:param match_n_vectors: Number of vectors to match (Default value = ~0).
:param table_index: Index of the classify table. (Default value = ~0)
This is valid only if current_data_flag is set to 1.
(Default value = 0)
:type node: dict
- :type is_add: int
- :type mask: bytes
+ :type is_add: bool
+ :type mask: str
:type match_n_vectors: int
:type table_index: int
:type nbuckets: int
cmd = u"classify_add_del_table"
args = dict(
is_add=is_add,
+ del_chain=False,
table_index=table_index,
nbuckets=nbuckets,
memory_size=memory_size,
"""Add or delete a classify session.
:param node: VPP node to create classify session.
- :param is_add: If 1 the session is added, if 0 the session is deleted.
+ :param is_add: If True the session is added, if False the session
+ is deleted.
:param table_index: Index of the table to add/del the session.
:param match: For add, match value for session, required, needs to
include bytes in front with length of skip_n_vectors of target table
:param advance: For add, advance value for session. (Default value = 0)
:param action: 0: No action (by default) metadata is not used.
1: Classified IP packets will be looked up from the specified ipv4
- fib table (configured by metadata as VRF id).
- Only valid for L3 input ACL node
+ fib table (configured by metadata as VRF id).
+ Only valid for L3 input ACL node
2: Classified IP packets will be looked up from the specified ipv6
- fib table (configured by metadata as VRF id).
- Only valid for L3 input ACL node
- 3: Classified packet will be steered to source routig policy of
- given index (in metadata).
- This is only valid for IPv6 packets redirected to a source
- routing node.
- :param metadata: Valid only if action != 0
- VRF id if action is 1 or 2. SR policy index if action is 3.
- (Default value = 0)
+ fib table (configured by metadata as VRF id).
+ Only valid for L3 input ACL node
+ 3: Classified packet will be steered to source routing policy of
+ given index (in metadata).
+ This is only valid for IPv6 packets redirected to a source
+ routing node.
+ :param metadata: Valid only if action != 0. VRF id if action is 1 or 2.
+ SR policy index if action is 3. (Default value = 0)
:type node: dict
- :type is_add: int
+ :type is_add: bool
:type table_index: int
:type match: bytes
:type opaque_index: int
:type acls: list
"""
cmd = u"acl_interface_set_acl_list"
- n_input = len(acls) if acl_type == u"input" else 0
args = dict(
sw_if_index=sw_if_index,
acls=acls,
- n_input=n_input,
+ n_input=len(acls) if acl_type == u"input" else 0,
count=len(acls)
)
"""
cmd = u"acl_add_replace"
args = dict(
- tag=tag.encode("utf-8"),
+ tag=tag,
acl_index=4294967295 if acl_idx is None else acl_idx,
count=len(rules),
r=rules
return Classify._classify_add_del_table(
node,
- is_add=1,
+ is_add=True,
mask=mask,
match_n_vectors=match_n,
skip_n_vectors=skip_n
@staticmethod
def vpp_configures_classify_session_l3(
node, acl_method, table_index, skip_n, match_n, ip_version,
- direction, address, hit_next_index=Constants.BITWISE_NON_ZERO,
- opaque_index=Constants.BITWISE_NON_ZERO):
+ direction, address, hit_next_index=None,
+ opaque_index=Constants.BITWISE_NON_ZERO, action=0, metadata=0):
"""Configuration of classify session for IP address filtering.
:param node: VPP node to setup classify session.
:param direction: Direction of traffic - src/dst.
:param address: IPv4 or IPv6 address.
:param hit_next_index: hit_next_index of new session.
- (Default value = ~0)
+ (Default value = None)
:param opaque_index: opaque_index of new session. (Default value = ~0)
+ :param action: 0: No action (by default) metadata is not used.
+ 1: Classified IP packets will be looked up from the specified ipv4
+ fib table (configured by metadata as VRF id).
+ Only valid for L3 input ACL node
+ 2: Classified IP packets will be looked up from the specified ipv6
+ fib table (configured by metadata as VRF id).
+ Only valid for L3 input ACL node
+ 3: Classified packet will be steered to source routing policy of
+ given index (in metadata).
+ This is only valid for IPv6 packets redirected to a source
+ routing node.
+ :param metadata: Valid only if action != 0. VRF id if action is 1 or 2.
+ SR policy index if action is 3. (Default value = 0)
:type node: dict
:type acl_method: str
:type table_index: int
:type address: str
:type hit_next_index: int
:type opaque_index: int
+ :type action: int
+ :type metadata: int
:raises ValueError: If the parameter 'direction' has incorrect value.
"""
match_f = dict(
ip4=Classify._build_ip_match,
ip6=Classify._build_ip6_match
)
- action = dict(
- permit=0,
- deny=1
+ acl_hit_next_index = dict(
+ permit=Constants.BITWISE_NON_ZERO,
+ deny=0
)
if ip_version in (u"ip4", u"ip6"):
Classify._classify_add_del_session(
node,
- is_add=1,
+ is_add=True,
table_index=table_index,
- hit_next_index=hit_next_index,
+ hit_next_index=hit_next_index if hit_next_index is not None
+ else acl_hit_next_index[acl_method],
opaque_index=opaque_index,
match=match,
- action=action[acl_method]
+ action=action,
+ metadata=metadata
)
@staticmethod
:type acl_type: str
:type acl_idx: list
"""
- if isinstance(interface, str):
- sw_if_index = Topology.get_interface_sw_index(node, interface)
- else:
- sw_if_index = int(interface)
-
- acls = acl_idx if isinstance(acl_idx, list) else list()
-
Classify._acl_interface_set_acl_list(
- node=node, sw_if_index=sw_if_index, acl_type=acl_type, acls=acls
+ node=node,
+ sw_if_index=int(InterfaceUtil.get_interface_index(node, interface)),
+ acl_type=acl_type,
+ acls=acl_idx if isinstance(acl_idx, list) else list()
)
@staticmethod
acl_rules = list()
for rule in rules.split(u", "):
- acl_rule = dict()
- acl_rule[u"is_permit"] = 1 if u"permit" in rule else 0
- acl_rule[u"is_ipv6"] = 1 if u"ipv6" in rule else 0
+ acl_rule = dict(
+ is_permit=2 if u"permit+reflect" in rule
+ else 1 if u"permit" in rule else 0,
+ src_prefix=0,
+ dst_prefix=0,
+ proto=0,
+ srcport_or_icmptype_first=0,
+ srcport_or_icmptype_last=65535,
+ dstport_or_icmpcode_first=0,
+ dstport_or_icmpcode_last=65535,
+ tcp_flags_mask=0,
+ tcp_flags_value=0
+ )
groups = re.search(reg_ex_src_ip, rule)
if groups:
grp = groups.group(1).split(u" ")[1].split(u"/")
- acl_rule[u"src_ip_addr"] = ip_address(grp[0]).packed
- acl_rule[u"src_ip_prefix_len"] = int(grp[1])
+ acl_rule[u"src_prefix"] = IPUtil.create_prefix_object(
+ ip_address(grp[0]), int(grp[1])
+ )
groups = re.search(reg_ex_dst_ip, rule)
if groups:
grp = groups.group(1).split(u" ")[1].split(u"/")
- acl_rule[u"dst_ip_addr"] = ip_address(grp[0]).packed
- acl_rule[u"dst_ip_prefix_len"] = int(grp[1])
+ acl_rule[u"dst_prefix"] = IPUtil.create_prefix_object(
+ ip_address(grp[0]), int(grp[1])
+ )
groups = re.search(reg_ex_sport, rule)
if groups:
port = int(groups.group(1).split(u" ")[1])
acl_rule[u"srcport_or_icmptype_first"] = port
acl_rule[u"srcport_or_icmptype_last"] = port
- else:
- acl_rule[u"srcport_or_icmptype_first"] = 0
- acl_rule[u"srcport_or_icmptype_last"] = 65535
groups = re.search(reg_ex_dport, rule)
if groups:
port = int(groups.group(1).split(u" ")[1])
acl_rule[u"dstport_or_icmpcode_first"] = port
acl_rule[u"dstport_or_icmpcode_last"] = port
- else:
- acl_rule[u"dstport_or_icmpcode_first"] = 0
- acl_rule[u"dstport_or_icmpcode_last"] = 65535
groups = re.search(reg_ex_proto, rule)
if groups:
proto = int(groups.group(1).split(' ')[1])
acl_rule[u"proto"] = proto
- else:
- acl_rule[u"proto"] = 0
acl_rules.append(acl_rule)
Classify._acl_add_replace(
- node, acl_idx=acl_idx, rules=acl_rules, tag=tag)
+ node, acl_idx=acl_idx, rules=acl_rules, tag=tag
+ )
@staticmethod
def add_macip_acl_multi_entries(node, rules=u""):
acl_rules = list()
for rule in rules.split(u", "):
- acl_rule = dict()
- acl_rule[u"is_permit"] = 1 if u"permit" in rule else 0
- acl_rule[u"is_ipv6"] = 1 if u"ipv6" in rule else 0
+ acl_rule = dict(
+ is_permit=2 if u"permit+reflect" in rule
+ else 1 if u"permit" in rule else 0,
+ src_mac=6*b'0',
+ src_mac_mask=6*b'0',
+ prefix=0
+ )
groups = re.search(reg_ex_mac, rule)
if groups:
groups = re.search(reg_ex_ip, rule)
if groups:
grp = groups.group(1).split(u" ")[1].split(u"/")
- acl_rule[u"src_ip_addr"] = ip_address((grp[0])).packed
- acl_rule[u"src_ip_prefix_len"] = int(grp[1])
+ acl_rule[u"src_prefix"] = IPUtil.create_prefix_object(
+ ip_address((grp[0])), int(grp[1])
+ )
acl_rules.append(acl_rule)
:type acl_idx: str or int
:raises RuntimeError: If unable to set MACIP ACL for the interface.
"""
- if isinstance(interface, str):
- sw_if_index = Topology.get_interface_sw_index(node, interface)
- else:
- sw_if_index = interface
-
- is_add = 1 if action == u"add" else 0
-
cmd = u"macip_acl_interface_add_del"
err_msg = f"Failed to get 'macip_acl_interface' on host {node[u'host']}"
args = dict(
- is_add=is_add,
- sw_if_index=int(sw_if_index),
+ is_add=bool(action == u"add"),
+ sw_if_index=int(InterfaceUtil.get_interface_index(node, interface)),
acl_index=int(acl_idx)
)
with PapiSocketExecutor(node) as papi_exec: